Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 2b25b7e

Browse files
Greg Ealesartembilan
authored andcommitted
GH-177: Checkpoint closed shards for Kinesis
Fixes #177 * When the end of a shard is detected, checkpoint with the `endingSequenceNumber` * Add a test * code review * Default to directly checkpointing closed shards except when in manual checkpoint mode * update tests * `endingSequenceNumber` should be higher than the last record's sequence number * Checkpoint when in manual mode if shard was empty
1 parent 5a3c6a1 commit 2b25b7e

File tree

2 files changed

+74
-21
lines changed

2 files changed

+74
-21
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@
9191
* @author Krzysztof Witkowski
9292
* @author Hervé Fortin
9393
* @author Dirk Bonhomme
94+
* @author Greg Eales
95+
*
9496
* @since 1.1
9597
*/
9698
@ManagedResource
@@ -293,7 +295,6 @@ public void setStartTimeout(int startTimeout) {
293295
* will be processed sequentially. In other words each shard is tied with the particular thread.
294296
* By default the concurrency is unlimited and shard is processed in the {@link #consumerExecutor}
295297
* directly.
296-
*
297298
* @param concurrency the concurrency maximum number
298299
*/
299300
public void setConcurrency(int concurrency) {
@@ -303,7 +304,6 @@ public void setConcurrency(int concurrency) {
303304
/**
304305
* The sleep interval in milliseconds used in the main loop between shards polling cycles.
305306
* Defaults to {@code 1000}l minimum {@code 250}.
306-
*
307307
* @param idleBetweenPolls the interval to sleep between shards polling cycles.
308308
*/
309309
public void setIdleBetweenPolls(int idleBetweenPolls) {
@@ -313,7 +313,6 @@ public void setIdleBetweenPolls(int idleBetweenPolls) {
313313
/**
314314
* Specify an {@link InboundMessageMapper} to extract message headers embedded into the record
315315
* data.
316-
*
317316
* @param embeddedHeadersMapper the {@link InboundMessageMapper} to use.
318317
* @since 2.0
319318
*/
@@ -324,7 +323,6 @@ public void setEmbeddedHeadersMapper(InboundMessageMapper<byte[]> embeddedHeader
324323
/**
325324
* Specify a {@link LockRegistry} for an exclusive access to provided streams. This is not used
326325
* when shards-based configuration is provided.
327-
*
328326
* @param lockRegistry the {@link LockRegistry} to use.
329327
* @since 2.0
330328
*/
@@ -335,7 +333,6 @@ public void setLockRegistry(LockRegistry lockRegistry) {
335333
/**
336334
* Set to true to bind the source consumer record in the header named {@link
337335
* IntegrationMessageHeaderAccessor#SOURCE_DATA}. Does not apply to batch listeners.
338-
*
339336
* @param bindSourceRecord true to bind.
340337
* @since 2.2
341338
*/
@@ -347,6 +344,7 @@ public void setBindSourceRecord(boolean bindSourceRecord) {
347344
* Specify a {@link Function Function&lt;List&lt;Shard&gt;, List&lt;Shard&gt;&gt;} to filter the shards which will
348345
* be read from.
349346
* @param shardListFilter the filter {@link Function Function&lt;List&lt;Shard&gt;, List&lt;Shard&gt;&gt;}
347+
* @since 2.3.4
350348
*/
351349
public void setShardListFilter(Function<List<Shard>, List<Shard>> shardListFilter) {
352350
this.shardListFilter = shardListFilter;
@@ -1027,6 +1025,22 @@ private Runnable processTask() {
10271025
.remove(this.key);
10281026
}
10291027
// Shard is closed: nothing to consume any more.
1028+
// Checkpoint endingSequenceNumber to ensure shard is marked exhausted.
1029+
// If in CheckpointMode.manual, only checkpoint if lastCheckpointValue is also null, as this
1030+
// means that no records have ever been read and so the shard was empty
1031+
if (!CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)
1032+
|| this.checkpointer.getLastCheckpointValue() == null) {
1033+
for (Shard shard : readShardList(this.shardOffset.getStream())) {
1034+
if (shard.getShardId().equals(this.shardOffset.getShard())) {
1035+
String endingSequenceNumber =
1036+
shard.getSequenceNumberRange().getEndingSequenceNumber();
1037+
if (endingSequenceNumber != null) {
1038+
this.checkpointer.checkpoint(endingSequenceNumber);
1039+
}
1040+
break;
1041+
}
1042+
}
1043+
}
10301044
// Resharding is possible.
10311045
if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
10321046
KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent(

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
/**
7575
* @author Artem Bilan
7676
* @author Matthias Wesolowski
77+
* @author Greg Eales
78+
*
7779
* @since 1.1
7880
*/
7981
@SpringJUnitConfig
@@ -93,6 +95,9 @@ public class KinesisMessageDrivenChannelAdapterTests {
9395
@Autowired
9496
private MetadataStore checkpointStore;
9597

98+
@Autowired
99+
private MetadataStore reshardingCheckpointStore;
100+
96101
@Autowired
97102
private KinesisMessageDrivenChannelAdapter reshardingChannelAdapter;
98103

@@ -108,7 +113,7 @@ void setup() {
108113
}
109114

110115
@Test
111-
@SuppressWarnings({"unchecked", "rawtypes"})
116+
@SuppressWarnings({ "unchecked", "rawtypes" })
112117
void testKinesisMessageDrivenChannelAdapter() {
113118
this.kinesisMessageDrivenChannelAdapter.start();
114119
final Set<KinesisShardOffset> shardOffsets = TestUtils.getPropertyValue(this.kinesisMessageDrivenChannelAdapter,
@@ -219,11 +224,14 @@ void testResharding() throws InterruptedException {
219224

220225
this.reshardingChannelAdapter.stop();
221226

227+
assertThat(this.reshardingCheckpointStore.get("SpringIntegration:streamForResharding:closedEmptyShard5"))
228+
.isEqualTo("50");
229+
222230
KinesisShardEndedEvent kinesisShardEndedEvent = this.config.shardEndedEventReference.get();
223231

224232
assertThat(kinesisShardEndedEvent).isNotNull()
225233
.extracting(KinesisShardEndedEvent::getShardKey)
226-
.isEqualTo("SpringIntegration:streamForResharding:closedShard4");
234+
.isEqualTo("SpringIntegration:streamForResharding:closedEmptyShard5");
227235
}
228236

229237
@Configuration
@@ -336,39 +344,51 @@ public AmazonKinesis amazonKinesisForResharding() {
336344
.willReturn(new ListShardsResult()
337345
.withShards(
338346
new Shard().withShardId("closedShard1")
339-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1"))))
347+
.withSequenceNumberRange(new SequenceNumberRange()
348+
.withEndingSequenceNumber("10"))))
340349
.willReturn(new ListShardsResult()
341350
.withShards(
342351
new Shard().withShardId("closedShard1")
343-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")),
352+
.withSequenceNumberRange(new SequenceNumberRange()
353+
.withEndingSequenceNumber("10")),
344354
new Shard().withShardId("newShard2")
345-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")),
355+
.withSequenceNumberRange(new SequenceNumberRange()),
346356
new Shard().withShardId("newShard3")
347-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")),
357+
.withSequenceNumberRange(new SequenceNumberRange()),
348358
new Shard().withShardId("closedShard4")
349-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4"))))
359+
.withSequenceNumberRange(new SequenceNumberRange()
360+
.withEndingSequenceNumber("40")),
361+
new Shard().withShardId("closedEmptyShard5")
362+
.withSequenceNumberRange(new SequenceNumberRange()
363+
.withEndingSequenceNumber("50"))))
350364
.willReturn(new ListShardsResult()
351365
.withShards(
352366
new Shard().withShardId("closedShard1")
353-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("1")),
367+
.withSequenceNumberRange(new SequenceNumberRange()
368+
.withEndingSequenceNumber("10")),
354369
new Shard().withShardId("newShard2")
355-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("2")),
370+
.withSequenceNumberRange(new SequenceNumberRange()),
356371
new Shard().withShardId("newShard3")
357-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("3")),
372+
.withSequenceNumberRange(new SequenceNumberRange()),
358373
new Shard().withShardId("closedShard4")
359-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("4")),
360-
new Shard().withShardId("newShard5")
361-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("5")),
374+
.withSequenceNumberRange(new SequenceNumberRange()
375+
.withEndingSequenceNumber("40")),
376+
new Shard().withShardId("closedEmptyShard5")
377+
.withSequenceNumberRange(new SequenceNumberRange()
378+
.withEndingSequenceNumber("50")),
362379
new Shard().withShardId("newShard6")
363-
.withSequenceNumberRange(new SequenceNumberRange().withEndingSequenceNumber("6"))));
380+
.withSequenceNumberRange(new SequenceNumberRange()),
381+
new Shard().withShardId("newShard7")
382+
.withSequenceNumberRange(new SequenceNumberRange())));
364383

365384

366385
setClosedShard(amazonKinesis, "1");
367386
setNewShard(amazonKinesis, "2");
368387
setNewShard(amazonKinesis, "3");
369388
setClosedShard(amazonKinesis, "4");
370-
setNewShard(amazonKinesis, "5");
389+
setClosedEmptyShard(amazonKinesis, "5");
371390
setNewShard(amazonKinesis, "6");
391+
setNewShard(amazonKinesis, "7");
372392

373393
return amazonKinesis;
374394
}
@@ -377,7 +397,8 @@ private void setClosedShard(AmazonKinesis amazonKinesis, String shardIndex) {
377397
String shardIterator = String.format("shard%sIterator1", shardIndex);
378398

379399
given(amazonKinesis.getShardIterator(
380-
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard" + shardIndex).toShardIteratorRequest()))
400+
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedShard" + shardIndex)
401+
.toShardIteratorRequest()))
381402
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator));
382403

383404
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(25)))
@@ -386,6 +407,18 @@ private void setClosedShard(AmazonKinesis amazonKinesis, String shardIndex) {
386407
.withData(ByteBuffer.wrap("foo".getBytes()))));
387408
}
388409

410+
private void setClosedEmptyShard(AmazonKinesis amazonKinesis, String shardIndex) {
411+
String shardIterator = String.format("shard%sIterator1", shardIndex);
412+
413+
given(amazonKinesis.getShardIterator(
414+
KinesisShardOffset.latest(STREAM_FOR_RESHARDING, "closedEmptyShard" + shardIndex)
415+
.toShardIteratorRequest()))
416+
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator));
417+
418+
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(25)))
419+
.willReturn(new GetRecordsResult().withNextShardIterator(null));
420+
}
421+
389422
private void setNewShard(AmazonKinesis amazonKinesis, String shardIndex) {
390423
String shardIterator1 = String.format("shard%sIterator1", shardIndex);
391424
String shardIterator2 = String.format("shard%sIterator2", shardIndex);
@@ -405,6 +438,11 @@ private void setNewShard(AmazonKinesis amazonKinesis, String shardIndex) {
405438
.willReturn(new GetShardIteratorResult().withShardIterator(shardIterator2));
406439
}
407440

441+
@Bean
442+
public ConcurrentMetadataStore reshardingCheckpointStore() {
443+
return new SimpleMetadataStore();
444+
}
445+
408446
@Bean
409447
public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() {
410448
KinesisMessageDrivenChannelAdapter adapter = new KinesisMessageDrivenChannelAdapter(
@@ -415,6 +453,7 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() {
415453
adapter.setDescribeStreamRetries(1);
416454
adapter.setRecordsLimit(25);
417455
adapter.setConcurrency(1);
456+
adapter.setCheckpointStore(reshardingCheckpointStore());
418457

419458
DirectFieldAccessor dfa = new DirectFieldAccessor(adapter);
420459
dfa.setPropertyValue("describeStreamBackoff", 10);

0 commit comments

Comments
 (0)