Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 63b4083

Browse files
committed
GH-221: Revise the manual checkpoint logic
Fixes #221 * Introduce a `RequestShardForSequenceException` to control the flow for requesting the `KinesisMessageDrivenChannelAdapter` for shard iterator at specific sequence
1 parent 136cce6 commit 63b4083

File tree

7 files changed

+132
-68
lines changed

7 files changed

+132
-68
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ Can be specified as `null` with meaning no conversion and the target `Message` i
398398
Additional headers like `AwsHeaders.RECEIVED_STREAM`, `AwsHeaders.SHARD`, `AwsHeaders.RECEIVED_PARTITION_KEY` and `AwsHeaders.RECEIVED_SEQUENCE_NUMBER` are populated to the message for downstream logic.
399399
When `CheckpointMode.manual` is used the `Checkpointer` instance is populated to the `AwsHeaders.CHECKPOINTER` header for an acknowledgment in the downstream logic manually.
400400

401-
The `KinesisMessageDrivenChannelAdapter` ca be configured with the `ListenerMode` `record` or `batch` to process records one by one or send the whole just polled batch of records.
401+
The `KinesisMessageDrivenChannelAdapter` can be configured with the `ListenerMode` `record` or `batch` to process records one by one or send the whole just polled batch of records.
402402
If `Converter` is configured to `null`, the entire `List<Record>` is sent as a payload.
403403
Otherwise, a list of converted `Record.getData().array()` is wrapped to the payload of message to send.
404404
In this case the `AwsHeaders.RECEIVED_PARTITION_KEY` and `AwsHeaders.RECEIVED_SEQUENCE_NUMBER` headers contains values as a `List<String>` of partition keys and sequence numbers of converted records respectively.
@@ -433,6 +433,9 @@ For example, users may want to fully read any parent shards before starting to r
433433
}
434434
```
435435

436+
Starting with _version 3.0_. the `RequestShardForSequenceException` can be used for flow control to request the shard iterator for specific sequence.
437+
For example, when consumer has failed processing batch at specific record, throwing this exception with a sequence of that record will ensure at-least-once delivery since the shard iterator will move back to the requested record sequence.
438+
436439
Also, the `KclMessageDrivenChannelAdapter` is provided for performing streams consumption by [Kinesis Client Library][].
437440
See its JavaDocs for more information.
438441

build.gradle

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ plugins {
1111
id 'checkstyle'
1212
id 'org.ajoberstar.grgit' version '4.1.1'
1313
id 'io.spring.dependency-management' version '1.1.0'
14-
id 'com.jfrog.artifactory' version '4.31.7'
14+
id 'com.jfrog.artifactory' version '4.31.9'
1515
}
1616

1717
description = 'Spring Integration AWS Support'
@@ -32,15 +32,15 @@ repositories {
3232
ext {
3333
assertjVersion = '3.24.2'
3434
awaitilityVersion = '4.2.0'
35-
awsSdkVersion = '2.20.35'
36-
jacksonVersion = '2.14.2'
35+
awsSdkVersion = '2.20.51'
36+
jacksonVersion = '2.15.0'
3737
junitVersion = '5.9.2'
38-
log4jVersion = '2.19.0'
38+
log4jVersion = '2.20.0'
3939
servletApiVersion = '6.0.0'
4040
springCloudAwsVersion = '3.0.0-RC2'
41-
springIntegrationVersion = '6.0.4'
41+
springIntegrationVersion = '6.0.5'
4242
kinesisClientVersion = '2.4.8'
43-
kinesisProducerVersion = '0.15.5'
43+
kinesisProducerVersion = '0.15.6'
4444
testcontainersVersion = '1.18.0'
4545

4646
idPrefix = 'aws'

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 54 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,68 +1091,43 @@ private Runnable processTask() {
10911091
if (!records.isEmpty()) {
10921092
processRecords(records);
10931093
}
1094+
this.shardIterator = result.nextShardIterator();
10941095
}
10951096
}
1097+
catch (RequestShardForSequenceException requestShardForSequenceException) {
1098+
// Something wrong happened and not all records were processed.
1099+
// Must start from the provided sequence
1100+
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
1101+
newOffset.setSequenceNumber(requestShardForSequenceException.getSequenceNumber());
1102+
newOffset.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
1103+
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1104+
this.shardIterator =
1105+
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
1106+
.getShardIterator(shardIteratorRequest)
1107+
.join()
1108+
.shardIterator();
1109+
}
10961110
finally {
10971111
attributesHolder.remove();
10981112
if (result != null) {
1099-
// If using manual checkpointer, we have to make sure we are allowed to use the next shard iterator
1100-
// Because if the manual checkpointer was not set to the latest record, it means there are records to be reprocessed
1101-
// and if we use the nextShardIterator, we will be skipping records that need to be reprocessed
1102-
List<Record> records = result.records();
1103-
if (CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode) &&
1104-
!records.isEmpty()) {
1105-
logger.info("Manual checkpointer. Must validate if should use getNextShardIterator()");
1106-
String lastRecordSequence = records.get(records.size() - 1).sequenceNumber();
1107-
String lastCheckpointSequence = this.checkpointer.getCheckpoint();
1108-
if (lastCheckpointSequence.equals(lastRecordSequence)) {
1109-
logger.info("latestCheckpointSequence is same as latestRecordSequence. " +
1110-
"Should getNextShardIterator()");
1111-
// Means the manual checkpointer has processed the last record, Should move forward
1112-
this.shardIterator = result.nextShardIterator();
1113-
}
1114-
else {
1115-
logger.info("latestCheckpointSequence is not the same as latestRecordSequence. " +
1116-
"Should Get a new iterator AFTER_SEQUENCE_NUMBER latestCheckpointSequence");
1117-
// Something wrong happened and not all records were processed.
1118-
// Must start from the latest known checkpoint
1119-
KinesisShardOffset newOffset = new KinesisShardOffset(this.shardOffset);
1120-
newOffset.setSequenceNumber(lastCheckpointSequence);
1121-
newOffset.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
1122-
GetShardIteratorRequest shardIteratorRequest = newOffset.toShardIteratorRequest();
1123-
this.shardIterator =
1124-
KinesisMessageDrivenChannelAdapter.this.amazonKinesis
1125-
.getShardIterator(shardIteratorRequest)
1126-
.join()
1127-
.shardIterator();
1128-
}
1129-
}
1130-
else {
1131-
this.shardIterator = result.nextShardIterator();
1132-
}
1133-
11341113
if (this.shardIterator == null) {
11351114
if (KinesisMessageDrivenChannelAdapter.this.lockRegistry != null) {
11361115
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.shardOffsetsToConsumer
11371116
.remove(this.key);
11381117
}
11391118
// Shard is closed: nothing to consume anymore.
11401119
// Checkpoint endingSequenceNumber to ensure shard is marked exhausted.
1141-
// If in CheckpointMode.manual, only checkpoint if lastCheckpointValue is also null, as this
1142-
// means that no records have ever been read and so the shard was empty
1143-
if (!CheckpointMode.manual.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)
1144-
|| this.checkpointer.getLastCheckpointValue() == null) {
1145-
for (Shard shard : readShardList(this.shardOffset.getStream())) {
1146-
if (shard.shardId().equals(this.shardOffset.getShard())) {
1147-
String endingSequenceNumber =
1148-
shard.sequenceNumberRange().endingSequenceNumber();
1149-
if (endingSequenceNumber != null) {
1150-
checkpointSwallowingProvisioningExceptions(endingSequenceNumber);
1151-
}
1152-
break;
1120+
for (Shard shard : readShardList(this.shardOffset.getStream())) {
1121+
if (shard.shardId().equals(this.shardOffset.getShard())) {
1122+
String endingSequenceNumber =
1123+
shard.sequenceNumberRange().endingSequenceNumber();
1124+
if (endingSequenceNumber != null) {
1125+
checkpointSwallowingProvisioningExceptions(endingSequenceNumber);
11531126
}
1127+
break;
11541128
}
11551129
}
1130+
11561131
// Resharding is possible.
11571132
if (KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
11581133
KinesisMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent(
@@ -1185,7 +1160,7 @@ private void checkpointSwallowingProvisioningExceptions(String endingSequenceNum
11851160
}
11861161
catch (ProvisionedThroughputExceededException ignored) {
11871162
// This exception is ignored to guarantee that an exhausted shard is marked as CLOSED
1188-
// even in the case it's not possible to checkpoint. Otherwise the ShardConsumer is
1163+
// even in the case it's not possible to checkpoint. Otherwise, the ShardConsumer is
11891164
// left in an illegal state where the shard iterator is null without any possibility
11901165
// of recovering from it.
11911166
logger.debug(ignored, "Exception while checkpointing empty shards");
@@ -1341,18 +1316,41 @@ private void performSend(
13411316
try {
13421317
sendMessage(messageToSend);
13431318
}
1319+
catch (RequestShardForSequenceException requestShardForSequenceException) {
1320+
// Rethrow
1321+
throw requestShardForSequenceException;
1322+
}
13441323
catch (Exception ex) {
1345-
logger.info(ex, () ->
1346-
"Got an exception during sending a '"
1347-
+ messageToSend
1348-
+ "'"
1349-
+ "\nfor the '"
1350-
+ rawRecord
1351-
+ "'.\n"
1352-
+ "Consider to use 'errorChannel' flow for the compensation logic.");
1324+
RequestShardForSequenceException requestShardForSequenceExceptionInCause =
1325+
findRequestShardForSequenceExceptionInCause(ex);
1326+
if (requestShardForSequenceExceptionInCause != null) {
1327+
throw requestShardForSequenceExceptionInCause;
1328+
}
1329+
else {
1330+
logger.info(ex, () ->
1331+
"Got an exception during sending a '"
1332+
+ messageToSend
1333+
+ "'"
1334+
+ "\nfor the '"
1335+
+ rawRecord
1336+
+ "'.\n"
1337+
+ "Consider to use 'errorChannel' flow for the compensation logic.");
1338+
}
13531339
}
13541340
}
13551341

1342+
@Nullable
1343+
private static RequestShardForSequenceException findRequestShardForSequenceExceptionInCause(Throwable ex) {
1344+
if (ex instanceof RequestShardForSequenceException requestShardForSequenceException) {
1345+
return requestShardForSequenceException;
1346+
}
1347+
Throwable cause = ex.getCause();
1348+
if (cause != null && cause != ex) {
1349+
return findRequestShardForSequenceExceptionInCause(cause);
1350+
}
1351+
return null;
1352+
}
1353+
13561354
private void checkpointIfBatchMode() {
13571355
if (CheckpointMode.batch.equals(KinesisMessageDrivenChannelAdapter.this.checkpointMode)) {
13581356
this.checkpointer.checkpoint();
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws.inbound.kinesis;
18+
19+
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
20+
21+
/**
22+
* The flow control exception to notify the {@link KinesisMessageDrivenChannelAdapter}
23+
* that specific shard iterator ({@link ShardIteratorType#AT_SEQUENCE_NUMBER})
24+
* must be requested instead of checkpointing.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 3.0
29+
*/
30+
@SuppressWarnings("serial")
31+
public class RequestShardForSequenceException extends RuntimeException {
32+
33+
private final String sequenceNumber;
34+
35+
public RequestShardForSequenceException(String sequenceNumber) {
36+
this.sequenceNumber = sequenceNumber;
37+
}
38+
39+
public RequestShardForSequenceException(String sequenceNumber, Throwable cause) {
40+
super(cause);
41+
this.sequenceNumber = sequenceNumber;
42+
}
43+
44+
public String getSequenceNumber() {
45+
return this.sequenceNumber;
46+
}
47+
48+
}

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ void testKinesisMessageDrivenChannelAdapter() {
148148
Checkpointer checkpointer = headers.get(AwsHeaders.CHECKPOINTER, Checkpointer.class);
149149
assertThat(checkpointer).isNotNull();
150150

151-
checkpointer.checkpoint();
152-
153151
message = this.kinesisChannel.receive(10000);
154152
assertThat(message).isNotNull();
155153
assertThat(message.getPayload()).isEqualTo("bar");
@@ -161,6 +159,8 @@ void testKinesisMessageDrivenChannelAdapter() {
161159

162160
assertThat(this.kinesisChannel.receive(10)).isNull();
163161

162+
checkpointer.checkpoint();
163+
164164
assertThat(this.checkpointStore.get("SpringIntegration" + ":" + STREAM1 + ":" + "1")).isEqualTo("2");
165165

166166
this.kinesisMessageDrivenChannelAdapter.stop();

src/test/java/org/springframework/integration/aws/kinesis/KinesisIntegrationTests.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import java.util.Date;
2020
import java.util.HashSet;
2121
import java.util.Set;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import org.junit.jupiter.api.AfterAll;
2425
import org.junit.jupiter.api.BeforeAll;
2526
import org.junit.jupiter.api.Test;
2627
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
28+
import software.amazon.awssdk.services.kinesis.model.Record;
2729

2830
import org.springframework.beans.DirectFieldAccessor;
2931
import org.springframework.beans.factory.annotation.Autowired;
@@ -34,6 +36,7 @@
3436
import org.springframework.integration.aws.LocalstackContainerTest;
3537
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter;
3638
import org.springframework.integration.aws.inbound.kinesis.KinesisMessageHeaderErrorMessageStrategy;
39+
import org.springframework.integration.aws.inbound.kinesis.RequestShardForSequenceException;
3740
import org.springframework.integration.aws.outbound.KinesisMessageHandler;
3841
import org.springframework.integration.aws.support.AwsHeaders;
3942
import org.springframework.integration.channel.QueueChannel;
@@ -114,6 +117,14 @@ void testKinesisInboundOutbound() {
114117
.contains("Channel 'kinesisReceiveChannel' expected one of the following data types "
115118
+ "[class java.util.Date], but received [class java.lang.String]");
116119

120+
String errorSequenceNumber = errorMessage.getHeaders().get(AwsHeaders.RAW_RECORD, Record.class).sequenceNumber();
121+
122+
// Second exception for the same record since we have requested via RequestShardForSequenceException
123+
errorMessage = this.errorChannel.receive(30_000);
124+
assertThat(errorMessage).isNotNull();
125+
assertThat(errorMessage.getHeaders().get(AwsHeaders.RAW_RECORD, Record.class).sequenceNumber())
126+
.isEqualTo(errorSequenceNumber);
127+
117128
for (int i = 0; i < 2; i++) {
118129
this.kinesisSendChannel
119130
.send(MessageBuilder.withPayload(new Date()).setHeader(AwsHeaders.STREAM, TEST_STREAM).build());
@@ -208,10 +219,14 @@ public PollableChannel errorChannel() {
208219
QueueChannel queueChannel = new QueueChannel();
209220
queueChannel.addInterceptor(new ChannelInterceptor() {
210221

222+
private final AtomicBoolean thrown = new AtomicBoolean();
223+
211224
@Override
212225
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
213-
if (message instanceof ErrorMessage) {
214-
throw (RuntimeException) ((ErrorMessage) message).getPayload();
226+
if (message instanceof ErrorMessage errorMessage && this.thrown.compareAndSet(false, true)) {
227+
throw new RequestShardForSequenceException(
228+
errorMessage.getHeaders().get(AwsHeaders.RAW_RECORD, Record.class).sequenceNumber(),
229+
errorMessage.getPayload());
215230
}
216231
}
217232

src/test/resources/log4j2-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<Logger name="org.springframework" level="warn"/>
1010
<Logger name="org.springframework.integration" level="warn"/>
1111
<Logger name="org.springframework.integration.aws" level="info"/>
12-
<Logger name="org.testcontainers" level="debug"/>
12+
<Logger name="org.testcontainers" level="warn"/>
1313
<Root level="warn">
1414
<AppenderRef ref="STDOUT"/>
1515
</Root>

0 commit comments

Comments
 (0)