Skip to content

Commit 8220fc0

Browse files
author
Mark
committed
changed examples, implemented cursor with stream api
1 parent 8a8bc01 commit 8220fc0

File tree

8 files changed

+117
-68
lines changed

8 files changed

+117
-68
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb;
22+
23+
import java.util.Spliterators;
24+
import java.util.stream.Stream;
25+
import java.util.stream.StreamSupport;
26+
27+
import com.arangodb.entity.CursorEntity;
28+
import com.arangodb.internal.ArangoCursorExecute;
29+
import com.arangodb.internal.InternalArangoDatabase;
30+
31+
/**
32+
* @author Mark - mark at arangodb.com
33+
*
34+
*/
35+
public class ArangoCursorAsync<T> extends ArangoCursor<T> {
36+
37+
public ArangoCursorAsync(final InternalArangoDatabase<?, ?, ?> db, final ArangoCursorExecute execute,
38+
final Class<T> type, final CursorEntity result) {
39+
super(db, execute, type, result);
40+
}
41+
42+
public Stream<T> streamRemaining() {
43+
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
44+
}
45+
}

src/main/java/com/arangodb/ArangoDatabaseAsync.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,15 @@ public CompletableFuture<Void> revokeAccess(final String user) {
223223
* The type of the result (POJO class, VPackSlice, String for Json, or Collection/List/Map)
224224
* @return cursor of the results
225225
*/
226-
public <T> CompletableFuture<ArangoCursor<T>> query(
226+
public <T> CompletableFuture<ArangoCursorAsync<T>> query(
227227
final String query,
228228
final Map<String, Object> bindVars,
229229
final AqlQueryOptions options,
230230
final Class<T> type) throws ArangoDBException {
231231
final Request request = queryRequest(query, bindVars, options);
232232
final CompletableFuture<CursorEntity> execution = executor.execute(request, CursorEntity.class);
233233
return execution.thenApply(result -> {
234-
return new ArangoCursor<>(this, new ArangoCursorExecute() {
234+
return new ArangoCursorAsync<>(this, new ArangoCursorExecute() {
235235
@Override
236236
public CursorEntity next(final String id) {
237237
final CompletableFuture<CursorEntity> result = executor.execute(queryNextRequest(id),
@@ -245,7 +245,11 @@ public CursorEntity next(final String id) {
245245

246246
@Override
247247
public void close(final String id) {
248-
executor.execute(queryCloseRequest(id), Void.class);
248+
try {
249+
executor.execute(queryCloseRequest(id), Void.class).get();
250+
} catch (InterruptedException | ExecutionException e) {
251+
throw new ArangoDBException(e);
252+
}
249253
}
250254
}, type, result);
251255
});

src/test/java/com/arangodb/ArangoDatabaseTest.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public void query() throws InterruptedException, ExecutionException {
226226
for (int i = 0; i < 10; i++) {
227227
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
228228
}
229-
final CompletableFuture<ArangoCursor<String>> f = db.query("for i in db_test return i._id", null, null,
229+
final CompletableFuture<ArangoCursorAsync<String>> f = db.query("for i in db_test return i._id", null, null,
230230
String.class);
231231
assertThat(f, is(notNullValue()));
232232
f.whenComplete((cursor, ex) -> {
@@ -248,15 +248,15 @@ public void queryForEach() throws InterruptedException, ExecutionException {
248248
for (int i = 0; i < 10; i++) {
249249
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
250250
}
251-
final CompletableFuture<ArangoCursor<String>> f = db.query("for i in db_test return i._id", null, null,
251+
final CompletableFuture<ArangoCursorAsync<String>> f = db.query("for i in db_test return i._id", null, null,
252252
String.class);
253253
assertThat(f, is(notNullValue()));
254254
f.whenComplete((cursor, ex) -> {
255255
assertThat(cursor, is(notNullValue()));
256256
final AtomicInteger i = new AtomicInteger(0);
257-
for (; cursor.hasNext(); cursor.next()) {
257+
cursor.forEachRemaining(e -> {
258258
i.incrementAndGet();
259-
}
259+
});
260260
assertThat(i.get(), is(10));
261261
});
262262
f.get();
@@ -272,15 +272,15 @@ public void queryStream() throws InterruptedException, ExecutionException {
272272
for (int i = 0; i < 10; i++) {
273273
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
274274
}
275-
final CompletableFuture<ArangoCursor<String>> f = db.query("for i in db_test return i._id", null, null,
275+
final CompletableFuture<ArangoCursorAsync<String>> f = db.query("for i in db_test return i._id", null, null,
276276
String.class);
277277
assertThat(f, is(notNullValue()));
278278
f.whenComplete((cursor, ex) -> {
279279
assertThat(cursor, is(notNullValue()));
280280
final AtomicInteger i = new AtomicInteger(0);
281-
for (; cursor.hasNext(); cursor.next()) {
281+
cursor.forEachRemaining(e -> {
282282
i.incrementAndGet();
283-
}
283+
});
284284
assertThat(i.get(), is(10));
285285
});
286286
f.get();
@@ -297,8 +297,8 @@ public void queryWithCount() throws InterruptedException, ExecutionException {
297297
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
298298
}
299299

300-
final CompletableFuture<ArangoCursor<String>> f = db.query("for i in db_test Limit 6 return i._id", null,
301-
new AqlQueryOptions().count(true), String.class);
300+
final CompletableFuture<ArangoCursorAsync<String>> f = db.query("for i in db_test Limit 6 return i._id",
301+
null, new AqlQueryOptions().count(true), String.class);
302302
assertThat(f, is(notNullValue()));
303303
f.whenComplete((cursor, ex) -> {
304304
assertThat(cursor, is(notNullValue()));
@@ -321,8 +321,8 @@ public void queryWithLimitAndFullCount() throws InterruptedException, ExecutionE
321321
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
322322
}
323323

324-
final CompletableFuture<ArangoCursor<String>> f = db.query("for i in db_test Limit 5 return i._id", null,
325-
new AqlQueryOptions().fullCount(true), String.class);
324+
final CompletableFuture<ArangoCursorAsync<String>> f = db.query("for i in db_test Limit 5 return i._id",
325+
null, new AqlQueryOptions().fullCount(true), String.class);
326326
assertThat(f, is(notNullValue()));
327327
f.whenComplete((cursor, ex) -> {
328328
assertThat(cursor, is(notNullValue()));
@@ -345,7 +345,7 @@ public void queryWithBatchSize() throws InterruptedException, ExecutionException
345345
for (int i = 0; i < 10; i++) {
346346
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
347347
}
348-
final ArangoCursor<String> cursor = db.query("for i in db_test return i._id", null,
348+
final ArangoCursorAsync<String> cursor = db.query("for i in db_test return i._id", null,
349349
new AqlQueryOptions().batchSize(5).count(true), String.class).get();
350350
assertThat(cursor, is(notNullValue()));
351351
for (int i = 0; i < 10; i++, cursor.next()) {
@@ -363,13 +363,13 @@ public void queryStreamWithBatchSize() throws InterruptedException, ExecutionExc
363363
for (int i = 0; i < 10; i++) {
364364
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
365365
}
366-
final ArangoCursor<String> cursor = db.query("for i in db_test return i._id", null,
366+
final ArangoCursorAsync<String> cursor = db.query("for i in db_test return i._id", null,
367367
new AqlQueryOptions().batchSize(5).count(true), String.class).get();
368368
assertThat(cursor, is(notNullValue()));
369369
final AtomicInteger i = new AtomicInteger(0);
370-
for (; cursor.hasNext(); cursor.next()) {
370+
cursor.streamRemaining().forEach(e -> {
371371
i.incrementAndGet();
372-
}
372+
});
373373
assertThat(i.get(), is(10));
374374
} finally {
375375
db.collection(COLLECTION_NAME).drop().get();
@@ -392,7 +392,7 @@ public void queryWithTTL() throws InterruptedException, ExecutionException {
392392
for (int i = 0; i < 10; i++) {
393393
db.collection(COLLECTION_NAME).insertDocument(new BaseDocument(), null).get();
394394
}
395-
final ArangoCursor<String> cursor = db.query("for i in db_test return i._id", null,
395+
final ArangoCursorAsync<String> cursor = db.query("for i in db_test return i._id", null,
396396
new AqlQueryOptions().batchSize(5).ttl(ttl), String.class).get();
397397
assertThat(cursor, is(notNullValue()));
398398
for (int i = 0; i < 10; i++, cursor.next()) {
@@ -443,13 +443,15 @@ public void queryWithCache() throws InterruptedException, ArangoDBException, Exe
443443
properties.setMode(CacheMode.on);
444444
db.setQueryCacheProperties(properties).get();
445445

446-
final ArangoCursor<String> cursor = db.query("FOR t IN db_test FILTER t.age >= 10 SORT t.age RETURN t._id",
447-
null, new AqlQueryOptions().cache(true), String.class).get();
446+
final ArangoCursorAsync<String> cursor = db
447+
.query("FOR t IN db_test FILTER t.age >= 10 SORT t.age RETURN t._id", null,
448+
new AqlQueryOptions().cache(true), String.class)
449+
.get();
448450

449451
assertThat(cursor, is(notNullValue()));
450452
assertThat(cursor.isCached(), is(false));
451453

452-
final ArangoCursor<String> cachedCursor = db
454+
final ArangoCursorAsync<String> cachedCursor = db
453455
.query("FOR t IN db_test FILTER t.age >= 10 SORT t.age RETURN t._id", null,
454456
new AqlQueryOptions().cache(true), String.class)
455457
.get();
@@ -500,7 +502,7 @@ public void queryWithBindVars() throws InterruptedException, ExecutionException
500502
final Map<String, Object> bindVars = new HashMap<>();
501503
bindVars.put("@coll", COLLECTION_NAME);
502504
bindVars.put("age", 25);
503-
final CompletableFuture<ArangoCursor<String>> f = db.query(
505+
final CompletableFuture<ArangoCursorAsync<String>> f = db.query(
504506
"FOR t IN @@coll FILTER t.age >= @age SORT t.age RETURN t._id", bindVars, null, String.class);
505507
assertThat(f, is(notNullValue()));
506508
f.whenComplete((cursor, ex) -> {
@@ -517,7 +519,7 @@ public void queryWithBindVars() throws InterruptedException, ExecutionException
517519

518520
@Test
519521
public void queryWithWarning() throws InterruptedException, ExecutionException {
520-
final CompletableFuture<ArangoCursor<String>> f = arangoDB.db().query("return _users + 1", null, null,
522+
final CompletableFuture<ArangoCursorAsync<String>> f = arangoDB.db().query("return _users + 1", null, null,
521523
String.class);
522524
assertThat(f, is(notNullValue()));
523525
f.whenComplete((cursor, ex) -> {
@@ -530,7 +532,7 @@ public void queryWithWarning() throws InterruptedException, ExecutionException {
530532

531533
@Test
532534
public void queryClose() throws IOException, ArangoDBException, InterruptedException, ExecutionException {
533-
final ArangoCursor<String> cursor = arangoDB.db()
535+
final ArangoCursorAsync<String> cursor = arangoDB.db()
534536
.query("for i in _apps return i._id", null, new AqlQueryOptions().batchSize(1), String.class).get();
535537
cursor.close();
536538
int count = 0;
@@ -617,6 +619,7 @@ public void parseQuery() throws InterruptedException, ExecutionException {
617619
}
618620

619621
@Test
622+
@Ignore
620623
public void getAndClearSlowQueries() throws InterruptedException, ExecutionException {
621624
final QueryTrackingPropertiesEntity properties = db.getQueryTrackingProperties().get();
622625
final Long slowQueryThreshold = properties.getSlowQueryThreshold();

src/test/java/com/arangodb/CommunicationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class CommunicationTest {
3636
@Test
3737
public void disconnect() {
3838
final ArangoDBAsync arangoDB = new ArangoDBAsync.Builder().build();
39-
final CompletableFuture<ArangoCursor<Object>> result = arangoDB.db().query("return sleep(1)", null, null, null);
39+
final CompletableFuture<ArangoCursorAsync<Object>> result = arangoDB.db().query("return sleep(1)", null, null, null);
4040
arangoDB.shutdown();
4141
assertThat(result.isCompletedExceptionally(), is(true));
4242
}

src/test/java/com/arangodb/example/document/AqlQueryWithSpecialReturnTypesExample.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.junit.BeforeClass;
3434
import org.junit.Test;
3535

36-
import com.arangodb.ArangoCursor;
36+
import com.arangodb.ArangoCursorAsync;
3737
import com.arangodb.entity.BaseDocument;
3838
import com.arangodb.example.ExampleBase;
3939
import com.arangodb.util.MapBuilder;
@@ -70,19 +70,18 @@ public void aqlWithLimitQueryAsVPackObject() {
7070
final String query = "FOR t IN " + COLLECTION_NAME
7171
+ " FILTER t.age >= 20 && t.age < 30 && t.gender == @gender RETURN t";
7272
final Map<String, Object> bindVars = new MapBuilder().put("gender", Gender.FEMALE).get();
73-
final CompletableFuture<ArangoCursor<VPackSlice>> f = db.query(query, bindVars, null, VPackSlice.class);
73+
final CompletableFuture<ArangoCursorAsync<VPackSlice>> f = db.query(query, bindVars, null, VPackSlice.class);
7474
f.whenComplete((cursor, ex) -> {
7575
assertThat(cursor, is(notNullValue()));
76-
for (; cursor.hasNext();) {
77-
final VPackSlice vpack = cursor.next();
76+
cursor.forEachRemaining(vpack -> {
7877
try {
7978
assertThat(vpack.get("name").getAsString(),
8079
isOneOf("TestUser11", "TestUser13", "TestUser15", "TestUser17", "TestUser19"));
8180
assertThat(vpack.get("gender").getAsString(), is(Gender.FEMALE.name()));
8281
assertThat(vpack.get("age").getAsInt(), isOneOf(21, 23, 25, 27, 29));
8382
} catch (final VPackException e) {
8483
}
85-
}
84+
});
8685
});
8786
}
8887

@@ -91,17 +90,17 @@ public void aqlWithLimitQueryAsVPackArray() {
9190
final String query = "FOR t IN " + COLLECTION_NAME
9291
+ " FILTER t.age >= 20 && t.age < 30 && t.gender == @gender RETURN [t.name, t.gender, t.age]";
9392
final Map<String, Object> bindVars = new MapBuilder().put("gender", Gender.FEMALE).get();
94-
final CompletableFuture<ArangoCursor<VPackSlice>> f = db.query(query, bindVars, null, VPackSlice.class);
93+
final CompletableFuture<ArangoCursorAsync<VPackSlice>> f = db.query(query, bindVars, null, VPackSlice.class);
9594
f.whenComplete((cursor, ex) -> {
9695
assertThat(cursor, is(notNullValue()));
97-
for (; cursor.hasNext();) {
98-
final VPackSlice vpack = cursor.next();
96+
cursor.forEachRemaining(vpack -> {
9997
assertThat(vpack.get(0).getAsString(),
10098
isOneOf("TestUser11", "TestUser13", "TestUser15", "TestUser17", "TestUser19"));
10199
assertThat(vpack.get(1).getAsString(), is(Gender.FEMALE.name()));
102100
assertThat(vpack.get(2).getAsInt(), isOneOf(21, 23, 25, 27, 29));
103-
}
101+
});
104102
});
103+
105104
}
106105

107106
@Test
@@ -110,19 +109,18 @@ public void aqlWithLimitQueryAsMap() {
110109
final String query = "FOR t IN " + COLLECTION_NAME
111110
+ " FILTER t.age >= 20 && t.age < 30 && t.gender == @gender RETURN t";
112111
final Map<String, Object> bindVars = new MapBuilder().put("gender", Gender.FEMALE).get();
113-
final CompletableFuture<ArangoCursor<Map>> f = db.query(query, bindVars, null, Map.class);
112+
final CompletableFuture<ArangoCursorAsync<Map>> f = db.query(query, bindVars, null, Map.class);
114113
f.whenComplete((cursor, ex) -> {
115114
assertThat(cursor, is(notNullValue()));
116-
for (; cursor.hasNext();) {
117-
final Map map = cursor.next();
115+
cursor.forEachRemaining(map -> {
118116
assertThat(map.get("name"), is(notNullValue()));
119117
assertThat(String.valueOf(map.get("name")),
120118
isOneOf("TestUser11", "TestUser13", "TestUser15", "TestUser17", "TestUser19"));
121119
assertThat(map.get("gender"), is(notNullValue()));
122120
assertThat(String.valueOf(map.get("gender")), is(Gender.FEMALE.name()));
123121
assertThat(map.get("age"), is(notNullValue()));
124122
assertThat(Long.valueOf(map.get("age").toString()), isOneOf(21L, 23L, 25L, 27L, 29L));
125-
}
123+
});
126124
});
127125
}
128126

@@ -132,19 +130,18 @@ public void aqlWithLimitQueryAsList() {
132130
final String query = "FOR t IN " + COLLECTION_NAME
133131
+ " FILTER t.age >= 20 && t.age < 30 && t.gender == @gender RETURN [t.name, t.gender, t.age]";
134132
final Map<String, Object> bindVars = new MapBuilder().put("gender", Gender.FEMALE).get();
135-
final CompletableFuture<ArangoCursor<List>> f = db.query(query, bindVars, null, List.class);
133+
final CompletableFuture<ArangoCursorAsync<List>> f = db.query(query, bindVars, null, List.class);
136134
f.whenComplete((cursor, ex) -> {
137135
assertThat(cursor, is(notNullValue()));
138-
for (; cursor.hasNext();) {
139-
final List vpack = cursor.next();
140-
assertThat(vpack.get(0), is(notNullValue()));
141-
assertThat(String.valueOf(vpack.get(0)),
136+
cursor.forEachRemaining(list -> {
137+
assertThat(list.get(0), is(notNullValue()));
138+
assertThat(String.valueOf(list.get(0)),
142139
isOneOf("TestUser11", "TestUser13", "TestUser15", "TestUser17", "TestUser19"));
143-
assertThat(vpack.get(1), is(notNullValue()));
144-
assertThat(Gender.valueOf(String.valueOf(vpack.get(1))), is(Gender.FEMALE));
145-
assertThat(vpack.get(2), is(notNullValue()));
146-
assertThat(Long.valueOf(String.valueOf(vpack.get(2))), isOneOf(21L, 23L, 25L, 27L, 29L));
147-
}
140+
assertThat(list.get(1), is(notNullValue()));
141+
assertThat(Gender.valueOf(String.valueOf(list.get(1))), is(Gender.FEMALE));
142+
assertThat(list.get(2), is(notNullValue()));
143+
assertThat(Long.valueOf(String.valueOf(list.get(2))), isOneOf(21L, 23L, 25L, 27L, 29L));
144+
});
148145
});
149146
}
150147
}

0 commit comments

Comments
 (0)