Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit f5704cd

Browse files
committed
Add KclMessageDrivenChannelAdapter.leaseTableName property
1 parent c113391 commit f5704cd

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import java.util.UUID;
2424
import java.util.function.Consumer;
2525

26-
import javax.annotation.Nullable;
27-
2826
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer;
2927
import reactor.core.publisher.Flux;
3028
import reactor.core.publisher.Mono;
@@ -87,6 +85,7 @@
8785
import org.springframework.integration.support.ErrorMessageUtils;
8886
import org.springframework.integration.support.management.IntegrationManagedResource;
8987
import org.springframework.jmx.export.annotation.ManagedResource;
88+
import org.springframework.lang.Nullable;
9089
import org.springframework.messaging.Message;
9190
import org.springframework.util.Assert;
9291

@@ -123,6 +122,9 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
123122

124123
private String consumerGroup = "SpringIntegration";
125124

125+
@Nullable
126+
private String leaseTableName;
127+
126128
private InboundMessageMapper<byte[]> embeddedHeadersMapper;
127129

128130
private ConfigsBuilder config;
@@ -211,6 +213,16 @@ public String getConsumerGroup() {
211213
return this.consumerGroup;
212214
}
213215

216+
/**
217+
* Set a name of the DynamoDB table name for leases.
218+
* Defaults to {@link #consumerGroup}.
219+
* @param leaseTableName the DynamoDB table name for leases.
220+
* @since 3.0.8
221+
*/
222+
public void setLeaseTableName(String leaseTableName) {
223+
this.leaseTableName = leaseTableName;
224+
}
225+
214226
/**
215227
* Specify an {@link InboundMessageMapper} to extract message headers embedded into
216228
* the record data.
@@ -372,6 +384,10 @@ protected void onInit() {
372384
this.cloudWatchClient,
373385
this.workerId,
374386
this.recordProcessorFactory);
387+
388+
if (this.leaseTableName != null) {
389+
this.config.tableName(this.leaseTableName);
390+
}
375391
}
376392

377393
private StreamTracker buildStreamTracker() {

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ void kclChannelAdapterReceivesRecords() {
121121

122122
// Because FanOut is false, there would be no Stream Consumers.
123123
assertThat(streamConsumers).hasSize(0);
124+
125+
List<String> tableNames = DYNAMO_DB.listTables().join().tableNames();
126+
assertThat(tableNames).containsOnly("test_table");
124127
}
125128

126129
@Test
@@ -172,6 +175,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
172175
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
173176
adapter.setConverter(String::new);
174177
adapter.setConsumerGroup("single_stream_group");
178+
adapter.setLeaseTableName("test_table");
175179
adapter.setFanOut(false);
176180
adapter.setMetricsLevel(MetricsLevel.NONE);
177181
adapter.setLeaseManagementConfigCustomizer(leaseManagementConfig ->

0 commit comments

Comments
 (0)