Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit b43a230

Browse files
authored
GH-190: Swallow checkpoint provisioning exception
Resolves #190 Swallows `ProvisionedThroughputExceededException` while checkpointing exhausted shards to avoid the `ShardConsumer` from not being marked as closed and therefore be left in an inconsistent state which will only throw exceptions as the `shardIterator` would be `null` and the `ShardConsumer` wouldn't be marked as `CLOSED`.
1 parent 6964720 commit b43a230

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,6 +93,7 @@
9393
* @author Hervé Fortin
9494
* @author Dirk Bonhomme
9595
* @author Greg Eales
96+
* @author Asiel Caballero
9697
*
9798
* @since 1.1
9899
*/
@@ -1091,7 +1092,7 @@ private Runnable processTask() {
10911092
String endingSequenceNumber =
10921093
shard.getSequenceNumberRange().getEndingSequenceNumber();
10931094
if (endingSequenceNumber != null) {
1094-
this.checkpointer.checkpoint(endingSequenceNumber);
1095+
checkpointSwallowingProvisioningExceptions(endingSequenceNumber);
10951096
}
10961097
break;
10971098
}
@@ -1125,6 +1126,19 @@ private Runnable processTask() {
11251126
};
11261127
}
11271128

1129+
private void checkpointSwallowingProvisioningExceptions(String endingSequenceNumber) {
1130+
try {
1131+
this.checkpointer.checkpoint(endingSequenceNumber);
1132+
}
1133+
catch (ProvisionedThroughputExceededException ignored) {
1134+
// This exception is ignored to gurantee that an exhausted shard is marked as CLOSED
1135+
// even in the case it's not possible to checkpoint. Otherwise the ShardConsumer is
1136+
// left in an illegal state where the shard iterator is null without any possibility
1137+
// of recovering from it.
1138+
logger.debug("Exception while checkpointing empty shards", ignored);
1139+
}
1140+
}
1141+
11281142
private GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
11291143
try {
11301144
return KinesisMessageDrivenChannelAdapter.this.amazonKinesis.getRecords(getRecordsRequest);

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -75,6 +75,7 @@
7575
* @author Artem Bilan
7676
* @author Matthias Wesolowski
7777
* @author Greg Eales
78+
* @author Asiel Caballero
7879
*
7980
* @since 1.1
8081
*/
@@ -237,7 +238,6 @@ void testResharding() throws InterruptedException {
237238
@Configuration
238239
@EnableIntegration
239240
public static class Config {
240-
241241
private final AtomicReference<KinesisShardEndedEvent> shardEndedEventReference = new AtomicReference<>();
242242

243243
@Bean
@@ -440,7 +440,7 @@ private void setNewShard(AmazonKinesis amazonKinesis, String shardIndex) {
440440

441441
@Bean
442442
public ConcurrentMetadataStore reshardingCheckpointStore() {
443-
return new SimpleMetadataStore();
443+
return new ExceptionReadyMetadataStore();
444444
}
445445

446446
@Bean
@@ -472,4 +472,14 @@ public void handleKinesisShardEndedEvent(KinesisShardEndedEvent event) {
472472

473473
}
474474

475+
private static class ExceptionReadyMetadataStore extends SimpleMetadataStore {
476+
@Override
477+
public boolean replace(String key, String oldValue, String newValue) {
478+
if ("SpringIntegration:streamForResharding:closedShard4".equals(key)) {
479+
throw new ProvisionedThroughputExceededException("Throughput exceeded");
480+
}
481+
482+
return super.replace(key, oldValue, newValue);
483+
}
484+
}
475485
}

0 commit comments

Comments
 (0)