Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit c53bdfb

Browse files
Greg Ealesartembilan
authored andcommitted
GH-175: Iterate listShards() pages
Fixes #175 `KinesisMessageDrivenChannelAdapter` doesn't read from all shards * Add loop to get all pages of shards from Kinesis * Upgrade to the latest SC-AWS
1 parent ee79a1c commit c53bdfb

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ ext {
3131
servletApiVersion = '4.0.1'
3232
localstackVersion = '0.1.22'
3333
log4jVersion = '2.13.3'
34-
springCloudAwsVersion = '2.2.3.RELEASE'
34+
springCloudAwsVersion = '2.2.4.RELEASE'
3535
springIntegrationVersion = '5.2.8.RELEASE'
3636
kinesisClientVersion = '1.13.3'
3737
kinesisProducerVersion = '0.14.1'

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import java.util.concurrent.locks.Lock;
4343
import java.util.stream.Collectors;
4444

45-
import javax.annotation.Nullable;
46-
4745
import org.springframework.beans.factory.DisposableBean;
4846
import org.springframework.context.ApplicationEventPublisher;
4947
import org.springframework.context.ApplicationEventPublisherAware;
@@ -64,6 +62,7 @@
6462
import org.springframework.integration.support.management.IntegrationManagedResource;
6563
import org.springframework.jmx.export.annotation.ManagedOperation;
6664
import org.springframework.jmx.export.annotation.ManagedResource;
65+
import org.springframework.lang.Nullable;
6766
import org.springframework.messaging.Message;
6867
import org.springframework.scheduling.SchedulingAwareRunnable;
6968
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
@@ -542,8 +541,17 @@ private List<Shard> readShardList(String stream, int retryCount) {
542541

543542
try {
544543
ListShardsResult listShardsResult = this.amazonKinesis.listShards(listShardsRequest);
545-
546-
shardList.addAll(listShardsResult.getShards());
544+
while (true) {
545+
shardList.addAll(listShardsResult.getShards());
546+
if (listShardsResult.getNextToken() == null) {
547+
break;
548+
}
549+
else {
550+
listShardsResult =
551+
this.amazonKinesis.listShards(new ListShardsRequest()
552+
.withNextToken(listShardsResult.getNextToken()));
553+
}
554+
}
547555

548556
}
549557
catch (LimitExceededException limitExceededException) {

0 commit comments

Comments
 (0)