Skip to content
This repository was archived by the owner on Nov 10, 2025. It is now read-only.

Commit bc4a1c7

Browse files
committed
DynamoDbLockRegistry: Don't use managed threads
https://stackoverflow.com/questions/62025839/dynamodb-unlock-errors-during-application-stop-when-spring-kinesis-binder-tries It turns out that we may call `unlock()` on the `DynamoDbLock` when application context is already shouted down. In this case any managed `TaskExecutor`s are out of use, too. * Fix `DynamoDbLockRegistry` to use independent single-use thread for unlock operations when main thread is already interrupted
1 parent 94e4481 commit bc4a1c7

File tree

1 file changed

+61
-68
lines changed

1 file changed

+61
-68
lines changed

src/main/java/org/springframework/integration/aws/lock/DynamoDbLockRegistry.java

Lines changed: 61 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,9 +24,8 @@
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.Executor;
2626
import java.util.concurrent.ExecutorService;
27-
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ThreadFactory;
2828
import java.util.concurrent.TimeUnit;
29-
import java.util.concurrent.atomic.AtomicBoolean;
3029
import java.util.concurrent.locks.Condition;
3130
import java.util.concurrent.locks.Lock;
3231
import java.util.concurrent.locks.ReentrantLock;
@@ -96,12 +95,12 @@ public class DynamoDbLockRegistry implements ExpirableLockRegistry, Initializing
9695

9796
private static final Log logger = LogFactory.getLog(DynamoDbLockRegistry.class);
9897

98+
private final ThreadFactory customizableThreadFactory = new CustomizableThreadFactory("dynamodb-lock-registry-");
99+
99100
private final Map<String, DynamoDbLock> locks = new ConcurrentHashMap<>();
100101

101102
private final CountDownLatch createTableLatch = new CountDownLatch(1);
102103

103-
private final AtomicBoolean running = new AtomicBoolean();
104-
105104
private final AmazonDynamoDB dynamoDB;
106105

107106
private final String tableName;
@@ -126,13 +125,6 @@ public class DynamoDbLockRegistry implements ExpirableLockRegistry, Initializing
126125

127126
private long heartbeatPeriod = 5L;
128127

129-
/**
130-
* An {@link ExecutorService} to call
131-
* {@link AmazonDynamoDBLockClient#releaseLock(LockItem)} in the separate thread when
132-
* the current one is interrupted.
133-
*/
134-
private Executor executor = Executors.newCachedThreadPool(new CustomizableThreadFactory("dynamodb-lock-registry-"));
135-
136128
/**
137129
* Flag to denote whether the {@link ExecutorService} was provided via the setter and
138130
* thus should not be shutdown when {@link #destroy()} is called.
@@ -207,10 +199,10 @@ public void setRefreshPeriod(long refreshPeriod) {
207199
* Set the {@link Executor}, where is not provided then a default of cached thread
208200
* pool Executor will be used.
209201
* @param executor the executor service
202+
* @deprecated with no-op in favor of internally created unmanaged threads.
210203
*/
204+
@Deprecated
211205
public void setExecutor(Executor executor) {
212-
this.executor = executor;
213-
this.executorExplicitlySet = true;
214206
}
215207

