Skip to content

Commit 33a9004

Browse files
authored
Merge pull request #76 from yugabyte/improve-locking
[YugabyteDB] Improve locking mechanism during migrations
2 parents 19dd5e2 + 4a63f4b commit 33a9004

File tree

2 files changed

+131
-40
lines changed

2 files changed

+131
-40
lines changed

flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBDatabase.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,21 @@
2525

2626
import java.sql.Connection;
2727
import java.sql.SQLException;
28+
import java.util.List;
2829

2930

3031
@CustomLog
3132
public class YugabyteDBDatabase extends PostgreSQLDatabase {
3233

3334
public static final String LOCK_TABLE_NAME = "YB_FLYWAY_LOCK_TABLE";
35+
// Lists the columns of the lock table as recorded in the catalog. The table is created with an
// unquoted identifier, which PostgreSQL-compatible databases fold to lower case, while the
// comparison against information_schema is case-sensitive; lower() makes the literal match
// the stored name so an existing (possibly old-schema) lock table is actually detected.
private static final String LOCK_TABLE_SCHEMA_SQL = "SELECT table_name, column_name FROM information_schema.columns WHERE table_name = lower('" + LOCK_TABLE_NAME + "')";
36+
private static final String DROP_LOCK_TABLE_IF_EXISTS_DDL = "DROP TABLE IF EXISTS " + LOCK_TABLE_NAME;
3437
/**
3538
* This table is used to enforce locking through SELECT ... FOR UPDATE on a
3639
* token row inserted in this table. The token row is inserted with the name
3740
* of the Flyway's migration history table as a token for simplicity.
3841
*/
39-
private static final String CREATE_LOCK_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (table_name varchar PRIMARY KEY, locked bool)";
42+
private static final String CREATE_LOCK_TABLE_DDL = "CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (table_name varchar PRIMARY KEY, lock_id bigint, ts timestamp)";
4043

4144
public YugabyteDBDatabase(Configuration configuration, JdbcConnectionFactory jdbcConnectionFactory, StatementInterceptor statementInterceptor) {
4245
super(configuration, jdbcConnectionFactory, statementInterceptor);
@@ -84,7 +87,21 @@ public boolean useSingleConnection() {
8487

8588
private void createLockTable() {
8689
try {
87-
jdbcTemplate.execute(CREATE_LOCK_TABLE_DDL);
90+
List<String> columns = jdbcTemplate.query(LOCK_TABLE_SCHEMA_SQL, rs -> rs.getString("column_name"));
91+
if (columns.isEmpty()) {
92+
LOG.debug("Lock table not found, creating it...");
93+
jdbcTemplate.execute(CREATE_LOCK_TABLE_DDL);
94+
} else {
95+
for (String column : columns) {
96+
if ("lock_id".equals(column)) {
97+
LOG.debug("Lock table with expected schema already exists");
98+
return;
99+
}
100+
}
101+
LOG.info("Lock table exists but has old schema. Dropping and recreating it with new schema...");
102+
jdbcTemplate.execute(DROP_LOCK_TABLE_IF_EXISTS_DDL);
103+
jdbcTemplate.execute(CREATE_LOCK_TABLE_DDL);
104+
}
88105
} catch (SQLException e) {
89106
throw new FlywaySqlException("Unable to initialize the lock table", e);
90107
}

flyway-database-yugabytedb/src/main/java/org/flywaydb/community/database/postgresql/yugabytedb/YugabyteDBExecutionTemplate.java

+112-38
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
import org.flywaydb.core.api.FlywayException;
55
import org.flywaydb.core.internal.exception.FlywaySqlException;
66
import org.flywaydb.core.internal.jdbc.JdbcTemplate;
7-
import org.flywaydb.core.internal.strategy.RetryStrategy;
87
import org.flywaydb.core.internal.util.FlywayDbWebsiteLinks;
8+
import org.flywaydb.core.internal.util.SqlCallable;
99

1010
import java.sql.*;
11+
import java.time.Instant;
1112
import java.util.Map;
13+
import java.util.Random;
1214
import java.util.concurrent.Callable;
1315
import java.util.concurrent.ConcurrentHashMap;
1416

@@ -18,6 +20,10 @@ public class YugabyteDBExecutionTemplate {
1820
private final JdbcTemplate jdbcTemplate;
1921
private final String tableName;
2022
private static final Map<String, Boolean> tableEntries = new ConcurrentHashMap<>();
23+
private static final Random random = new Random();
24+
public static final int DEFAULT_LOCK_ID_TTL = 1000 * 60 * 5;
25+
public static final int MAX_LOCK_ID_TTL = 1000 * 60 * 60;
26+
public static final String LOCK_ID_TTL_SYS_PROP_NAME = "flyway.yugabytedb.lock-id-ttl-ms";
2127

2228
YugabyteDBExecutionTemplate(JdbcTemplate jdbcTemplate, String tableName) {
2329
this.jdbcTemplate = jdbcTemplate;
@@ -26,8 +32,9 @@ public class YugabyteDBExecutionTemplate {
2632

2733
public <T> T execute(Callable<T> callable) {
2834
Exception error = null;
35+
long lockId = 0;
2936
try {
30-
lock();
37+
lockId = lock();
3138
return callable.call();
3239
} catch (RuntimeException e) {
3340
error = e;
@@ -36,31 +43,36 @@ public <T> T execute(Callable<T> callable) {
3643
error = e;
3744
throw new FlywayException(e);
3845
} finally {
39-
unlock(error);
46+
if (lockId != 0) {
47+
unlock(lockId, error);
48+
}
4049
}
4150
}
4251

43-
private void lock() throws SQLException {
44-
RetryStrategy strategy = new RetryStrategy();
45-
strategy.doWithRetries(this::tryLock, "Interrupted while attempting to acquire lock through SELECT ... FOR UPDATE",
52+
private long lock() throws SQLException {
53+
YBRetryStrategy strategy = new YBRetryStrategy();
54+
return strategy.doWithRetries(this::tryLock, "Interrupted while attempting to acquire lock through SELECT ... FOR UPDATE",
4655
"Number of retries exceeded while attempting to acquire lock through SELECT ... FOR UPDATE. " +
4756
"Configure the number of retries with the 'lockRetryCount' configuration option: " + FlywayDbWebsiteLinks.LOCK_RETRY_COUNT);
4857

4958
}
5059

51-
private boolean tryLock() {
60+
private long tryLock() {
5261
Exception exception = null;
53-
boolean txStarted = false, success = false;
62+
boolean txStarted = false;
63+
long lockIdToBeReturned = 0;
5464
Statement statement = null;
5565
try {
5666
statement = jdbcTemplate.getConnection().createStatement();
5767

5868
if (!tableEntries.containsKey(tableName)) {
5969
try {
70+
String now = new Timestamp(Instant.now().getEpochSecond()).toString();
6071
statement.executeUpdate("INSERT INTO "
6172
+ YugabyteDBDatabase.LOCK_TABLE_NAME
62-
+ " VALUES ('" + tableName + "', 'false')");
73+
+ " VALUES ('" + tableName + "', 0, '" + now + "')");
6374
tableEntries.put(tableName, true);
75+
LOG.info("insert query ts: " + now);
6476
LOG.info(Thread.currentThread().getName() + "> Inserted a token row for " + tableName + " in " + YugabyteDBDatabase.LOCK_TABLE_NAME);
6577
} catch (SQLException e) {
6678
if ("23505".equals(e.getSQLState())) {
@@ -72,38 +84,53 @@ private boolean tryLock() {
7284
}
7385
}
7486

75-
boolean locked;
76-
String selectForUpdate = "SELECT locked FROM "
87+
long lockIdRead = 0;
88+
String selectForUpdate = "SELECT lock_id, ts FROM "
7789
+ YugabyteDBDatabase.LOCK_TABLE_NAME
7890
+ " WHERE table_name = '"
7991
+ tableName
8092
+ "' FOR UPDATE";
81-
String updateLocked = "UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME
82-
+ " SET locked = true WHERE table_name = '"
83-
+ tableName + "'";
8493

8594
statement.execute("BEGIN");
8695
txStarted = true;
8796
ResultSet rs = statement.executeQuery(selectForUpdate);
8897
if (rs.next()) {
89-
locked = rs.getBoolean("locked");
98+
lockIdRead = rs.getLong("lock_id");
99+
Timestamp tsRead = rs.getTimestamp("ts");
100+
String current = new Timestamp(Instant.now().getEpochSecond()).toString();
101+
long lockIdTtl = DEFAULT_LOCK_ID_TTL;
102+
String sysProp = System.getProperty(LOCK_ID_TTL_SYS_PROP_NAME);
103+
if (sysProp != null) {
104+
try {
105+
lockIdTtl = Long.parseLong(sysProp);
106+
lockIdTtl = lockIdTtl < 0 || lockIdTtl > MAX_LOCK_ID_TTL ? DEFAULT_LOCK_ID_TTL : lockIdTtl;
107+
} catch (NumberFormatException e) {
108+
LOG.warn("Invalid value for " + LOCK_ID_TTL_SYS_PROP_NAME + ": " + sysProp + ". Using default value: " + DEFAULT_LOCK_ID_TTL + " ms");
109+
}
110+
}
90111

91-
if (locked) {
92-
statement.execute("COMMIT");
93-
txStarted = false;
94-
LOG.debug(Thread.currentThread().getName() + "> Another Flyway operation is in progress. Allowing it to complete");
112+
if (lockIdRead == 0 || Instant.now().getEpochSecond() - tsRead.getTime() > lockIdTtl) {
113+
lockIdToBeReturned = random.nextLong();
114+
if (lockIdRead == 0) {
115+
LOG.debug(Thread.currentThread().getName() + "> Setting lock_id = " + lockIdToBeReturned);
116+
} else {
117+
LOG.warn(Thread.currentThread().getName() + "> Lock with lock_id " + lockIdRead + " is held for more than " + lockIdTtl + " millis. Resetting it with lock_id " + lockIdToBeReturned);
118+
}
119+
String updateLockId = "UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME
120+
+ " SET lock_id = " + lockIdToBeReturned + ", ts = '" + current + "' WHERE table_name = '"
121+
+ tableName + "'";
122+
LOG.debug(Thread.currentThread().getName() + "> executing query " + updateLockId);
123+
statement.executeUpdate(updateLockId);
95124
} else {
96-
LOG.debug(Thread.currentThread().getName() + "> Setting locked = true");
97-
statement.executeUpdate(updateLocked);
98-
success = true;
125+
LOG.debug(Thread.currentThread().getName() + "> Another Flyway operation is in progress. Allowing it to complete");
99126
}
100127
} else {
101128
// For some reason the record was not found, retry
102129
tableEntries.remove(tableName);
103130
}
104131

105132
} catch (SQLException e) {
106-
LOG.warn(Thread.currentThread().getName() + "> Unable to perform lock action, SQLState: " + e.getSQLState());
133+
LOG.debug(Thread.currentThread().getName() + "> Unable to perform lock action, SQLState: " + e.getSQLState());
107134
if (!"40001".equalsIgnoreCase(e.getSQLState())) {
108135
exception = new FlywaySqlException("Unable to perform lock action", e);
109136
throw (FlywaySqlException) exception;
@@ -112,56 +139,103 @@ private boolean tryLock() {
112139
if (txStarted) {
113140
try {
114141
statement.execute("COMMIT");
115-
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = true");
142+
// lock_id may not be set if there is exception in select for update
143+
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to attempt to set lock_id");
116144
} catch (SQLException e) {
117145
if (exception == null) {
118-
throw new FlywaySqlException("Failed to commit the tx to set locked = true", e);
146+
throw new FlywaySqlException("Failed to commit the tx to set lock_id ", e);
119147
}
120-
LOG.warn(Thread.currentThread().getName() + "> Failed to commit the tx to set locked = true: " + e);
148+
LOG.warn(Thread.currentThread().getName() + "> Failed to commit the tx to set lock_id: " + e);
121149
}
122150
}
123151
}
124-
return success;
152+
return lockIdToBeReturned;
125153
}
126154

127-
private void unlock(Exception rethrow) {
155+
private void unlock(long prevLockId, Exception rethrow) {
128156
Statement statement = null;
129157
try {
130158
statement = jdbcTemplate.getConnection().createStatement();
131159
statement.execute("BEGIN");
132-
ResultSet rs = statement.executeQuery("SELECT locked FROM " + YugabyteDBDatabase.LOCK_TABLE_NAME + " WHERE table_name = '" + tableName + "' FOR UPDATE");
160+
ResultSet rs = statement.executeQuery("SELECT lock_id FROM " + YugabyteDBDatabase.LOCK_TABLE_NAME + " WHERE table_name = '" + tableName + "' FOR UPDATE");
133161

134162
if (rs.next()) {
135-
boolean locked = rs.getBoolean("locked");
136-
if (locked) {
137-
statement.executeUpdate("UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + " SET locked = false WHERE table_name = '" + tableName + "'");
163+
long lockId = rs.getLong("lock_id");
164+
if (lockId == prevLockId) {
165+
statement.executeUpdate("UPDATE " + YugabyteDBDatabase.LOCK_TABLE_NAME + " SET lock_id = 0 WHERE table_name = '" + tableName + "'");
138166
} else {
139167
// Unexpected. This may happen only when callable took too long to complete
140168
// and another thread forcefully reset it.
169+
String msgLock = "Expected and actual lock_id mismatch. Expected: " + prevLockId + ", Actual: " + lockId;
141170
String msg = "Unlock failed but the Flyway operation may have succeeded. Check your Flyway operation before re-trying";
142-
LOG.warn(Thread.currentThread().getName() + "> " + msg);
171+
LOG.warn(Thread.currentThread().getName() + "> " + msg + "\n" + msgLock);
143172
if (rethrow == null) {
144173
throw new FlywayException(msg);
145174
}
146175
}
147176
}
148177
} catch (SQLException e) {
149178
if (rethrow == null) {
150-
rethrow = new FlywayException("Unable to perform unlock action", e);
179+
rethrow = new FlywaySqlException("Unable to perform unlock action for lock_id " + prevLockId, e);
151180
throw (FlywaySqlException) rethrow;
152181
}
153-
LOG.warn("Unable to perform unlock action " + e);
182+
LOG.warn("Unable to perform unlock action for lock_id " + prevLockId + ": " + e);
154183
} finally {
155184
try {
156185
statement.execute("COMMIT");
157-
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to set locked = false");
186+
LOG.debug(Thread.currentThread().getName() + "> Completed the tx to reset lock_id " + prevLockId);
158187
} catch (SQLException e) {
159188
if (rethrow == null) {
160-
throw new FlywaySqlException("Failed to commit unlock action", e);
189+
throw new FlywaySqlException("Failed to commit unlock action for lock_id " + prevLockId, e);
161190
}
162-
LOG.warn("Failed to commit unlock action: " + e);
191+
LOG.warn("Failed to commit unlock action for lock_id " + prevLockId + ": " + e);
163192
}
164193
}
165194
}
166195

196+
public static class YBRetryStrategy {
197+
private static int numberOfRetries = 50;
198+
private static boolean unlimitedRetries;
199+
private int numberOfRetriesRemaining;
200+
201+
public YBRetryStrategy() {
202+
this.numberOfRetriesRemaining = numberOfRetries;
203+
}
204+
205+
public static void setNumberOfRetries(int retries) {
206+
numberOfRetries = retries;
207+
unlimitedRetries = retries < 0;
208+
}
209+
210+
private boolean hasMoreRetries() {
211+
return unlimitedRetries || this.numberOfRetriesRemaining > 0;
212+
}
213+
214+
private void nextRetry() {
215+
if (!unlimitedRetries) {
216+
--this.numberOfRetriesRemaining;
217+
}
218+
}
219+
220+
private int nextWaitInMilliseconds() {
221+
return 1000;
222+
}
223+
224+
public long doWithRetries(SqlCallable<Long> callable, String interruptionMessage, String retriesExceededMessage) throws SQLException {
225+
long id = 0;
226+
while(id == 0) {
227+
id = callable.call();
228+
try {
229+
Thread.sleep(this.nextWaitInMilliseconds());
230+
} catch (InterruptedException e) {
231+
throw new FlywayException(interruptionMessage, e);
232+
}
233+
if (!this.hasMoreRetries()) {
234+
throw new FlywayException(retriesExceededMessage);
235+
}
236+
this.nextRetry();
237+
}
238+
return id;
239+
}
240+
}
167241
}

0 commit comments

Comments
 (0)