Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions server/src/main/java/invite/cron/AbstractNodeLeader.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public abstract class AbstractNodeLeader {

Expand All @@ -32,6 +33,8 @@ public void perform(String name, Executable executable) {

if (!lockAcquired) {
LOG.info(String.format("Another node is running %s, skipping this one", name));
//Might be that there is a lock not cleaned up due to VM crash
this.cleanupStaleLocks(conn, 60);
return;
}

Expand All @@ -55,30 +58,44 @@ public void perform(String name, Executable executable) {
conn.close();
} catch (Exception ignored) {
//Can't do anything about this
LOG.warn(String.format("Failed to close lock %s", name));
}
}
}
}

protected boolean tryGetLock(Connection conn, String name) throws Exception {
try (PreparedStatement ps = conn.prepareStatement("SELECT GET_LOCK(?, ?)")) {
ps.setString(1, name);
//Use timeout of 0 to have an immediate result
ps.setInt(2, 0);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
int result = rs.getInt(1);
return !rs.wasNull() && result == 1;
}
try {
conn.setAutoCommit(false);
try (PreparedStatement ps = conn.prepareStatement(
"INSERT INTO distributed_locks (lock_name, acquired_at) VALUES (?, NOW())")) {
ps.setString(1, name);
ps.executeUpdate();
conn.commit();
return true;
} catch (SQLException e) {
conn.rollback();
// Duplicate key or other constraint violation means lock is held
return false;
}
} finally {
conn.setAutoCommit(true);
}
}

private void releaseLock(Connection conn, String name) throws Exception {
try (PreparedStatement ps = conn.prepareStatement("SELECT RELEASE_LOCK(?)")) {
try (PreparedStatement ps = conn.prepareStatement(
"DELETE FROM distributed_locks WHERE lock_name = ?")) {
ps.setString(1, name);
ps.executeQuery(); // ignore result
ps.executeUpdate();
}
}

private void cleanupStaleLocks(Connection conn, int timeoutMinutes) throws Exception {
try (PreparedStatement ps = conn.prepareStatement(
"DELETE FROM distributed_locks WHERE acquired_at < NOW() - INTERVAL ? MINUTE")) {
ps.setInt(1, timeoutMinutes);
ps.executeUpdate();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE distributed_locks (
lock_name VARCHAR(255) PRIMARY KEY,
acquired_at TIMESTAMP
) ENGINE=InnoDB
DEFAULT CHARSET = utf8mb4;
26 changes: 8 additions & 18 deletions server/src/test/java/invite/cron/ResourceCleanerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Instant;
import java.time.Period;
import java.util.List;
Expand Down Expand Up @@ -73,24 +72,15 @@ void cleanUserRoles() throws JsonProcessingException {
@SneakyThrows
@Test
void lockAlreadyAcquired() {
Connection conn = null;
try {
conn = dataSource.getConnection();
subject.tryGetLock(conn, LOCK_NAME);

long beforeUsers = userRepository.count();
markUser(GUEST_SUB);

subject.clean();
//Nothing happened
assertEquals(beforeUsers, userRepository.count());
} finally {
try (PreparedStatement ps = conn.prepareStatement("SELECT RELEASE_LOCK(?)")) {
ps.setString(1, LOCK_NAME);
ps.executeQuery(); // ignore result
}
}
Connection conn = dataSource.getConnection();
subject.tryGetLock(conn, LOCK_NAME);

long beforeUsers = userRepository.count();
markUser(GUEST_SUB);

subject.clean();
//Nothing happened
assertEquals(beforeUsers, userRepository.count());
}

private void markUser(String sub) {
Expand Down
Loading