Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit ab2f9b8

Browse files
committed
Expose KCL pollingMaxRecords & pollingIdleTime options
1 parent 06ca18a commit ab2f9b8

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,10 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
170170

171171
private boolean emptyRecordList;
172172

173+
private int pollingMaxRecords = PollingConfig.DEFAULT_MAX_RECORDS;
174+
175+
private long pollingIdleTime = 1500L;
176+
173177
public KclMessageDrivenChannelAdapter(String... streams) {
174178
this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
175179
}
@@ -369,6 +373,26 @@ public void setEmptyRecordList(boolean emptyRecordList) {
369373
this.emptyRecordList = emptyRecordList;
370374
}
371375

376+
/**
377+
* The number of records to poll from Kinesis when using {@link PollingConfig}.
378+
* @param pollingMaxRecords the number of records to poll from Kinesis.
379+
* @since 3.0.8
380+
* @see PollingConfig#maxRecords(int)
381+
*/
382+
public void setPollingMaxRecords(int pollingMaxRecords) {
383+
this.pollingMaxRecords = pollingMaxRecords;
384+
}
385+
386+
/**
387+
* The idle timeout between polls when using {@link PollingConfig}.
388+
* @param pollingIdleTime idle timeout between polls.
389+
* @since 3.0.8
390+
* @see PollingConfig#idleTimeBetweenReadsInMillis(long)
391+
*/
392+
public void setPollingIdleTime(long pollingIdleTime) {
393+
this.pollingIdleTime = pollingIdleTime;
394+
}
395+
372396
@Override
373397
protected void onInit() {
374398
super.onInit();
@@ -425,7 +449,9 @@ protected void doStart() {
425449
else {
426450
retrievalSpecificConfig =
427451
new PollingConfig(this.kinesisClient)
428-
.streamName(singleStreamName);
452+
.streamName(singleStreamName)
453+
.maxRecords(this.pollingMaxRecords)
454+
.idleTimeBetweenReadsInMillis(this.pollingIdleTime);
429455
}
430456

431457
RetrievalConfig retrievalConfig = this.config.retrievalConfig()

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,15 @@ public void shardConsumerDispatchPollIntervalMillisOverriddenByCustomizer() {
162162
assertThat(shardConsumerDispatchPollIntervalMillis).isEqualTo(500L);
163163
}
164164

165+
@Test
166+
public void pollingMaxRecordsIsPropagated() {
167+
Integer maxRecords =
168+
TestUtils.getPropertyValue(this.kclMessageDrivenChannelAdapter,
169+
"scheduler.retrievalConfig.retrievalSpecificConfig.maxRecords",
170+
Integer.class);
171+
assertThat(maxRecords).isEqualTo(99);
172+
}
173+
165174
@Configuration
166175
@EnableIntegration
167176
public static class TestConfiguration {
@@ -184,6 +193,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
184193
coordinatorConfig.shardConsumerDispatchPollIntervalMillis(500L));
185194
adapter.setBindSourceRecord(true);
186195
adapter.setEmptyRecordList(true);
196+
adapter.setPollingMaxRecords(99);
187197
return adapter;
188198
}
189199

0 commit comments

Comments
 (0)