Skip to content

Commit fd688a2

Browse files
authored
Lock TestWorkflowStoreImpl when listing workflows (#890)
This is the only path to the backing histories map that isn't locked. When running a test containing multiple child workflows in parallel where one child workflow attempts to call listWorkflows I'm able to consistently reproduce a concurrent modification exception. Despite being a change to locking, this change is rather low risk. The lock is reentrant and there are no other locks acquired while holding this lock, so there's no chance of deadlock. In addition, this codepath isn't use internally at all. It exists only to implement ListOpenWorkflowExecutions and ListClosedWorkflowExecutions on the client, allowing test code to make these RPCs. This is likely why it hasn't been caught before.
1 parent 801d9b4 commit fd688a2

File tree

1 file changed

+58
-43
lines changed

1 file changed

+58
-43
lines changed

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowStoreImpl.java

+58-43
Original file line numberDiff line numberDiff line change
@@ -407,51 +407,66 @@ public void getDiagnostics(StringBuilder result) {
407407
@Override
408408
public List<WorkflowExecutionInfo> listWorkflows(
409409
WorkflowState state, Optional<String> filterWorkflowId) {
410-
List<WorkflowExecutionInfo> result = new ArrayList<>();
411-
for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
412-
if (state == WorkflowState.OPEN) {
413-
if (entry.getValue().isCompleted()) {
414-
continue;
415-
}
416-
ExecutionId executionId = entry.getKey();
417-
String workflowId = executionId.getWorkflowId().getWorkflowId();
418-
if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
419-
continue;
420-
}
421-
List<HistoryEvent> history = entry.getValue().getHistory();
422-
WorkflowExecutionInfo info =
423-
new WorkflowExecutionInfo()
424-
.setExecution(executionId.getExecution())
425-
.setHistoryLength(history.size())
426-
.setStartTime(history.get(0).getTimestamp())
427-
.setIsCron(
428-
history.get(0).getWorkflowExecutionStartedEventAttributes().isSetCronSchedule())
429-
.setType(
430-
history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType());
431-
result.add(info);
432-
} else {
433-
if (!entry.getValue().isCompleted()) {
434-
continue;
435-
}
436-
ExecutionId executionId = entry.getKey();
437-
String workflowId = executionId.getWorkflowId().getWorkflowId();
438-
if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
439-
continue;
410+
lock.lock();
411+
List<WorkflowExecutionInfo> result;
412+
try {
413+
result = new ArrayList<>();
414+
for (Entry<ExecutionId, HistoryStore> entry : this.histories.entrySet()) {
415+
if (state == WorkflowState.OPEN) {
416+
if (entry.getValue().isCompleted()) {
417+
continue;
418+
}
419+
ExecutionId executionId = entry.getKey();
420+
String workflowId = executionId.getWorkflowId().getWorkflowId();
421+
if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
422+
continue;
423+
}
424+
List<HistoryEvent> history = entry.getValue().getHistory();
425+
WorkflowExecutionInfo info =
426+
new WorkflowExecutionInfo()
427+
.setExecution(executionId.getExecution())
428+
.setHistoryLength(history.size())
429+
.setStartTime(history.get(0).getTimestamp())
430+
.setIsCron(
431+
history
432+
.get(0)
433+
.getWorkflowExecutionStartedEventAttributes()
434+
.isSetCronSchedule())
435+
.setType(
436+
history
437+
.get(0)
438+
.getWorkflowExecutionStartedEventAttributes()
439+
.getWorkflowType());
440+
result.add(info);
441+
} else {
442+
if (!entry.getValue().isCompleted()) {
443+
continue;
444+
}
445+
ExecutionId executionId = entry.getKey();
446+
String workflowId = executionId.getWorkflowId().getWorkflowId();
447+
if (filterWorkflowId.isPresent() && !workflowId.equals(filterWorkflowId.get())) {
448+
continue;
449+
}
450+
List<HistoryEvent> history = entry.getValue().getHistory();
451+
WorkflowExecutionInfo info =
452+
new WorkflowExecutionInfo()
453+
.setExecution(executionId.getExecution())
454+
.setHistoryLength(history.size())
455+
.setStartTime(history.get(0).getTimestamp())
456+
.setIsCron(
457+
history
458+
.get(0)
459+
.getWorkflowExecutionStartedEventAttributes()
460+
.isSetCronSchedule())
461+
.setType(
462+
history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType())
463+
.setCloseStatus(
464+
WorkflowExecutionUtils.getCloseStatus(history.get(history.size() - 1)));
465+
result.add(info);
440466
}
441-
List<HistoryEvent> history = entry.getValue().getHistory();
442-
WorkflowExecutionInfo info =
443-
new WorkflowExecutionInfo()
444-
.setExecution(executionId.getExecution())
445-
.setHistoryLength(history.size())
446-
.setStartTime(history.get(0).getTimestamp())
447-
.setIsCron(
448-
history.get(0).getWorkflowExecutionStartedEventAttributes().isSetCronSchedule())
449-
.setType(
450-
history.get(0).getWorkflowExecutionStartedEventAttributes().getWorkflowType())
451-
.setCloseStatus(
452-
WorkflowExecutionUtils.getCloseStatus(history.get(history.size() - 1)));
453-
result.add(info);
454467
}
468+
} finally {
469+
lock.unlock();
455470
}
456471
return result;
457472
}

0 commit comments

Comments
 (0)