Skip to content

Commit 75b70c3

Browse files
jyothsnakonisa authored and yifan-c committed
Cassandra diff should accept a provided job_id for retrying diffs
Patch by Jyothsna Konisa; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-16968
1 parent e3161b6 commit 75b70c3

File tree

4 files changed

+89
-10
lines changed

4 files changed

+89
-10
lines changed

spark-job/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,5 +67,12 @@
6767
<artifactId>junit</artifactId>
6868
</dependency>
6969

70+
<dependency>
71+
<groupId>org.mockito</groupId>
72+
<artifactId>mockito-core</artifactId>
73+
<version>3.5.10</version>
74+
<scope>test</scope>
75+
</dependency>
76+
7077
</dependencies>
7178
</project>

spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java

Lines changed: 26 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -122,9 +122,12 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
122122
ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
123123
JobMetadataDb.JobLifeCycle job = null;
124124
UUID jobId = null;
125-
try (Cluster metadataCluster = metadataProvider.getCluster();
126-
Session metadataSession = metadataCluster.connect()) {
125+
Cluster metadataCluster = null;
126+
Session metadataSession = null;
127127

128+
try {
129+
metadataCluster = metadataProvider.getCluster();
130+
metadataSession = metadataCluster.connect();
128131
RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions());
129132
MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
130133
JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider);
@@ -197,18 +200,32 @@ public void run(JobConfiguration configuration, JavaSparkContext sc) {
197200
Differ.shutdown();
198201
JobMetadataDb.ProgressTracker.resetStatements();
199202
}
203+
if (metadataCluster != null) {
204+
metadataCluster.close();
205+
}
206+
if (metadataSession != null) {
207+
metadataSession.close();
208+
}
209+
200210
}
201211
}
202212

203-
private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
213+
@VisibleForTesting
214+
static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) {
204215
if (conf.jobId().isPresent()) {
205-
return job.getJobParams(conf.jobId().get());
206-
} else {
207-
return new Params(UUID.randomUUID(),
208-
keyspaceTables,
209-
conf.buckets(),
210-
conf.splits());
216+
final Params jobParams = job.getJobParams(conf.jobId().get());
217+
if(jobParams != null) {
218+
// When job_id is passed as a config property for the first time, we will not have metadata associated
219+
// with job_id in metadata table. we should return jobParams from the table only when jobParams is not null
220+
// Otherwise return new jobParams with provided job_id
221+
return jobParams;
222+
}
211223
}
224+
final UUID jobId = conf.jobId().isPresent() ? conf.jobId().get() : UUID.randomUUID();
225+
return new Params(jobId,
226+
keyspaceTables,
227+
conf.buckets(),
228+
conf.splits());
212229
}
213230

214231
private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) {

spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -369,7 +369,7 @@ public void initializeJob(DiffJob.Params params,
369369
metadataKeyspace, Schema.RUNNING_JOBS),
370370
params.jobId);
371371
if (!rs.one().getBool("[applied]")) {
372-
logger.info("Aborting due to inability to mark job as running. " +
372+
logger.info("Could not mark job as running. " +
373373
"Did a previous run of job id {} fail non-gracefully?",
374374
params.jobId);
375375
throw new RuntimeException("Unable to mark job running, aborting");

spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,11 +20,18 @@
2020
package org.apache.cassandra.diff;
2121

2222
import java.math.BigInteger;
23+
import java.util.ArrayList;
2324
import java.util.List;
25+
import java.util.Optional;
26+
import java.util.UUID;
2427

2528
import org.junit.Test;
2629

2730
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertNotNull;
32+
import static org.mockito.ArgumentMatchers.any;
33+
import static org.mockito.Mockito.mock;
34+
import static org.mockito.Mockito.when;
2835

2936
public class DiffJobTest
3037
{
@@ -39,6 +46,37 @@ public void testSplitsRandom()
3946
splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner"));
4047
}
4148

49+
@Test
50+
public void testGetJobParamsWithJobIdProvidedShouldReturnNonNullConFigParams() {
51+
final MockConfig mockConfig = new MockConfig();
52+
final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
53+
final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
54+
final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
55+
assertNotNull(params);
56+
}
57+
58+
@Test
59+
public void testGetJobParamsDuringRetryShouldReturnPreviousParams() {
60+
final MockConfig mockConfig = new MockConfig();
61+
final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
62+
final DiffJob.Params mockParams = mock(DiffJob.Params.class);
63+
when(mockJob.getJobParams(any())).thenAnswer(invocationOnMock -> mockParams);
64+
final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
65+
final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
66+
assertEquals(params, mockParams);
67+
}
68+
69+
@Test
70+
public void testGetJobParamsWithNoJobId() {
71+
final MockConfig mockConfig = mock(MockConfig.class);
72+
when(mockConfig.jobId()).thenReturn(Optional.empty());
73+
74+
final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class);
75+
final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>();
76+
final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs);
77+
assertNotNull(params.jobId);
78+
}
79+
4280
private void splitTestHelper(TokenHelper tokens)
4381
{
4482
List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens);
@@ -54,4 +92,21 @@ private void splitTestHelper(TokenHelper tokens)
5492
for (int i = 0; i < splits.size(); i++)
5593
assertEquals(i, splits.get(i).splitNumber);
5694
}
95+
96+
private class MockConfig extends AbstractMockJobConfiguration {
97+
@Override
98+
public int splits() {
99+
return 2;
100+
}
101+
102+
@Override
103+
public int buckets() {
104+
return 2;
105+
}
106+
107+
@Override
108+
public Optional<UUID> jobId() {
109+
return Optional.of(UUID.randomUUID());
110+
}
111+
}
57112
}

0 commit comments

Comments
 (0)