Skip to content

Commit f5ddd9f

Browse files
authored
Bugifx/async tests (#22)
* bugfix async tests * bugfix async tests: ArangoCollectionTest * bugfix async tests: ArangoDatabaseTest * bugfix async tests: ArangoDBTest * bugfix pending futures * added dbg console logs * simplified ArangoExecutorAsync::execute * working deleteIndex test example * bugfix cascade futures execution * nestedGetVersion test * cleanup tests console logs * fix geo test * fix explain test * bugfix updateDocuments test * commented getSelectivityEstimate assertions * concurrency limit test * output buffer thread test * outgoing executor service
1 parent 1aea502 commit f5ddd9f

File tree

9 files changed

+3826
-3972
lines changed

9 files changed

+3826
-3972
lines changed

src/main/java/com/arangodb/internal/ArangoExecutorAsync.java

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,65 +20,75 @@
2020

2121
package com.arangodb.internal;
2222

23-
import java.io.IOException;
24-
import java.lang.reflect.Type;
25-
import java.util.concurrent.CompletableFuture;
26-
2723
import com.arangodb.ArangoDBException;
2824
import com.arangodb.internal.net.HostHandle;
2925
import com.arangodb.internal.util.ArangoSerializationFactory;
3026
import com.arangodb.internal.velocystream.VstCommunicationAsync;
3127
import com.arangodb.velocypack.exception.VPackException;
3228
import com.arangodb.velocystream.Request;
3329

30+
import java.io.IOException;
31+
import java.lang.reflect.Type;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
35+
3436
/**
3537
* @author Mark Vollmary
36-
*
38+
* @author Michele Rastelli
3739
*/
3840
public class ArangoExecutorAsync extends ArangoExecutor {
3941

40-
private final VstCommunicationAsync communication;
42+
private final VstCommunicationAsync communication;
43+
private final ExecutorService outgoingExecutor = Executors.newSingleThreadExecutor();
44+
45+
public ArangoExecutorAsync(final VstCommunicationAsync communication, final ArangoSerializationFactory util,
46+
final DocumentCache documentCache) {
47+
super(util, documentCache);
48+
this.communication = communication;
49+
}
4150

42-
public ArangoExecutorAsync(final VstCommunicationAsync communication, final ArangoSerializationFactory util,
43-
final DocumentCache documentCache) {
44-
super(util, documentCache);
45-
this.communication = communication;
46-
}
51+
public <T> CompletableFuture<T> execute(final Request request, final Type type) {
52+
return execute(request, (response) -> createResult(type, response));
53+
}
4754

48-
public <T> CompletableFuture<T> execute(final Request request, final Type type) {
49-
return execute(request, (response) -> createResult(type, response));
50-
}
55+
public <T> CompletableFuture<T> execute(final Request request, final Type type, final HostHandle hostHandle) {
56+
return execute(request, (response) -> createResult(type, response), hostHandle);
57+
}
5158

52-
public <T> CompletableFuture<T> execute(final Request request, final Type type, final HostHandle hostHandle) {
53-
return execute(request, (response) -> createResult(type, response), hostHandle);
54-
}
59+
public <T> CompletableFuture<T> execute(final Request request, final ResponseDeserializer<T> responseDeserializer) {
60+
return execute(request, responseDeserializer, null);
61+
}
5562

56-
public <T> CompletableFuture<T> execute(final Request request, final ResponseDeserializer<T> responseDeserializer) {
57-
return execute(request, responseDeserializer, null);
58-
}
63+
public <T> CompletableFuture<T> execute(
64+
final Request request,
65+
final ResponseDeserializer<T> responseDeserializer,
66+
final HostHandle hostHandle) {
5967

60-
public <T> CompletableFuture<T> execute(
61-
final Request request,
62-
final ResponseDeserializer<T> responseDeserializer,
63-
final HostHandle hostHandle) {
64-
final CompletableFuture<T> result = new CompletableFuture<>();
65-
communication.execute(request, hostHandle).whenComplete((response, ex) -> {
66-
if (response != null) {
67-
try {
68-
result.complete(responseDeserializer.deserialize(response));
69-
} catch (final VPackException | ArangoDBException e) {
70-
result.completeExceptionally(e);
71-
}
72-
} else if (ex != null) {
73-
result.completeExceptionally(ex);
74-
} else {
75-
result.cancel(true);
76-
}
77-
});
78-
return result;
79-
}
68+
CompletableFuture<T> result = new CompletableFuture<>();
69+
outgoingExecutor.execute(() -> {
70+
try {
71+
communication.execute(request, hostHandle)
72+
.whenCompleteAsync((response, ex) -> {
73+
if (ex != null) {
74+
result.completeExceptionally(ex);
75+
} else if (response != null) {
76+
try {
77+
result.complete(responseDeserializer.deserialize(response));
78+
} catch (final VPackException | ArangoDBException e) {
79+
result.completeExceptionally(e);
80+
}
81+
}
82+
});
83+
} catch (ArangoDBException e) {
84+
result.completeExceptionally(e);
85+
}
86+
}
87+
);
88+
return result;
89+
}
8090

81-
public void disconnect() throws IOException {
82-
communication.close();
83-
}
91+
public void disconnect() throws IOException {
92+
communication.close();
93+
}
8494
}

0 commit comments

Comments
 (0)