|
31 | 31 | import java.util.UUID; |
32 | 32 | import java.util.concurrent.CompletableFuture; |
33 | 33 | import java.util.concurrent.ExecutionException; |
| 34 | +import java.util.concurrent.atomic.AtomicLong; |
34 | 35 | import java.util.stream.Collectors; |
35 | 36 | import java.util.stream.IntStream; |
| 37 | +import java.util.stream.Stream; |
36 | 38 |
|
37 | 39 | /** |
38 | | - * @author Mark Vollmary |
| 40 | + * @author Michele Rastelli |
39 | 41 | */ |
40 | 42 | public class ImportDocumentExample extends ExampleBase { |
41 | 43 |
|
42 | 44 | private static final Logger log = LoggerFactory.getLogger(ImportDocumentExample.class); |
| 45 | + private static final int MAX_PENDING_REQUESTS = 100; |
43 | 46 |
|
44 | 47 | @Test |
45 | 48 | public void importDocument() { |
46 | | - List<CompletableFuture<DocumentImportEntity>> completableFutures = |
47 | | - IntStream.range(0, 100) |
48 | | - .mapToObj(i -> IntStream.range(0, 500) |
49 | | - .mapToObj(it -> new TestEntity(UUID.randomUUID().toString())).collect(Collectors.toList()) |
50 | | - ) |
51 | | - .map(p -> collection.importDocuments(p, new DocumentImportOptions())) |
52 | | - .collect(Collectors.toList()); |
| 49 | + AtomicLong pendingReqsCount = new AtomicLong(); |
| 50 | + |
| 51 | + Stream<List<TestEntity>> chunks = IntStream.range(0, 1000) |
| 52 | + .mapToObj(i -> IntStream.range(0, 500) |
| 53 | + .mapToObj(it -> new TestEntity(UUID.randomUUID().toString())).collect(Collectors.toList()) |
| 54 | + ); |
| 55 | + |
| 56 | + List<CompletableFuture<DocumentImportEntity>> completableFutures = chunks |
| 57 | + .map(p -> { |
| 58 | + // wait for pending requests |
| 59 | + while (pendingReqsCount.get() > MAX_PENDING_REQUESTS) { |
| 60 | + try { |
| 61 | + Thread.sleep(10); |
| 62 | + } catch (InterruptedException e) { |
| 63 | + e.printStackTrace(); |
| 64 | + } |
| 65 | + } |
| 66 | + |
| 67 | + pendingReqsCount.incrementAndGet(); |
| 68 | + return collection.importDocuments(p, new DocumentImportOptions()) |
| 69 | + .thenApply(it -> { |
| 70 | + pendingReqsCount.decrementAndGet(); |
| 71 | + log.info(String.valueOf(it.getCreated())); |
| 72 | + return it; |
| 73 | + }); |
| 74 | + } |
| 75 | + ) |
| 76 | + .collect(Collectors.toList()); |
53 | 77 |
|
54 | 78 | completableFutures.forEach(cf -> { |
55 | 79 | try { |
56 | | - log.info(String.valueOf(cf.get().getCreated())); |
| 80 | + cf.get(); |
57 | 81 | } catch (InterruptedException | ExecutionException e) { |
58 | 82 | e.printStackTrace(); |
59 | 83 | } |
|
0 commit comments