216208
@Override
@@ -227,59 +219,62 @@ public void afterPropertiesSet() {
227219
this.leaseDuration = (long) new DirectFieldAccessor(this.dynamoDBLockClient)
228220
.getPropertyValue("leaseDurationInMilliseconds");
229221

230-
this.executor.execute(() -> {
231-
try {
232-
if (!this.dynamoDBLockClientExplicitlySet) {
222+
this.customizableThreadFactory
223+
.newThread(() -> {
233224
try {
234-
this.dynamoDBLockClient.assertLockTableExists();
235-
return;
236-
}
237-
catch (LockTableDoesNotExistException e) {
238-
if (logger.isInfoEnabled()) {
239-
logger.info("No table '" + this.tableName + "'. Creating one...");
225+
if (!this.dynamoDBLockClientExplicitlySet) {
226+
try {
227+
this.dynamoDBLockClient.assertLockTableExists();
228+
return;
229+
}
230+
catch (LockTableDoesNotExistException e) {
231+
if (logger.isInfoEnabled()) {
232+
logger.info("No table '" + this.tableName + "'. Creating one...");
233+
}
234+
}
235+
236+
CreateDynamoDBTableOptions createDynamoDBTableOptions = CreateDynamoDBTableOptions
237+
.builder(this.dynamoDB, new ProvisionedThroughput(this.readCapacity,
238+
this.writeCapacity),
239+
this.tableName)
240+
.withPartitionKeyName(this.partitionKey).withSortKeyName(this.sortKeyName).build();
241+
242+
try {
243+
AmazonDynamoDBLockClient.createLockTableInDynamoDB(createDynamoDBTableOptions);
244+
}
245+
catch (ResourceInUseException ex) {
246+
// Swallow an exception and check for table existence
247+
}
240248
}
241-
}
242-
243-
CreateDynamoDBTableOptions createDynamoDBTableOptions = CreateDynamoDBTableOptions
244-
.builder(this.dynamoDB, new ProvisionedThroughput(this.readCapacity, this.writeCapacity),
245-
this.tableName)
246-
.withPartitionKeyName(this.partitionKey).withSortKeyName(this.sortKeyName).build();
247249

248-
try {
249-
AmazonDynamoDBLockClient.createLockTableInDynamoDB(createDynamoDBTableOptions);
250-
}
251-
catch (ResourceInUseException ex) {
252-
// Swallow an exception and check for table existence
253-
}
254-
}
250+
int i = 0;
251+
// We need up to one minute to wait until table is created on AWS.
252+
while (i++ < 60) {
253+
if (this.dynamoDBLockClient.lockTableExists()) {
254+
return;
255+
}
256+
else {
257+
try {
258+
// This is allowed minimum for constant AWS requests.
259+
Thread.sleep(1000);
260+
}
261+
catch (InterruptedException e) {
262+
ReflectionUtils.rethrowRuntimeException(e);
263+
}
264+
}
265+
}
255266

256-
int i = 0;
257-
// We need up to one minute to wait until table is created on AWS.
258-
while (i++ < 60) {
259-
if (this.dynamoDBLockClient.lockTableExists()) {
260-
return;
267+
logger.error("Cannot describe DynamoDb table: " + this.tableName);
261268
}
262-
else {
263-
try {
264-
// This is allowed minimum for constant AWS requests.
265-
Thread.sleep(1000);
266-
}
267-
catch (InterruptedException e) {
268-
ReflectionUtils.rethrowRuntimeException(e);
269-
}
269+
finally {
270+
// Release create table barrier either way.
271+
// If there is an error during creation/description,
272+
// we deffer the actual ResourceNotFoundException to the end-user active
273+
// calls.
274+
this.createTableLatch.countDown();
270275
}
271-
}
272-
273-
logger.error("Cannot describe DynamoDb table: " + this.tableName);
274-
}
275-
finally {
276-
// Release create table barrier either way.
277-
// If there is an error during creation/description,
278-
// we deffer the actual ResourceNotFoundException to the end-user active
279-
// calls.
280-
this.createTableLatch.countDown();
281-
}
282-
});
276+
})
277+
.start();
283278

284279
this.initialized = true;
285280
}
@@ -303,10 +298,6 @@ private void awaitForActive() {
303298

304299
@Override
305300
public void destroy() throws Exception {
306-
if (!this.executorExplicitlySet) {
307-
((ExecutorService) this.executor).shutdown();
308-
}
309-
310301
if (!this.dynamoDBLockClientExplicitlySet) {
311302
this.dynamoDBLockClient.close();
312303
}
@@ -515,8 +506,10 @@ public void unlock() {
515506
try {
516507
if (Thread.currentThread().isInterrupted()) {
517508
LockItem lockItemToRelease = this.lockItem;
518-
DynamoDbLockRegistry.this.executor
519-
.execute(() -> DynamoDbLockRegistry.this.dynamoDBLockClient.releaseLock(lockItemToRelease));
509+
DynamoDbLockRegistry.this.customizableThreadFactory
510+
.newThread(() ->
511+
DynamoDbLockRegistry.this.dynamoDBLockClient.releaseLock(lockItemToRelease))
512+
.start();
520513
}
521514
else {
522515
DynamoDbLockRegistry.this.dynamoDBLockClient.releaseLock(this.lockItem);
@@ -538,7 +531,7 @@ public Condition newCondition() {
538531

539532
@Override
540533
public String toString() {
541-
SimpleDateFormat dateFormat = new SimpleDateFormat("YYYY-MM-dd@HH:mm:ss.SSS");
534+
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd@HH:mm:ss.SSS");
542535
return "DynamoDbLock [lockKey=" + this.key + ",lockedAt=" + dateFormat.format(new Date(this.lastUsed))
543536
+ ", lockItem=" + this.lockItem + "]";
544537
}

0 commit comments

Comments
 (0)