Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit ea13591

Browse files
committed
Add lockRenewalTimeout to the KinesisMesChAd
Related to spring-cloud/spring-cloud-stream-binder-aws-kinesis#148
1 parent b43a230 commit ea13591

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@ public class KinesisMessageDrivenChannelAdapter extends MessageProducerSupport
161161

162162
private int describeStreamRetries = 50;
163163

164+
private long lockRenewalTimeout = 10_000L;
165+
164166
private boolean resetCheckpoints;
165167

166168
private InboundMessageMapper<byte[]> embeddedHeadersMapper;
@@ -291,6 +293,16 @@ public void setStartTimeout(int startTimeout) {
291293
this.startTimeout = startTimeout;
292294
}
293295

296+
/**
297+
* Configure a timeout in milliseconds to wait for lock on shard renewal.
298+
* @param lockRenewalTimeout the timeout to wait for lock renew in milliseconds.
299+
* @since 2.3.5
300+
*/
301+
public void setLockRenewalTimeout(long lockRenewalTimeout) {
302+
Assert.isTrue(lockRenewalTimeout > 0, "'lockRenewalTimeout' must be more than 0");
303+
this.lockRenewalTimeout = lockRenewalTimeout;
304+
}
305+
294306
/**
295307
* The maximum number of concurrent {@link ConsumerInvoker}s running. The {@link ShardConsumer}s
296308
* are evenly distributed between {@link ConsumerInvoker}s. Messages from within the same shard
@@ -920,7 +932,7 @@ void stop() {
920932
LockCompletableFuture unlockFuture = new LockCompletableFuture(this.key);
921933
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.unlock(unlockFuture);
922934
try {
923-
unlockFuture.get(1, TimeUnit.SECONDS);
935+
unlockFuture.get(KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout, TimeUnit.MILLISECONDS);
924936
}
925937
catch (Exception ex) {
926938
if (ex instanceof InterruptedException) {
@@ -1029,7 +1041,8 @@ private boolean renewLockIfAny() {
10291041
KinesisMessageDrivenChannelAdapter.this.shardConsumerManager.renewLock(renewLockFuture);
10301042
boolean lockRenewed = false;
10311043
try {
1032-
lockRenewed = renewLockFuture.get(1, TimeUnit.SECONDS);
1044+
lockRenewed = renewLockFuture.get(KinesisMessageDrivenChannelAdapter.this.lockRenewalTimeout,
1045+
TimeUnit.MILLISECONDS);
10331046
}
10341047
catch (Exception ex) {
10351048
if (ex instanceof InterruptedException) {

0 commit comments

Comments
 (0)