Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit bc9a082

Browse files
committed
GH-221: Revise an at-least-once delivery
Fixes #221 * Modify the logic of the `KinesisMessageDrivenChannelAdapter` to rewind a shard iterator to the failed sequence for any errors. A rewinding sequence is determined from extra properties in the `ShardCheckpointer` * Remove `RequestShardForSequenceException` since more natural behavior to react for any record processor error without end-user interaction
1 parent 63b4083 commit bc9a082

File tree

5 files changed

+72
-107
lines changed

5 files changed

+72
-107
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,9 @@ For example, users may want to fully read any parent shards before starting to r
433433
}
434434
```
435435

436-
Starting with _version 3.0_. the `RequestShardForSequenceException` can be used for flow control to request the shard iterator for specific sequence.
437-
For example, when consumer has failed processing batch at specific record, throwing this exception with a sequence of that record will ensure at-least-once delivery since the shard iterator will move back to the requested record sequence.
436+
Starting with _version 3.0_, any exception thrown from the record process may lead to shard iterator rewinding to the latest check-pointed sequence or the first one in the current failed batch.
437+
This ensures an at-least-once delivery for possibly failed records.
438+
If the latest checkpoint is equal to the highest sequence in the batch, then shard consumer continue with the next iterator.
438439

439440
Also, the `KclMessageDrivenChannelAdapter` is provided for performing streams consumption by [Kinesis Client Library][].
440441
See its JavaDocs for more information.

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.springframework.context.ApplicationEventPublisherAware;
6666
import org.springframework.core.AttributeAccessor;
6767
import org.springframework.core.convert.converter.Converter;
68+
import org.springframework.core.log.LogMessage;
6869
import org.springframework.core.serializer.support.DeserializingConverter;
6970
import org.springframework.integration.IntegrationMessageHeaderAccessor;
7071
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
@@ -1094,18 +1095,8 @@ private Runnable processTask() {
10941095
this.shardIterator = result.nextShardIterator();
10951096
}
10961097
}
1097-
catch (RequestShardForSequenceException requestShardForSequenceException) {
1098-
// Something wrong happened and not all records were processed.
1099-
// Must start from the provided sequence
1100-
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
1101-
newOffset.setSequenceNumber(requestShardForSequenceException.getSequenceNumber());
1102-
newOffset.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
1103-
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1104-
this.shardIterator =
1105-
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
1106-
.getShardIterator(shardIteratorRequest)
1107-
.join()
1108-
.shardIterator();
1098+
catch (Exception ex) {
1099+
rewindIteratorOnError(ex, result);
11091100
}
11101101
finally {
11111102
attributesHolder.remove();
@@ -1154,6 +1145,39 @@ private Runnable processTask() {
11541145
};
11551146
}
11561147

1148+
private void rewindIteratorOnError(Exception ex, GetRecordsResponse result) {
1149+
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
1150+
String lastCheckpoint = this.checkpointer.getLastCheckpointValue();
1151+
String highestSequence = this.checkpointer.getHighestSequence();
1152+
if (highestSequence.equals(lastCheckpoint)) {
1153+
logger.info(ex, "Record processor has thrown exception. " +
1154+
"Ignore since the highest sequence in batch was check-pointed.");
1155+
this.shardIterator = result.nextShardIterator();
1156+
return;
1157+
}
1158+
String newOffsetValue = lastCheckpoint;
1159+
if (lastCheckpoint != null) {
1160+
newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
1161+
}
1162+
else {
1163+
newOffsetValue = this.checkpointer.getFirstSequenceInBatch();
1164+
newOffset.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
1165+
}
1166+
1167+
logger.info(ex,
1168+
LogMessage.format("Record processor has thrown exception. " +
1169+
"Rewind shard iterator %s sequence number: %s",
1170+
(lastCheckpoint != null ? "after" : "at"), newOffsetValue));
1171+
1172+
newOffset.setSequenceNumber(newOffsetValue);
1173+
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1174+
this.shardIterator =
1175+
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
1176+
.getShardIterator(shardIteratorRequest)
1177+
.join()
1178+
.shardIterator();
1179+
}
1180+
11571181
private void checkpointSwallowingProvisioningExceptions(String endingSequenceNumber) {
11581182
try {
11591183
this.checkpointer.checkpoint(endingSequenceNumber);
@@ -1205,6 +1229,7 @@ private void prepareSleepState() {
12051229
private void processRecords(List<Record> records) {
12061230
logger.trace(() -> "Processing records: " + records + " for [" + ShardConsumer.this + "]");
12071231

1232+
this.checkpointer.setFirstSequenceInBatch(records.get(0).sequenceNumber());
12081233
this.checkpointer.setHighestSequence(records.get(records.size() - 1).sequenceNumber());
12091234

12101235
if (ListenerMode.record.equals(KinesisMessageDrivenChannelAdapter.this.listenerMode)) {
@@ -1313,42 +1338,7 @@ private void performSend(
13131338

13141339
Message<?> messageToSend = messageBuilder.build();
13151340
setAttributesIfNecessary(rawRecord, messageToSend);
1316-
try {
1317-
sendMessage(messageToSend);
1318-
}
1319-
catch (RequestShardForSequenceException requestShardForSequenceException) {
1320-
// Rethrow
1321-
throw requestShardForSequenceException;
1322-
}
1323-
catch (Exception ex) {
1324-
RequestShardForSequenceException requestShardForSequenceExceptionInCause =
1325-
findRequestShardForSequenceExceptionInCause(ex);
1326-
if (requestShardForSequenceExceptionInCause != null) {
1327-
throw requestShardForSequenceExceptionInCause;
1328-
}
1329-
else {
1330-
logger.info(ex, () ->
1331-
"Got an exception during sending a '"
1332-
+ messageToSend
1333-
+ "'"
1334-
+ "\nfor the '"
1335-
+ rawRecord
1336-
+ "'.\n"
1337-
+ "Consider to use 'errorChannel' flow for the compensation logic.");
1338-
}
1339-
}
1340-
}
1341-
1342-
@Nullable
1343-
private static RequestShardForSequenceException findRequestShardForSequenceExceptionInCause(Throwable ex) {
1344-
if (ex instanceof RequestShardForSequenceException requestShardForSequenceException) {
1345-
return requestShardForSequenceException;
1346-
}
1347-
Throwable cause = ex.getCause();
1348-
if (cause != null && cause != ex) {
1349-
return findRequestShardForSequenceExceptionInCause(cause);
1350-
}
1351-
return null;
1341+
sendMessage(messageToSend);
13521342
}
13531343

13541344
private void checkpointIfBatchMode() {

src/main/java/org/springframework/integration/aws/inbound/kinesis/RequestShardForSequenceException.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

src/main/java/org/springframework/integration/aws/inbound/kinesis/ShardCheckpointer.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323

2424
import org.springframework.integration.metadata.ConcurrentMetadataStore;
2525
import org.springframework.integration.metadata.MetadataStore;
26+
import org.springframework.lang.Nullable;
2627

2728
/**
2829
* An internal {@link Checkpointer} implementation based on provided {@link MetadataStore}
@@ -42,6 +43,10 @@ class ShardCheckpointer implements Checkpointer {
4243

4344
private final String key;
4445

46+
private volatile String firstSequenceInBatch;
47+
48+
private volatile String highestSequence;
49+
4550
private volatile String lastCheckpointValue;
4651

4752
private volatile boolean active = true;
@@ -53,7 +58,7 @@ class ShardCheckpointer implements Checkpointer {
5358

5459
@Override
5560
public boolean checkpoint() {
56-
return checkpoint(this.lastCheckpointValue);
61+
return checkpoint(this.highestSequence);
5762
}
5863

5964
@Override
@@ -66,7 +71,11 @@ public boolean checkpoint(String sequenceNumber) {
6671
return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
6772
}
6873
else {
69-
return this.checkpointStore.putIfAbsent(this.key, sequenceNumber) == null;
74+
boolean stored = this.checkpointStore.putIfAbsent(this.key, sequenceNumber) == null;
75+
if (stored) {
76+
this.lastCheckpointValue = sequenceNumber;
77+
}
78+
return stored;
7079
}
7180
}
7281
}
@@ -79,14 +88,30 @@ public boolean checkpoint(String sequenceNumber) {
7988
return false;
8089
}
8190

91+
void setFirstSequenceInBatch(String firstSequenceInBatch) {
92+
this.firstSequenceInBatch = firstSequenceInBatch;
93+
}
94+
95+
@Nullable
96+
String getFirstSequenceInBatch() {
97+
return this.firstSequenceInBatch;
98+
}
99+
82100
void setHighestSequence(String highestSequence) {
83-
this.lastCheckpointValue = highestSequence;
101+
this.highestSequence = highestSequence;
102+
}
103+
104+
String getHighestSequence() {
105+
return this.highestSequence;
84106
}
85107

108+
@Nullable
86109
String getCheckpoint() {
87-
return this.checkpointStore.get(this.key);
110+
this.lastCheckpointValue = this.checkpointStore.get(this.key);
111+
return this.lastCheckpointValue;
88112
}
89113

114+
@Nullable
90115
String getLastCheckpointValue() {
91116
return this.lastCheckpointValue;
92117
}

src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.springframework.integration.aws.LocalstackContainerTest;
3737
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
3838
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageHeaderErrorMessageStrategy;
39-
import org.springframework.integration.aws.inbound.kinesis.RequestShardForSequenceException;
4039
import org.springframework.integration.aws.outbound.KinesisMessageHandler;
4140
import org.springframework.integration.aws.support.AwsHeaders;
4241
import org.springframework.integration.channel.QueueChannel;
@@ -224,9 +223,7 @@ public PollableChannel errorChannel() {
224223
@Override
225224
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
226225
if (message instanceof ErrorMessage errorMessage && this.thrown.compareAndSet(false, true)) {
227-
throw new RequestShardForSequenceException(
228-
errorMessage.getHeaders().get(AwsHeaders.RAW_RECORD, Record.class).sequenceNumber(),
229-
errorMessage.getPayload());
226+
throw (RuntimeException) errorMessage.getPayload();
230227
}
231228
}
232229

0 commit comments

Comments
 (0)