Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit 10e96e2

Browse files
authored
GH-163: Replace describeStream with listStream
Fixes #163
1 parent a4e551d commit 10e96e2

File tree

10 files changed

+386
-294
lines changed

10 files changed

+386
-294
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KinesisMessageDrivenChannelAdapter.java

Lines changed: 354 additions & 258 deletions
Large diffs are not rendered by default.

src/main/java/org/springframework/integration/aws/support/AbstractMessageAttributesHeaderMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public abstract class AbstractMessageAttributesHeaderMapper<A> implements Header
4646

4747
private static final Log logger = LogFactory.getLog(SqsHeaderMapper.class);
4848

49-
private volatile String[] outboundHeaderNames = { "!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP,
50-
"!" + AwsHeaders.MESSAGE_ID, "!" + AwsHeaders.QUEUE, "!" + AwsHeaders.TOPIC, "*" };
49+
private volatile String[] outboundHeaderNames = {"!" + MessageHeaders.ID, "!" + MessageHeaders.TIMESTAMP,
50+
"!" + AwsHeaders.MESSAGE_ID, "!" + AwsHeaders.QUEUE, "!" + AwsHeaders.TOPIC, "*"};
5151

5252
/**
5353
* Provide the header names that should be mapped to a AWS request object attributes

src/test/java/org/springframework/integration/aws/inbound/KinesisMessageDrivenChannelAdapterTests.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,21 +60,20 @@
6060
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6161

6262
import com.amazonaws.services.kinesis.AmazonKinesis;
63-
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
64-
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
6563
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
6664
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
6765
import com.amazonaws.services.kinesis.model.GetRecordsResult;
6866
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
67+
import com.amazonaws.services.kinesis.model.ListShardsRequest;
68+
import com.amazonaws.services.kinesis.model.ListShardsResult;
6969
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
7070
import com.amazonaws.services.kinesis.model.Record;
7171
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
7272
import com.amazonaws.services.kinesis.model.Shard;
73-
import com.amazonaws.services.kinesis.model.StreamDescription;
74-
import com.amazonaws.services.kinesis.model.StreamStatus;
7573

7674
/**
7775
* @author Artem Bilan
76+
* @author Matthias Wesolowski
7877
* @since 1.1
7978
*/
8079
@SpringJUnitConfig
@@ -109,7 +108,7 @@ void setup() {
109108
}
110109

111110
@Test
112-
@SuppressWarnings({ "unchecked", "rawtypes" })
111+
@SuppressWarnings({"unchecked", "rawtypes"})
113112
void testKinesisMessageDrivenChannelAdapter() {
114113
this.kinesisMessageDrivenChannelAdapter.start();
115114
final Set<KinesisShardOffset> shardOffsets = TestUtils.getPropertyValue(this.kinesisMessageDrivenChannelAdapter,
@@ -214,34 +213,35 @@ void testResharding() throws InterruptedException {
214213
assertThat(n).isLessThan(100);
215214

216215
// When resharding happens the describeStream() is performed again
217-
verify(this.amazonKinesisForResharding, atLeast(1)).describeStream(any(DescribeStreamRequest.class));
216+
verify(this.amazonKinesisForResharding, atLeast(1))
217+
.listShards(any(ListShardsRequest.class));
218218

219219
this.reshardingChannelAdapter.stop();
220220

221221
KinesisShardEndedEvent kinesisShardEndedEvent = this.config.shardEndedEventReference.get();
222222

223223
assertThat(kinesisShardEndedEvent).isNotNull()
224-
.extracting(KinesisShardEndedEvent::getShardKey)
225-
.isEqualTo("SpringIntegration:streamForResharding:closedShard");
224+
.extracting(KinesisShardEndedEvent::getShardKey)
225+
.isEqualTo("SpringIntegration:streamForResharding:closedShard");
226226
}
227227

228228
@Configuration
229229
@EnableIntegration
230230
public static class Config {
231231

232+
private final AtomicReference<KinesisShardEndedEvent> shardEndedEventReference = new AtomicReference<>();
233+
232234
@Bean
233235
public AmazonKinesis amazonKinesis() {
234236
AmazonKinesis amazonKinesis = mock(AmazonKinesis.class);
235237

236-
given(amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(STREAM1))).willReturn(
237-
new DescribeStreamResult().withStreamDescription(
238-
new StreamDescription().withStreamName(STREAM1).withStreamStatus(StreamStatus.UPDATING)),
239-
new DescribeStreamResult().withStreamDescription(new StreamDescription().withStreamName(STREAM1)
240-
.withStreamStatus(StreamStatus.ACTIVE).withHasMoreShards(false)
238+
given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM1))).willReturn(
239+
new ListShardsResult()
241240
.withShards(new Shard().withShardId("1").withSequenceNumberRange(new SequenceNumberRange()),
242241
new Shard().withShardId("2").withSequenceNumberRange(new SequenceNumberRange()),
243242
new Shard().withShardId("3").withSequenceNumberRange(
244-
new SequenceNumberRange().withEndingSequenceNumber("1")))));
243+
new SequenceNumberRange().withEndingSequenceNumber("1")))
244+
);
245245

