Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[YugabyteDB] Improve locking mechanism during migrations #76

Merged
merged 4 commits into from
Feb 12, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Ensure that only the thread which sets the lock_id, resets it
ashetkar committed Nov 26, 2024
commit 6b4e98ad6b056025144ebd4dfde215b6dffebbde
Original file line number Diff line number Diff line change
@@ -31,12 +31,13 @@
public class YugabyteDBDatabase extends PostgreSQLDatabase {

public static final String LOCK_TABLE_NAME = "YB_FLYWAY_LOCK_TABLE";
private static final String DROP_TABLE_IF_EXISTS = "DROP TABLE IF EXISTS " + LOCK_TABLE_NAME;
/**
* This table is used to enforce locking through SELECT ... FOR UPDATE on a
* token row inserted in this table. The token row is inserted with the name
* of Flyway's migration history table as a token for simplicity.
*/
private static final String CREATE_LOCK_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (table_name varchar PRIMARY KEY, locked bool)";
private static final String CREATE_LOCK_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (table_name varchar PRIMARY KEY, lock_id bigint)";

public YugabyteDBDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
super(configuration, jdbcConnectionFactory, statementInterceptor);
@@ -84,6 +85,7 @@ public boolean useSingleConnection() {

private void createLockTable() {
try {
jdbcTemplate.execute(DROP_TABLE_IF_EXISTS);
jdbcTemplate.execute(CREATE_LOCK_TABLE_DDL);
} catch (SQLException e) {
throw new FlywaySqlException("Unable to initialize the lock table", e);
Original file line number Diff line number Diff line change
@@ -4,11 +4,12 @@
import org.flywaydb.core.api.FlywayException;
import org.flywaydb.core.internal.exception.FlywaySqlException;
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
import org.flywaydb.core.internal.strategy.RetryStrategy;
import org.flywaydb.core.internal.util.FlywayDbWebsiteLinks;
import org.flywaydb.core.internal.util.SqlCallable;

import java.sql.*;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

@@ -18,6 +19,7 @@ public class YugabyteDBExecutionTemplate {
private final JdbcTemplate jdbcTemplate;
private final String tableName;
private static final Map<String, Boolean> tableEntries = new ConcurrentHashMap<>();
private static final Random random = new Random();

YugabyteDBExecutionTemplate(JdbcTemplate jdbcTemplate, String tableName) {
this.jdbcTemplate = jdbcTemplate;
@@ -26,8 +28,9 @@ public class YugabyteDBExecutionTemplate {

public <T> T execute(Callable<T> callable) {
Exception error = null;
long lockId = 0;
try {
lock();
lockId = lock();
return callable.call();
} catch (RuntimeException e) {
error = e;
@@ -36,21 +39,24 @@ public <T> T execute(Callable<T> callable) {
error = e;
throw new FlywayException(e);
} finally {
unlock(error);
if (lockId != 0) {
unlock(lockId, error);
}
}
}

private void lock() throws SQLException {
RetryStrategy strategy = new RetryStrategy();
strategy.doWithRetries(this::tryLock, "Interrupted while attempting to acquire lock through SELECT ... FOR UPDATE",
private long lock() throws SQLException {
YBRetryStrategy strategy = new YBRetryStrategy();
return strategy.doWithRetries(this::tryLock, "Interrupted while attempting to acquire lock through SELECT ... FOR UPDATE",
"Number of retries exceeded while attempting to acquire lock through SELECT ... FOR UPDATE. " +
"Configure the number of retries with the 'lockRetryCount' configuration option: " + FlywayDbWebsiteLinks.LOCK_RETRY_COUNT);

}

private boolean tryLock() {
private long tryLock() {
Exception exception = null;
boolean txStarted = false, success = false;
boolean txStarted = false;
long lockIdToBeReturned = 0;
Statement statement = null;
try {
statement = jdbcTemplate.getConnection().createStatement();
@@ -59,7 +65,7 @@ private boolean tryLock() {
try {
statement.executeUpdate("INSERT INTO "
+ YugabyteDBDatabase.LOCK_TABLE_NAME
+ " VALUES ('" + tableName + "', 'false')");
+ " VALUES ('" + tableName + "', 0)");
tableEntries.put(tableName, true);
LOG.info(Thread.currentThread().getName() + "> Inserted a token row for " + tableName + " in " + YugabyteDBDatabase.LOCK_TABLE_NAME);
} catch (SQLException e) {
@@ -72,38 +78,38 @@ private boolean tryLock() {
}
}

boolean locked;
String selectForUpdate = "SELECT locked FROM "
long lockIdRead = 0;
String selectForUpdate = "SELECT lock_id FROM "
+ YugabyteDBDatabase.LOCK_TABLE_NAME
+ " WHERE table_name = '"
+ tableName
+ "' FOR UPDATE";
String updateLocked = "UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME
+ " SET locked = true WHERE table_name = '"
+ tableName + "'";

statement.execute("BEGIN");
txStarted = true;
ResultSet rs = statement.executeQuery(selectForUpdate);
if (rs.next()) {
locked = rs.getBoolean("locked");
lockIdRead = rs.getLong("lock_id");

if (locked) {
if (lockIdRead != 0) {
statement.execute("COMMIT");
txStarted = false;
LOG.debug(Thread.currentThread().getName() + "> Another Flyway operation is in progress. Allowing it to complete");
} else {
LOG.debug(Thread.currentThread().getName() + "> Setting locked = true");
statement.executeUpdate(updateLocked);
success = true;
lockIdToBeReturned = random.nextLong();
String updateLockId = "UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME
+ " SET lock_id = " + lockIdToBeReturned + " WHERE table_name = '"
+ tableName + "'";
LOG.debug(Thread.currentThread().getName() + "> Setting lock_id = " + lockIdToBeReturned);
statement.executeUpdate(updateLockId);
}
} else {
// For some reason the record was not found, retry
tableEntries.remove(tableName);
}

} catch (SQLException e) {
LOG.warn(Thread.currentThread().getName() + "> Unable to perform lock action, SQLState: " + e.getSQLState());
LOG.debug(Thread.currentThread().getName() + "> Unable to perform lock action, SQLState: " + e.getSQLState());
if (!"40001".equalsIgnoreCase(e.getSQLState())) {
exception = new FlywaySqlException("Unable to perform lock action", e);
throw (FlywaySqlException) exception;
@@ -112,56 +118,103 @@ private boolean tryLock() {
if (txStarted) {
try {
statement.execute("COMMIT");
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = true");
// lock_id may not be set if there is exception in select for update
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to attempt to set lock_id");
} catch (SQLException e) {
if (exception == null) {
throw new FlywaySqlException("Failed to commit the tx to set locked = true", e);
throw new FlywaySqlException("Failed to commit the tx to set lock_id ", e);
}
LOG.warn(Thread.currentThread().getName() + "> Failed to commit the tx to set locked = true: " + e);
LOG.warn(Thread.currentThread().getName() + "> Failed to commit the tx to set lock_id: " + e);
}
}
}
return success;
return lockIdToBeReturned;
}

private void unlock(Exception rethrow) {
private void unlock(long prevLockId, Exception rethrow) {
Statement statement = null;
try {
statement = jdbcTemplate.getConnection().createStatement();
statement.execute("BEGIN");
ResultSet rs = statement.executeQuery("SELECT locked FROM " + YugabyteDBDatabase.LOCK_TABLE_NAME + " WHERE table_name = '" + tableName + "' FOR UPDATE");
ResultSet rs = statement.executeQuery("SELECT lock_id FROM " + YugabyteDBDatabase.LOCK_TABLE_NAME + " WHERE table_name = '" + tableName + "' FOR UPDATE");

if (rs.next()) {
boolean locked = rs.getBoolean("locked");
if (locked) {
statement.executeUpdate("UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + " SET locked = false WHERE table_name = '" + tableName + "'");
long lockId = rs.getLong("lock_id");
if (lockId == prevLockId) {
statement.executeUpdate("UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + " SET lock_id = 0 WHERE table_name = '" + tableName + "'");
} else {
// Unexpected. This may happen only when callable took too long to complete
// and another thread forcefully reset it.
String msgLock = "Expected and actual lock_id mismatch. Expected: " + prevLockId + ", Actual: " + lockId;
String msg = "Unlock failed but the Flyway operation may have succeeded. Check your Flyway operation before re-trying";
LOG.warn(Thread.currentThread().getName() + "> " + msg);
LOG.warn(Thread.currentThread().getName() + "> " + msg + "\n" + msgLock);
if (rethrow == null) {
throw new FlywayException(msg);
}
}
}
} catch (SQLException e) {
if (rethrow == null) {
rethrow = new FlywayException("Unable to perform unlock action", e);
rethrow = new FlywaySqlException("Unable to perform unlock action for lock_id " + prevLockId, e);
throw (FlywaySqlException) rethrow;
}
LOG.warn("Unable to perform unlock action " + e);
LOG.warn("Unable to perform unlock action for lock_id " + prevLockId + ": " + e);
} finally {
try {
statement.execute("COMMIT");
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = false");
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to reset lock_id " + prevLockId);
} catch (SQLException e) {
if (rethrow == null) {
throw new FlywaySqlException("Failed to commit unlock action", e);
throw new FlywaySqlException("Failed to commit unlock action for lock_id " + prevLockId, e);
}
LOG.warn("Failed to commit unlock action: " + e);
LOG.warn("Failed to commit unlock action for lock_id " + prevLockId + ": " + e);
}
}
}

/**
 * Retry loop for a callable that yields a lock id.
 *
 * A return value of {@code 0} from the callable is treated as "not acquired,
 * try again"; any other value is treated as success and returned to the caller.
 * The retry budget is shared process-wide through {@link #setNumberOfRetries(int)}
 * and is snapshotted by each instance at construction time.
 */
public static class YBRetryStrategy {
    /** Fixed pause between two consecutive attempts, in milliseconds. */
    private static final int WAIT_BETWEEN_ATTEMPTS_MS = 1000;

    // Process-wide settings; plain statics, so they are expected to be
    // configured before strategies are created.
    private static int numberOfRetries = 50;
    private static boolean unlimitedRetries;

    /** Retries left for this instance; not decremented when retries are unlimited. */
    private int numberOfRetriesRemaining;

    /** Creates a strategy that starts with the currently configured retry budget. */
    public YBRetryStrategy() {
        this.numberOfRetriesRemaining = numberOfRetries;
    }

    /**
     * Sets the retry budget picked up by strategies created afterwards.
     *
     * @param retries number of retries after the first attempt; a negative
     *                value means retry without limit
     */
    public static void setNumberOfRetries(int retries) {
        numberOfRetries = retries;
        unlimitedRetries = retries < 0;
    }

    private boolean hasMoreRetries() {
        return unlimitedRetries || this.numberOfRetriesRemaining > 0;
    }

    private void nextRetry() {
        if (!unlimitedRetries) {
            --this.numberOfRetriesRemaining;
        }
    }

    private int nextWaitInMilliseconds() {
        return WAIT_BETWEEN_ATTEMPTS_MS;
    }

    /**
     * Invokes {@code callable} until it returns a non-zero id or the retry
     * budget is exhausted.
     *
     * @param callable               the attempt to run; returns 0 to request a retry
     * @param interruptionMessage    message of the exception thrown when the wait
     *                               between attempts is interrupted
     * @param retriesExceededMessage message of the exception thrown when every
     *                               attempt returned 0
     * @return the first non-zero id returned by {@code callable}
     * @throws SQLException    if {@code callable} throws it
     * @throws FlywayException if interrupted while waiting, or if the retries
     *                         are exhausted
     */
    public long doWithRetries(SqlCallable<Long> callable, String interruptionMessage, String retriesExceededMessage) throws SQLException {
        while (true) {
            long id = callable.call();
            if (id != 0) {
                // Return immediately on success. Sleeping or running the
                // exhaustion check here would delay the caller and could throw
                // although the id has already been handed out by the callable.
                return id;
            }
            if (!this.hasMoreRetries()) {
                throw new FlywayException(retriesExceededMessage);
            }
            this.nextRetry();
            try {
                Thread.sleep(this.nextWaitInMilliseconds());
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new FlywayException(interruptionMessage, e);
            }
        }
    }
}
}