Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
Expand Down Expand Up @@ -52,6 +53,7 @@ public class AcquireLockOptions {
* optionally back-off and retry and to acquire the lock.
*/
private final boolean shouldSkipBlockingWait;
private final ThreadFactory threadFactory;

/**
* A builder for setting up an AcquireLockOptions object. This allows clients to configure
Expand All @@ -77,8 +79,18 @@ public static class AcquireLockOptionsBuilder {
private Optional<Runnable> sessionMonitorCallback;
private boolean isSessionMonitorSet = false;
private boolean shouldSkipBlockingWait;
private ThreadFactory threadFactory;

AcquireLockOptionsBuilder(final String partitionKey) {
/**
* Produces a new ThreadFactory that creates threads with the default settings.
*
* @return a new ThreadFactory that creates threads with the default settings
*/
private static ThreadFactory threadFactory() {
return Thread::new;
}

AcquireLockOptionsBuilder(final String partitionKey, final ThreadFactory threadFactory) {
this.partitionKey = partitionKey;
this.additionalAttributes = new HashMap<>();
this.sortKey = Optional.empty();
Expand All @@ -90,6 +102,7 @@ public static class AcquireLockOptionsBuilder {
this.shouldSkipBlockingWait = false;
this.acquireReleasedLocksConsistently = false;
this.reentrant = false;
this.threadFactory = threadFactory == null ? threadFactory() : threadFactory;
}

/**
Expand Down Expand Up @@ -327,13 +340,13 @@ public AcquireLockOptions build() {
final Optional<SessionMonitor> sessionMonitor;
if (this.isSessionMonitorSet) {
Objects.requireNonNull(this.timeUnit, "timeUnit must not be null if sessionMonitor is non-null");
sessionMonitor = Optional.of(new SessionMonitor(this.timeUnit.toMillis(this.safeTimeWithoutHeartbeat), this.sessionMonitorCallback));
sessionMonitor = Optional.of(new SessionMonitor(this.timeUnit.toMillis(this.safeTimeWithoutHeartbeat), this.sessionMonitorCallback, this.threadFactory));
} else {
sessionMonitor = Optional.empty();
}
return new AcquireLockOptions(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists,
this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, sessionMonitor,
this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant);
this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.threadFactory);
}

@Override
Expand All @@ -342,7 +355,7 @@ public String toString() {
+ this.replaceData + ", deleteLockOnRelease=" + this.deleteLockOnRelease + ", refreshPeriod=" + this.refreshPeriod + ", additionalTimeToWaitForLock="
+ this.additionalTimeToWaitForLock + ", timeUnit=" + this.timeUnit + ", additionalAttributes=" + this.additionalAttributes + ", safeTimeWithoutHeartbeat="
+ this.safeTimeWithoutHeartbeat + ", sessionMonitorCallback=" + this.sessionMonitorCallback + ", acquireReleasedLocksConsistently="
+ this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ")";
+ this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant + ", threadFactory= " + this.threadFactory + ")";
}
}

Expand All @@ -354,13 +367,26 @@ public String toString() {
* @return a builder for an AquireLockOptions object
*/
public static AcquireLockOptionsBuilder builder(final String partitionKey) {
return new AcquireLockOptionsBuilder(partitionKey);
return new AcquireLockOptionsBuilder(partitionKey, null);
}

/**
* Creates a new version of AcquireLockOptionsBuilder using the partition key as well as a ThreadFactory, which
* will be used for spawning related threads.
*
* @param partitionKey The partition key under which the lock will be acquired.
* @param threadFactory The factory to create related threads.
* @return
*/
public static AcquireLockOptionsBuilder builder(final String partitionKey, final ThreadFactory threadFactory) {
return new AcquireLockOptionsBuilder(partitionKey, threadFactory);
}

private AcquireLockOptions(final String partitionKey, final Optional<String> sortKey, final Optional<ByteBuffer> data, final Boolean replaceData,
final Boolean deleteLockOnRelease, final Boolean acquireOnlyIfLockAlreadyExists, final Long refreshPeriod, final Long additionalTimeToWaitForLock,
final TimeUnit timeUnit, final Map<String, AttributeValue> additionalAttributes, final Optional<SessionMonitor> sessionMonitor,
final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant) {
final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant,
final ThreadFactory threadFactory) {
this.partitionKey = partitionKey;
this.sortKey = sortKey;
this.data = data;
Expand All @@ -376,6 +402,7 @@ private AcquireLockOptions(final String partitionKey, final Optional<String> sor
this.shouldSkipBlockingWait = shouldSkipBlockingWait;
this.acquireReleasedLocksConsistently = acquireReleasedLocksConsistently;
this.reentrant = reentrant;
this.threadFactory = threadFactory;
}

String getPartitionKey() {
Expand Down Expand Up @@ -428,6 +455,10 @@ Map<String, AttributeValue> getAdditionalAttributes() {
return this.additionalAttributes;
}

ThreadFactory getThreadFactory() {
return this.threadFactory;
}

/**
* Constructs a SessionMonitor object for LockItem instantiation
*
Expand Down Expand Up @@ -460,15 +491,16 @@ public boolean equals(final Object other) {
&& Objects.equals(this.updateExistingLockRecord, otherOptions.updateExistingLockRecord)
&& Objects.equals(this.shouldSkipBlockingWait, otherOptions.shouldSkipBlockingWait)
&& Objects.equals(this.acquireReleasedLocksConsistently, otherOptions.acquireReleasedLocksConsistently)
&& Objects.equals(this.reentrant, otherOptions.reentrant);
&& Objects.equals(this.reentrant, otherOptions.reentrant)
&& Objects.equals(this.threadFactory, otherOptions.threadFactory);
}

@Override
public int hashCode() {
return Objects.hash(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease,
this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit,
this.additionalAttributes, this.sessionMonitor, this.updateExistingLockRecord,
this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant);
this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.threadFactory);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -237,6 +238,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable {
private final ConcurrentHashMap<String, Thread> sessionMonitors;
private final Optional<Thread> backgroundThread;
private final Function<String, ThreadFactory> namedThreadCreator;
private final ReentrantLock releaseLocksReentrantLock;
private volatile boolean shuttingDown = false;

/* These are the keys that are stored in the DynamoDB table to keep track of the locks */
Expand Down Expand Up @@ -281,6 +283,7 @@ public AmazonDynamoDBLockClient(final AmazonDynamoDBLockClientOptions amazonDyna
this.sortKeyName = amazonDynamoDBLockClientOptions.getSortKeyName();
this.namedThreadCreator = amazonDynamoDBLockClientOptions.getNamedThreadCreator();
this.holdLockOnServiceUnavailable = amazonDynamoDBLockClientOptions.getHoldLockOnServiceUnavailable();
this.releaseLocksReentrantLock = new ReentrantLock();

if (amazonDynamoDBLockClientOptions.getCreateHeartbeatBackgroundThread()) {
if (this.leaseDurationInMilliseconds < 2 * this.heartbeatPeriodInMilliseconds) {
Expand Down Expand Up @@ -853,7 +856,9 @@ public boolean releaseLock(final ReleaseLockOptions options) {
return false;
}

synchronized (lockItem) {
lockItem.getReentrantLock().lock();
try
{
try {
// Always remove the heartbeat for the lock. The
// caller's intention is to release the lock. Stopping the
Expand Down Expand Up @@ -931,6 +936,12 @@ public boolean releaseLock(final ReleaseLockOptions options) {
// get exceptions from this method.
this.removeKillSessionMonitor(lockItem.getUniqueIdentifier());
}
finally
{
lockItem.getReentrantLock()
.unlock();
}

return true;
}

Expand All @@ -952,11 +963,15 @@ private Map<String, AttributeValue> getKeys(String partitionKey, Optional<String
*/
private void releaseAllLocks() {
final Map<String, LockItem> locks = new HashMap<>(this.locks);
synchronized (locks) {
this.releaseLocksReentrantLock.lock();
try {
for (final Entry<String, LockItem> lockEntry : locks.entrySet()) {
this.releaseLock(lockEntry.getValue()); // TODO catch exceptions and report failure separately
}
}
finally {
this.releaseLocksReentrantLock.unlock();
}
}

/**
Expand Down Expand Up @@ -1163,7 +1178,8 @@ public void sendHeartbeat(final SendHeartbeatOptions options) {
throw new LockNotGrantedException("Cannot send heartbeat because lock is not granted");
}

synchronized (lockItem) {
lockItem.getReentrantLock().lock();
try {
//Set up condition for UpdateItem. Basically any changes require:
//1. I own the lock
//2. I know the current version number
Expand Down Expand Up @@ -1235,6 +1251,10 @@ public void sendHeartbeat(final SendHeartbeatOptions options) {
}
}
}
finally {
lockItem.getReentrantLock()
.unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class AmazonDynamoDBLockClientOptions {
private final Function<String, ThreadFactory> namedThreadCreator;
private final Boolean holdLockOnServiceUnavailable;


/**
* A builder for setting up an AmazonDynamoDBLockClientOptions object. By default, it is setup to have a partition key name of
* "key," a lease duration of 20 seconds, and a default heartbeat period of 5 seconds. These defaults can be overriden.
Expand All @@ -76,6 +75,14 @@ public static class AmazonDynamoDBLockClientOptionsBuilder {
namedThreadCreator());
}

AmazonDynamoDBLockClientOptionsBuilder(final DynamoDbClient dynamoDBClient, final String tableName, final Function<String, ThreadFactory> namedThreadCreator) {
this(dynamoDBClient, tableName,
/* By default, tries to set ownerName to the localhost */
generateOwnerNameFromLocalhost(),
namedThreadCreator);
}


private static final String generateOwnerNameFromLocalhost() {
try {
return Inet4Address.getLocalHost().getHostName() + UUID.randomUUID().toString();
Expand Down Expand Up @@ -233,6 +240,20 @@ public static AmazonDynamoDBLockClientOptionsBuilder builder(final DynamoDbClien
return new AmazonDynamoDBLockClientOptionsBuilder(dynamoDBClient, tableName);
}

/**
* Creates an AmazonDynamoDBLockClientOptions builder object, which can be
* used to create an AmazonDynamoDBLockClient. The only required parameters
* are the client and the table name.
*
* @param dynamoDBClient The client for talking to DynamoDB.
* @param tableName The table containing the lock client.
* @param namedThreadCreator A function that takes in a thread name and outputs a ThreadFactory that creates threads with the given name.
* @return A builder which can be used for creating a lock client.
*/
public static AmazonDynamoDBLockClientOptionsBuilder builder(final DynamoDbClient dynamoDBClient, final String tableName, final Function<String, ThreadFactory> namedThreadCreator) {
return new AmazonDynamoDBLockClientOptionsBuilder(dynamoDBClient, tableName, namedThreadCreator);
}

private AmazonDynamoDBLockClientOptions(final DynamoDbClient dynamoDBClient, final String tableName, final String partitionKeyName, final Optional<String> sortKeyName,
final String ownerName, final Long leaseDuration, final Long heartbeatPeriod, final TimeUnit timeUnit, final Boolean createHeartbeatBackgroundThread,
final Function<String, ThreadFactory> namedThreadCreator, final Boolean holdLockOnServiceUnavailable) {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/**
Expand All @@ -37,6 +39,7 @@ public class LockItem implements Closeable {
private final AmazonDynamoDBLockClient client;
private final String partitionKey;
private final Optional<String> sortKey;
private final ReentrantLock reentrantLock;

private Optional<ByteBuffer> data;
private final String ownerName;
Expand Down Expand Up @@ -87,6 +90,7 @@ public class LockItem implements Closeable {
this.ownerName = ownerName;
this.deleteLockItemOnClose = deleteLockItemOnClose;

this.reentrantLock = new ReentrantLock();
this.leaseDuration = new AtomicLong(leaseDuration);
this.lookupTime = new AtomicLong(lastUpdatedTimeInMilliseconds);
this.recordVersionNumber = new StringBuffer(recordVersionNumber);
Expand All @@ -105,6 +109,15 @@ public String getPartitionKey() {
return this.partitionKey;
}

/**
* Returns the {@link ReentrantLock} for this {@link LockItem}.
*
* @return The reentrant lock for this lock item.
*/
public ReentrantLock getReentrantLock() {
return this.reentrantLock;
}

/**
* Returns the sort key associated with the lock, if there is one.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.amazonaws.services.dynamodbv2;

import java.util.Optional;
import java.util.concurrent.ThreadFactory;

import com.amazonaws.services.dynamodbv2.util.LockClientUtils;

Expand All @@ -29,6 +30,7 @@
final class SessionMonitor {
private final long safeTimeWithoutHeartbeatMillis;
private final Optional<Runnable> callback;
private final ThreadFactory threadFactory;

/**
* Constructs a SessionMonitor object.
Expand All @@ -38,10 +40,15 @@ final class SessionMonitor {
* "danger zone"
* @param callback the callback to run when the lock's lease enters the danger
* zone
* @param threadFactory the factory to create the thread that will run the callback
*/
public SessionMonitor(final long safeTimeWithoutHeartbeatMillis, final Optional<Runnable> callback) {
public SessionMonitor(
final long safeTimeWithoutHeartbeatMillis,
final Optional<Runnable> callback,
final ThreadFactory threadFactory) {
this.safeTimeWithoutHeartbeatMillis = safeTimeWithoutHeartbeatMillis;
this.callback = callback;
this.threadFactory = threadFactory;
}

/**
Expand Down Expand Up @@ -78,7 +85,7 @@ long millisecondsUntilLeaseEntersDangerZone(final long lastAbsoluteTimeUpdatedMi
*/
public void runCallback() {
if (this.callback.isPresent()) {
final Thread t = new Thread(this.callback.get());
final Thread t = this.threadFactory.newThread(this.callback.get());
t.setDaemon(true);
t.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void equals_differentPartitionKey_returnFalse() {
1000, //last updated time in milliseconds
"recordVersionNumber",
false, //released
Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor
Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor
new HashMap<>())));
}

Expand All @@ -84,7 +84,7 @@ public void equals_differentOwner_returnFalse() {
1000, //last updated time in milliseconds
"recordVersionNumber",
false, //released
Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor
Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor
new HashMap<>())));
}

Expand All @@ -104,7 +104,7 @@ public void isExpired_whenIsReleasedTrue_returnTrue() {
1000, //last updated time in milliseconds
"recordVersionNumber",
true, //released
Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor
Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor
new HashMap<>()).isExpired());
}

Expand Down Expand Up @@ -139,7 +139,7 @@ public void millisecondsUntilDangerZoneEntered_whenIsReleasedTrue_throwsIllegalS
1000, //last updated time in milliseconds
"recordVersionNumber",
true, //released
Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor
Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor
new HashMap<>()).millisecondsUntilDangerZoneEntered();
}

Expand Down Expand Up @@ -191,7 +191,7 @@ static LockItem createLockItem(AmazonDynamoDBLockClient lockClient) {
1000, //last updated time in milliseconds
"recordVersionNumber",
false, //released
Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor
Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor
new HashMap<>()); //additional attributes
}
}
Loading