Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 5a3c6a1

Browse files
authored
GH-179: Kinesis: Add "shards to consume" filter
Fixes #179 * code review * rename to `shardListFilter` and add example to README
1 parent c53bdfb commit 5a3c6a1

File tree

2 files changed

+36
-9
lines changed

2 files changed

+36
-9
lines changed

README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,22 @@ The `KinesisMessageDrivenChannelAdapter` iterates over its shards and tries to a
580580
If `LockRegistry` is not provided, no exclusive locking happens and all the shards are consumed by this `KinesisMessageDrivenChannelAdapter`.
581581
See also `DynamoDbLockRegistry` for more information.
582582

583-
Also the `KclMessageDrivenChannelAdapter` is provided for performing streams consumption by [Kinesis Client Library][].
583+
The `KinesisMessageDrivenChannelAdapter` can be configured with a `Function<List<Shard>, List<Shard>> shardListFilter` to filter the available, open, non-exhausted shards.
584+
This filter `Function` will be called each time the shard list is refreshed.
585+
586+
For example, users may want to fully read any parent shards before starting to read their child shards. This could be achieved as follows:
587+
588+
```java
589+
openShards -> {
590+
Set<String> openShardIds = openShards.stream().map(Shard::getShardId).collect(Collectors.toSet());
591+
// only return open shards which have no parent available for reading
592+
return openShards.stream()
593+
.filter(shard -> !openShardIds.contains(shard.getParentShardId()))
594+
.collect(Collectors.toList());
595+
}
596+
```
597+
598+
Also, the `KclMessageDrivenChannelAdapter` is provided for performing streams consumption by [Kinesis Client Library][].
584599
See its JavaDocs for more information.
585600

586601
### Outbound Channel Adapter

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.Semaphore;
4141
import java.util.concurrent.TimeUnit;
4242
import java.util.concurrent.locks.Lock;
43+
import java.util.function.Function;
4344
import java.util.stream.Collectors;
4445

4546
import org.springframework.beans.factory.DisposableBean;
@@ -172,6 +173,9 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport
172173

173174
private ApplicationEventPublisher applicationEventPublisher;
174175

176+
@Nullable
177+
private Function<List<Shard>, List<Shard>> shardListFilter;
178+
175179
public KinesisMessageDrivenChannelAdapter(AmazonKinesis amazonKinesis, String... streams) {
176180
Assert.notNull(amazonKinesis, "'amazonKinesis' must not be null.");
177181
Assert.notEmpty(streams, "'streams' must not be null.");
@@ -339,6 +343,15 @@ public void setBindSourceRecord(boolean bindSourceRecord) {
339343
this.bindSourceRecord = bindSourceRecord;
340344
}
341345

346+
/**
347+
* Specify a {@link Function Function&lt;List&lt;Shard&gt;, List&lt;Shard&gt;&gt;} to filter the shards which will
348+
* be read from.
349+
* @param shardListFilter the filter {@link Function Function&lt;List&lt;Shard&gt;, List&lt;Shard&gt;&gt;}
350+
*/
351+
public void setShardListFilter(Function<List<Shard>, List<Shard>> shardListFilter) {
352+
this.shardListFilter = shardListFilter;
353+
}
354+
342355
@Override
343356
protected void onInit() {
344357
super.onInit();
@@ -618,19 +631,19 @@ private List<Shard> detectShardsToConsume(String stream, int retry) {
618631
if (endingSequenceNumber != null) {
619632
String checkpoint = this.checkpointStore.get(key);
620633

621-
boolean skipClosedShard = checkpoint != null && new BigInteger(endingSequenceNumber)
634+
boolean skipClosedAndExhaustedShard = checkpoint != null && new BigInteger(endingSequenceNumber)
622635
.compareTo(new BigInteger(checkpoint)) <= 0;
623636

624637
if (logger.isTraceEnabled()) {
625638
logger.trace("The shard [" + shard + "] in stream [" + stream
626-
+ "] is closed CLOSED with endingSequenceNumber [" + endingSequenceNumber
639+
+ "] is closed CLOSED and exhausted with endingSequenceNumber [" + endingSequenceNumber
627640
+ "].\nThe last processed checkpoint is [" + checkpoint + "]."
628-
+ (skipClosedShard ? "\nThe shard will be skipped." : ""));
641+
+ (skipClosedAndExhaustedShard ? "\nThe shard will be skipped." : ""));
629642
}
630643

631-
if (skipClosedShard) {
632-
// Skip CLOSED shard which has been read before
633-
// according a checkpoint
644+
if (skipClosedAndExhaustedShard) {
645+
// Skip CLOSED shard which has been exhausted
646+
// according the checkpoint
634647
continue;
635648
}
636649
}
@@ -649,8 +662,7 @@ private List<Shard> detectShardsToConsume(String stream, int retry) {
649662
sleep(this.describeStreamBackoff, new IllegalStateException(exceptionMessage), false);
650663
}
651664

652-
return shardsToConsume;
653-
665+
return this.shardListFilter != null ? this.shardListFilter.apply(shardsToConsume) : shardsToConsume;
654666
}
655667

656668
private void sleep(long sleepAmount, RuntimeException error, boolean interruptThread) {

0 commit comments

Comments
 (0)