Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit e0df55a

Browse files
authored
GH-167: Fix consumer start for close/open shards
Fixes #167 A combo of closing / opening shards leads to consumer not starting
1 parent 10e96e2 commit e0df55a

File tree

2 files changed

+73
-13
lines changed

2 files changed

+73
-13
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,14 +708,16 @@ private void populateConsumer(KinesisShardOffset shardOffset) {
708708
this.consumerExecutor.execute(consumerInvoker);
709709
}
710710
else {
711+
boolean consumerAdded = false;
711712
for (ConsumerInvoker consumerInvoker : this.consumerInvokers) {
712713
if (consumerInvoker.consumers.size() < this.consumerInvokerMaxCapacity) {
713714
consumerInvoker.addConsumer(shardConsumer);
714-
return;
715+
consumerAdded = true;
716+
break;
715717
}
716718
}
717719

718-
if (this.concurrency != 0) {
720+
if (this.concurrency != 0 && !consumerAdded) {
719721
ConsumerInvoker firstConsumerInvoker = this.consumerInvokers.get(0);
720722
firstConsumerInvoker.addConsumer(shardConsumer);
721723
this.consumerInvokerMaxCapacity = firstConsumerInvoker.consumers.size();

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -207,11 +207,12 @@ void testResharding() throws InterruptedException {
207207
Map shardConsumers = TestUtils.getPropertyValue(this.reshardingChannelAdapter, "shardConsumers", Map.class);
208208

209209
int n = 0;
210-
while (!shardConsumers.isEmpty() && n++ < 100) {
210+
while (shardConsumers.size() != 4 && n++ < 100) {
211211
Thread.sleep(100);
212212
}
213213
assertThat(n).isLessThan(100);
214214

215+
215216
// When resharding happens the describeStream() is performed again
216217
verify(this.amazonKinesisForResharding, atLeast(1))
217218
.listShards(any(ListShardsRequest.class));
@@ -222,7 +223,7 @@ void testResharding() throws InterruptedException {
222223

223224
assertThat(kinesisShardEndedEvent).isNotNull()
224225
.extracting(KinesisShardEndedEvent::getShardKey)
225-
.isEqualTo("SpringIntegration:streamForResharding:closedShard");
226+
.isEqualTo("SpringIntegration:streamForResharding:closedShard4");
226227
}
227228

228229
@Configuration
@@ -329,23 +330,79 @@ public PollableChannel kinesisChannel() {
329330
public AmazonKinesis amazonKinesisForResharding() {
330331
AmazonKinesis amazonKinesis = mock(AmazonKinesis.class);
331332

333+
// kinesis handles adding a shard by closing a shard and opening 2 new instead, creating a scenario where it
334+
// happens couple of times
332335
given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM_FOR_RESHARDING)))
333336
.willReturn(new ListShardsResult()
334-
.withShards(new Shard().withShardId("closedShard").withSequenceNumberRange(
335-
new SequenceNumberRange().withEndingSequenceNumber("1"))));
337+
.withShards(
338+
new Shard().withShardId("closedShard1")
339+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1"))))
340+
.willReturn(new ListShardsResult()
341+
.withShards(
342+
new Shard().withShardId("closedShard1")
343+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")),
344+
new Shard().withShardId("newShard2")
345+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")),
346+
new Shard().withShardId("newShard3")
347+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")),
348+
new Shard().withShardId("closedShard4")
349+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4"))))
350+
.willReturn(new ListShardsResult()
351+
.withShards(
352+
new Shard().withShardId("closedShard1")
353+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")),
354+
new Shard().withShardId("newShard2")
355+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")),
356+
new Shard().withShardId("newShard3")
357+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")),
358+
new Shard().withShardId("closedShard4")
359+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4")),
360+
new Shard().withShardId("newShard5")
361+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("5")),
362+
new Shard().withShardId("newShard6")
363+
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("6"))));
364+
365+
366+
setClosedShard(amazonKinesis, "1");
367+
setNewShard(amazonKinesis, "2");
368+
setNewShard(amazonKinesis, "3");
369+
setClosedShard(amazonKinesis, "4");
370+
setNewShard(amazonKinesis, "5");
371+
setNewShard(amazonKinesis, "6");
336372

337-
String shard1Iterator1 = "shard1Iterator1";
373+
return amazonKinesis;
374+
}
375+
376+
private void setClosedShard(AmazonKinesis amazonKinesis, String shardIndex) {
377+
String shardIterator = String.format("shard%sIterator1", shardIndex);
338378

339379
given(amazonKinesis.getShardIterator(
340-
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard").toShardIteratorRequest()))
341-
.willReturn(new GetShardIteratorResult().withShardIterator(shard1Iterator1));
380+
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard" + shardIndex).toShardIteratorRequest()))
381+
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator));
342382

343-
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shard1Iterator1).withLimit(25)))
383+
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(25)))
344384
.willReturn(new GetRecordsResult().withNextShardIterator(null)
345-
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber("1")
385+
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber(shardIndex)
346386
.withData(ByteBuffer.wrap("foo".getBytes()))));
387+
}
347388

348-
return amazonKinesis;
389+
private void setNewShard(AmazonKinesis amazonKinesis, String shardIndex) {
390+
String shardIterator1 = String.format("shard%sIterator1", shardIndex);
391+
String shardIterator2 = String.format("shard%sIterator2", shardIndex);
392+
393+
given(amazonKinesis.getShardIterator(
394+
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "newShard" + shardIndex).toShardIteratorRequest()))
395+
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator1));
396+
397+
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator2).withLimit(25)))
398+
.willReturn(new GetRecordsResult().withNextShardIterator(shardIterator2)
399+
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber(shardIndex)
400+
.withData(ByteBuffer.wrap("foo".getBytes()))));
401+
402+
403+
given(amazonKinesis.getShardIterator(
404+
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "newShard" + shardIndex).toShardIteratorRequest()))
405+
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator2));
349406
}
350407

351408
@Bean
@@ -357,6 +414,7 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() {
357414
adapter.setStartTimeout(10000);
358415
adapter.setDescribeStreamRetries(1);
359416
adapter.setRecordsLimit(25);
417+
adapter.setConcurrency(1);
360418

361419
DirectFieldAccessor dfa = new DirectFieldAccessor(adapter);
362420
dfa.setPropertyValue("describeStreamBackoff", 10);

0 commit comments

Comments
 (0)