4040import software .amazon .kinesis .exceptions .InvalidStateException ;
4141import software .amazon .kinesis .exceptions .ShutdownException ;
4242import software .amazon .kinesis .exceptions .ThrottlingException ;
43- import software .amazon .kinesis .lifecycle .LifecycleConfig ;
4443import software .amazon .kinesis .lifecycle .events .InitializationInput ;
4544import software .amazon .kinesis .lifecycle .events .LeaseLostInput ;
4645import software .amazon .kinesis .lifecycle .events .ProcessRecordsInput ;
5453import software .amazon .kinesis .processor .SingleStreamTracker ;
5554import software .amazon .kinesis .processor .StreamTracker ;
5655import software .amazon .kinesis .retrieval .KinesisClientRecord ;
57- import software .amazon .kinesis .retrieval .RetrievalConfig ;
56+ import software .amazon .kinesis .retrieval .RetrievalSpecificConfig ;
57+ import software .amazon .kinesis .retrieval .fanout .FanOutConfig ;
58+ import software .amazon .kinesis .retrieval .polling .PollingConfig ;
5859
5960import org .springframework .context .ApplicationEventPublisher ;
6061import org .springframework .context .ApplicationEventPublisherAware ;
@@ -131,6 +132,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
131132
132133 private boolean bindSourceRecord ;
133134
135+ private boolean fanOut = true ;
136+
134137 private ApplicationEventPublisher applicationEventPublisher ;
135138
136139 private volatile Scheduler scheduler ;
@@ -248,6 +251,15 @@ public void setBindSourceRecord(boolean bindSourceRecord) {
248251 this .bindSourceRecord = bindSourceRecord ;
249252 }
250253
254+ /**
255+ * Specify a retrieval strategy: fan-out (true; default) or polling (false).
256+ * @param fanOut false for a polling retrieval strategy.
257+ * @since 3.0.2
258+ */
259+ public void setFanOut (boolean fanOut ) {
260+ this .fanOut = fanOut ;
261+ }
262+
251263 @ Override
252264 protected void onInit () {
253265 super .onInit ();
@@ -259,6 +271,28 @@ protected void onInit() {
259271 this .cloudWatchClient ,
260272 this .workerId ,
261273 this .recordProcessorFactory );
274+
275+ this .config .lifecycleConfig ().taskBackoffTimeMillis (this .consumerBackoff );
276+
277+ RetrievalSpecificConfig retrievalSpecificConfig ;
278+
279+ String singleStreamName = this .streams .length == 1 ? this .streams [0 ] : null ;
280+
281+ if (this .fanOut ) {
282+ retrievalSpecificConfig =
283+ new FanOutConfig (this .kinesisClient )
284+ .applicationName (this .consumerGroup )
285+ .streamName (singleStreamName );
286+ }
287+ else {
288+ retrievalSpecificConfig =
289+ new PollingConfig (this .kinesisClient )
290+ .streamName (singleStreamName );
291+ }
292+
293+ this .config .retrievalConfig ()
294+ .glueSchemaRegistryDeserializer (this .glueSchemaRegistryDeserializer )
295+ .retrievalSpecificConfig (retrievalSpecificConfig );
262296 }
263297
264298 private StreamTracker buildStreamTracker () {
@@ -281,20 +315,16 @@ protected void doStart() {
281315 + "because it does not make sense in case of [ListenerMode.batch]." );
282316 }
283317
284- LifecycleConfig lifecycleConfig = this .config .lifecycleConfig ().taskBackoffTimeMillis (this .consumerBackoff );
285- RetrievalConfig retrievalConfig =
286- this .config .retrievalConfig ()
287- .glueSchemaRegistryDeserializer (this .glueSchemaRegistryDeserializer );
288318
289319 this .scheduler =
290320 new Scheduler (
291321 this .config .checkpointConfig (),
292322 this .config .coordinatorConfig (),
293323 this .config .leaseManagementConfig (),
294- lifecycleConfig ,
324+ this . config . lifecycleConfig () ,
295325 this .config .metricsConfig (),
296326 this .config .processorConfig (),
297- retrievalConfig );
327+ this . config . retrievalConfig () );
298328
299329 this .executor .execute (this .scheduler );
300330 }
0 commit comments