Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 881720d

Browse files
committed
GH-188: expose KplMessageHandler.flushDuration
Fixes #188 Fixes spring-cloud/spring-cloud-stream-binder-aws-kinesis#154 Also fix `KinesisMessageDrivenChannelAdapter` to not ERROR unlock interrupt when the `KinesisMessageDrivenChannelAdapter` is not active any more **Cherry-pick to `2.3.x`**
1 parent 7638c5a commit 881720d

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1535,7 +1535,12 @@ public void run() {
15351535
lock.unlock();
15361536
}
15371537
catch (Exception ex) {
1538-
logger.error(ex, () -> "Error during unlocking: " + lock);
1538+
if (KinesisMessageDrivenChannelAdapter.this.active) {
1539+
logger.error(ex, () -> "Error during unlocking: " + lock);
1540+
}
1541+
else {
1542+
logger.info(ex, () -> "Error during unlocking: " + lock + " while adapter was inactive");
1543+
}
15391544
}
15401545
finally {
15411546
iterator.remove();

src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ public void setEmbeddedHeadersMapper(OutboundMessageMapper<byte[]> embeddedHeade
180180
this.embeddedHeadersMapper = embeddedHeadersMapper;
181181
}
182182

183+
/**
184+
* Configure a {@link Duration} how often to call a {@link KinesisProducer#flush()}.
185+
* @param flushDuration the {@link Duration} to periodic call of a {@link KinesisProducer#flush()}.
186+
* @since 2.3.6
187+
*/
188+
public void setFlushDuration(Duration flushDuration) {
189+
Assert.notNull(flushDuration, "'flushDuration' must not be null.");
190+
this.flushDuration = flushDuration;
191+
}
192+
183193
/**
184194
* Unsupported operation. Use {@link #setEmbeddedHeadersMapper} instead.
185195
* @param headerMapper is not used.

0 commit comments

Comments
 (0)