246246
String shard1Iterator1 = "shard1Iterator1";
247247
String shard1Iterator2 = "shard1Iterator2";
@@ -329,12 +329,10 @@ public PollableChannel kinesisChannel() {
329329
public AmazonKinesis amazonKinesisForResharding() {
330330
AmazonKinesis amazonKinesis = mock(AmazonKinesis.class);
331331

332-
given(amazonKinesis.describeStream(new DescribeStreamRequest().withStreamName(STREAM_FOR_RESHARDING)))
333-
.willReturn(new DescribeStreamResult()
334-
.withStreamDescription(new StreamDescription().withStreamName(STREAM_FOR_RESHARDING)
335-
.withStreamStatus(StreamStatus.ACTIVE).withHasMoreShards(false)
336-
.withShards(new Shard().withShardId("closedShard").withSequenceNumberRange(
337-
new SequenceNumberRange().withEndingSequenceNumber("1")))));
332+
given(amazonKinesis.listShards(new ListShardsRequest().withStreamName(STREAM_FOR_RESHARDING)))
333+
.willReturn(new ListShardsResult()
334+
.withShards(new Shard().withShardId("closedShard").withSequenceNumberRange(
335+
new SequenceNumberRange().withEndingSequenceNumber("1"))));
338336

339337
String shard1Iterator1 = "shard1Iterator1";
340338

@@ -370,8 +368,6 @@ public KinesisMessageDrivenChannelAdapter reshardingChannelAdapter() {
370368
return adapter;
371369
}
372370

373-
private final AtomicReference<KinesisShardEndedEvent> shardEndedEventReference = new AtomicReference<>();
374-
375371
@EventListener
376372
public void handleKinesisShardEndedEvent(KinesisShardEndedEvent event) {
377373
this.shardEndedEventReference.set(event);

src/test/java/org/springframework/integration/aws/inbound/S3StreamingChannelAdapterTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@
7373
@DirtiesContext
7474
public class S3StreamingChannelAdapterTests {
7575

76+
private static final String S3_BUCKET = "S3_BUCKET";
77+
7678
@TempDir
7779
static Path TEMPORARY_FOLDER;
7880

79-
private static final String S3_BUCKET = "S3_BUCKET";
80-
8181
private static List<S3Object> S3_OBJECTS;
8282

8383
@Autowired

src/test/java/org/springframework/integration/aws/inbound/SqsMessageDrivenChannelAdapterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ void testSqsMessageDrivenChannelAdapter() {
107107
.hasCauseExactlyInstanceOf(IllegalArgumentException.class)
108108
.hasMessageContaining("Queue with name 'foo' does not exist");
109109

110-
assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] { "testQueue" });
110+
assertThat(this.sqsMessageDrivenChannelAdapter.getQueues()).isEqualTo(new String[] {"testQueue"});
111111
}
112112

113113
@Configuration

src/test/java/org/springframework/integration/aws/kinesis/KplKclIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
@LocalstackDockerProperties(randomizePorts = true,
8080
hostNameResolver = EnvironmentHostNameResolver.class,
8181
environmentVariableProvider = LocalStackSslEnvironmentProvider.class,
82-
services = { "kinesis", "dynamodb", "cloudwatch" })
82+
services = {"kinesis", "dynamodb", "cloudwatch"})
8383
@DirtiesContext
8484
public class KplKclIntegrationTests {
8585

src/test/java/org/springframework/integration/aws/lock/DynamoDbLockRegistryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,10 @@
6666
@DirtiesContext
6767
public class DynamoDbLockRegistryTests {
6868

69-
private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
70-
7169
private static AmazonDynamoDBAsync DYNAMO_DB;
7270

71+
private final AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
72+
7373
@Autowired
7474
private DynamoDbLockRegistry dynamoDbLockRegistry;
7575

src/test/java/org/springframework/integration/aws/metadata/DynamoDbMetadataStoreTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@
5454
services = "dynamodb")
5555
class DynamoDbMetadataStoreTests {
5656

57-
private static AmazonDynamoDBAsync DYNAMO_DB;
58-
5957
private static final String TEST_TABLE = "testMetadataStore";
6058

59+
private static AmazonDynamoDBAsync DYNAMO_DB;
60+
6161
private static DynamoDbMetadataStore store;
6262

6363
private final String file1 = "/remotepath/filesTodownload/file-1.txt";

src/test/java/org/springframework/integration/aws/outbound/S3MessageHandlerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,6 @@
111111
@DirtiesContext
112112
public class S3MessageHandlerTests {
113113

114-
private static SpelExpressionParser PARSER = new SpelExpressionParser();
115-
116114
// define the bucket and file names used throughout the test
117115
private static final String S3_BUCKET_NAME = "myBucket";
118116

@@ -123,6 +121,8 @@ public class S3MessageHandlerTests {
123121
@TempDir
124122
static Path temporaryFolder;
125123

124+
private static SpelExpressionParser PARSER = new SpelExpressionParser();
125+
126126
@Autowired
127127
private AmazonS3 amazonS3;
128128

src/test/resources/log4j2-test.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
<Configuration status="WARN">
33
<Appenders>
44
<Console name="STDOUT" target="SYSTEM_OUT">
5-
<PatternLayout pattern="%d %p [%t] [%c] - %m%n" />
5+
<PatternLayout pattern="%d %p [%t] [%c] - %m%n"/>
66
</Console>
77
</Appenders>
88
<Loggers>
99
<Logger name="org.springframework" level="warn"/>
1010
<Logger name="org.springframework.integration" level="warn"/>
1111
<Logger name="org.springframework.integration.aws" level="info"/>
1212
<Root level="warn">
13-
<AppenderRef ref="STDOUT" />
13+
<AppenderRef ref="STDOUT"/>
1414
</Root>
1515
</Loggers>
1616
</Configuration>

0 commit comments

Comments
 (0)