Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit a002646

Browse files
authored
No NextShardIterator if not processed manual ack batch
* fix(KinesisMessageDrivenChannelAdapter): added logic to not use the NextShardIterator if Manual Checkpointer hasnt reached the last checkpoint * fix(KinesisMessageDrivenChannel): added logs for getNextShardIterator * fix(KinesisMessageDrivenChannelAdapter): fixed logic and comments issue * feat(KinesisMessageDrivenChannelAdapter): implemented integration tests for getNextShardIterator new usecase * feat(KinesisMessageDrivenChannelAdapter): updated @author * feat(KinesisMessageDrivenChannelAdapterTests): added author data
1 parent f10d6c6 commit a002646

File tree

2 files changed

+100
-1
lines changed

2 files changed

+100
-1
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
* @author Dirk Bonhomme
9595
* @author Greg Eales
9696
* @author Asiel Caballero
97+
* @author Jonathan Nagayoshi
9798
*
9899
* @since 1.1
99100
*/
@@ -1073,7 +1074,40 @@ private Runnable processTask() {
10731074
finally {
10741075
attributesHolder.remove();
10751076
if (result != null) {
1076-
this.shardIterator = result.getNextShardIterator();
1077+
// If using manual checkpointer, we have to make sure we are allowed to use the next shard iterator
1078+
// Because if the manual checkpointer was not set to the latest record, it means there are records to be reprocessed
1079+
// and if we use the nextShardIterator, we will be skipping records that need to be reprocessed
1080+
List<Record> records = result.getRecords();
1081+
if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) &&
1082+
!records.isEmpty()) {
1083+
logger.info("Manual checkpointer. Must validate if should use getNextShardIterator()");
1084+
String lastRecordSequence = records.get(records.size() - 1).getSequenceNumber();
1085+
String lastCheckpointSequence = this.checkpointer.getCheckpoint();
1086+
if (lastCheckpointSequence.equals(lastRecordSequence)) {
1087+
logger.info("latestCheckpointSequence is same as latestRecordSequence. " +
1088+
"" +
1089+
"Should getNextShardIterator()");
1090+
// Means the manual checkpointer has processed the last record, Should move forward
1091+
this.shardIterator = result.getNextShardIterator();
1092+
}
1093+
else {
1094+
logger.info("latestCheckpointSequence is not the same as latestRecordSequence" +
1095+
". Should Get a new iterator AFTER_SEQUENCE_NUMBER latestCheckpointSequence");
1096+
// Something wrong happened and not all records were processed.
1097+
// Must start from the latest known checkpoint
1098+
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
1099+
newOffset.setSequenceNumber(lastCheckpointSequence);
1100+
newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
1101+
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1102+
this.shardIterator = KinesisMessageDrivenChannelAdapter.this
1103+
.amazonKinesis
1104+
.getShardIterator(shardIteratorRequest)
1105+
.getShardIterator();
1106+
}
1107+
}
1108+
else {
1109+
this.shardIterator = result.getNextShardIterator();
1110+
}
10771111

10781112
if (this.shardIterator == null) {
10791113
if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
* @author Matthias Wesolowski
7777
* @author Greg Eales
7878
* @author Asiel Caballero
79+
* @author Jonathan Nagayoshi
7980
*
8081
* @since 1.1
8182
*/
@@ -201,6 +202,40 @@ void testKinesisMessageDrivenChannelAdapter() {
201202
.hasSize(2);
202203

203204
this.kinesisMessageDrivenChannelAdapter.stop();
205+
206+
this.kinesisMessageDrivenChannelAdapter.setListenerMode(ListenerMode.batch);
207+
this.kinesisMessageDrivenChannelAdapter.setCheckpointMode(CheckpointMode.manual);
208+
this.checkpointStore.put("SpringIntegration" + ":" + STREAM1 + ":" + "1", "2");
209+
210+
this.kinesisMessageDrivenChannelAdapter.start();
211+
212+
message = this.kinesisChannel.receive(10000);
213+
assertThat(message).isNotNull();
214+
assertThat(message.getPayload()).isInstanceOf(List.class);
215+
List<String> messagePayload = (List<String>) message.getPayload();
216+
assertThat(messagePayload).size().isEqualTo(3);
217+
218+
Object messageSequenceNumberHeader = message.getHeaders().get(AwsHeaders.RECEIVED_SEQUENCE_NUMBER);
219+
assertThat(messageSequenceNumberHeader).isInstanceOf(List.class);
220+
assertThat((List<String>) messageSequenceNumberHeader).contains("3");
221+
// Set checkpoint to 3, this should prevent adapter from using next shard, since its not the latest record
222+
// in the batch
223+
checkpointer.checkpoint("3");
224+
225+
await().untilAsserted(
226+
() -> assertThat(this.checkpointStore.get("SpringIntegration" + ":" + STREAM1 + ":" + "1"))
227+
.isEqualTo("3"));
228+
message = this.kinesisChannel.receive(10000);
229+
assertThat(message).isNotNull();
230+
assertThat(message.getPayload()).isInstanceOf(List.class);
231+
messagePayload = (List<String>) message.getPayload();
232+
assertThat(messagePayload).size().isEqualTo(2);
233+
assertThat(messagePayload).contains("bar");
234+
assertThat(messagePayload).contains("foobar");
235+
236+
this.kinesisMessageDrivenChannelAdapter.stop();
237+
238+
204239
}
205240

206241
@Test
@@ -296,6 +331,36 @@ public AmazonKinesis amazonKinesis() {
296331
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber("2")
297332
.withData(ByteBuffer.wrap(serializingConverter.convert("bar")))));
298333

334+
335+
String shard1Iterator5 = "shard1Iterator5";
336+
String shard1Iterator6 = "shard1Iterator6";
337+
338+
given(amazonKinesis.getShardIterator(
339+
KinesisShardOffset.afterSequenceNumber(STREAM1, "1", "2").toShardIteratorRequest()))
340+
.willReturn(new GetShardIteratorResult().withShardIterator(shard1Iterator5));
341+
342+
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shard1Iterator5).withLimit(25)))
343+
.willReturn(new GetRecordsResult().withNextShardIterator(shard1Iterator6)
344+
.withRecords(new Record().withPartitionKey("partition1").withSequenceNumber("3")
345+
.withData(ByteBuffer.wrap(serializingConverter.convert("foo"))),
346+
new Record().withPartitionKey("partition1").withSequenceNumber("4")
347+
.withData(ByteBuffer.wrap(serializingConverter.convert("bar"))),
348+
new Record().withPartitionKey("partition1").withSequenceNumber("5")
349+
.withData(ByteBuffer.wrap(serializingConverter.convert("foobar")))));
350+
351+
352+
given(amazonKinesis.getShardIterator(
353+
KinesisShardOffset.afterSequenceNumber(STREAM1, "1", "3").toShardIteratorRequest()))
354+
.willReturn(new GetShardIteratorResult().withShardIterator(shard1Iterator6));
355+
356+
given(amazonKinesis.getRecords(new GetRecordsRequest().withShardIterator(shard1Iterator6).withLimit(25)))
357+
.willReturn(new GetRecordsResult().withNextShardIterator(shard1Iterator6)
358+
.withRecords(
359+
new Record().withPartitionKey("partition1").withSequenceNumber("4")
360+
.withData(ByteBuffer.wrap(serializingConverter.convert("bar"))),
361+
new Record().withPartitionKey("partition1").withSequenceNumber("5")
362+
.withData(ByteBuffer.wrap(serializingConverter.convert("foobar")))));
363+
299364
return amazonKinesis;
300365
}
301366

0 commit comments

Comments
 (0)