Skip to content

Commit ed729cf

Browse files
committed
Add failed state to Tasks and prevent PT release to mask failure
1 parent d9fe468 commit ed729cf

File tree

9 files changed

+228
-3
lines changed

9 files changed

+228
-3
lines changed

core/src/main/java/org/neo4j/gds/core/utils/progress/tasks/ProgressTracker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ default void logProgress() {
5252

5353
void release();
5454

55+
void fail();
56+
5557
class EmptyProgressTracker implements ProgressTracker {
5658

5759
@Override
@@ -102,5 +104,10 @@ public ProgressLogger progressLogger() {
102104
@Override
103105
public void release() {
104106
}
107+
108+
@Override
109+
public void fail() {
110+
111+
}
105112
}
106113
}

core/src/main/java/org/neo4j/gds/core/utils/progress/tasks/Status.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ public enum Status {
2323
PENDING,
2424
RUNNING,
2525
FINISHED,
26-
CANCELED
26+
CANCELED,
27+
FAILED
2728
}

core/src/main/java/org/neo4j/gds/core/utils/progress/tasks/Task.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,10 @@ public void setEstimatedMemoryRangeInBytes(MemoryRange memoryRangeInBytes) {
178178
this.estimatedMemoryRangeInBytes = memoryRangeInBytes;
179179
}
180180

181+
public void fail() {
182+
this.status = Status.FAILED;
183+
}
184+
181185
protected Task nextSubTaskAfterValidation() {
182186
if (subTasks.stream().anyMatch(t -> t.status == Status.RUNNING)) {
183187
throw new IllegalStateException("Cannot move to next subtask, because some subtasks are still running");

core/src/main/java/org/neo4j/gds/core/utils/progress/tasks/TaskProgressTracker.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,12 @@ public ProgressLogger progressLogger() {
127127
@Override
128128
public void release() {
129129
taskRegistry.unregisterTask();
130-
validateTaskFinishedOrCanceled();
130+
validateTaskNotRunning();
131+
}
132+
133+
@Override
134+
public void fail() {
135+
baseTask.fail();
131136
}
132137

133138
@TestOnly
@@ -150,7 +155,7 @@ private Task requireCurrentTask() {
150155
return currentTask.orElseThrow(() -> new IllegalStateException("No more running tasks"));
151156
}
152157

153-
private void validateTaskFinishedOrCanceled() {
158+
private void validateTaskNotRunning() {
154159
if (baseTask.status() == Status.RUNNING) {
155160
var message = formatWithLocale(
156161
"Attempted to release algorithm, but task %s is still running",
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.testproc;
21+
22+
import org.neo4j.gds.Algorithm;
23+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
24+
25+
class FailingAlgorithm extends Algorithm<FailingAlgorithm, ProcedureThatFailsDuringTask.Output> {
26+
FailingAlgorithm(ProgressTracker progressTracker) {
27+
super();
28+
this.progressTracker = progressTracker;
29+
}
30+
31+
@Override
32+
public ProcedureThatFailsDuringTask.Output compute() {
33+
progressTracker.beginSubTask();
34+
throw new IllegalStateException("Oops");
35+
}
36+
37+
@Override
38+
public FailingAlgorithm me() {
39+
return this;
40+
}
41+
42+
@Override
43+
public void release() {
44+
45+
}
46+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.testproc;
21+
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
import org.neo4j.gds.BaseProcTest;
25+
import org.neo4j.gds.extension.Neo4jGraph;
26+
27+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
28+
29+
public class ProcedureFailTest extends BaseProcTest {
30+
31+
@Neo4jGraph
32+
public static final String DB_CYPHER = "CREATE (a:N)";
33+
34+
@BeforeEach
35+
void setup() throws Exception {
36+
registerProcedures(ProcedureThatFailsDuringTask.class);
37+
}
38+
39+
@Test
40+
void shouldFailWithIllegalStateException() {
41+
assertThatThrownBy(() -> runQuery("CALL very.strange.procedure({nodeProjection: '*', relationshipProjection: '*'})"))
42+
.getRootCause()
43+
.isInstanceOf(IllegalStateException.class)
44+
.hasMessageContaining("Oops");
45+
}
46+
47+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.testproc;
21+
22+
import org.neo4j.gds.AlgoBaseProc;
23+
import org.neo4j.gds.AlgorithmFactory;
24+
import org.neo4j.gds.api.Graph;
25+
import org.neo4j.gds.config.GraphCreateConfig;
26+
import org.neo4j.gds.core.CypherMapWrapper;
27+
import org.neo4j.gds.core.utils.mem.AllocationTracker;
28+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
29+
import org.neo4j.gds.test.config.DummyConfig;
30+
import org.neo4j.gds.test.config.DummyConfigImpl;
31+
import org.neo4j.procedure.Mode;
32+
import org.neo4j.procedure.Name;
33+
import org.neo4j.procedure.Procedure;
34+
35+
import java.util.Map;
36+
import java.util.Optional;
37+
import java.util.stream.Stream;
38+
39+
public class ProcedureThatFailsDuringTask extends AlgoBaseProc<FailingAlgorithm, ProcedureThatFailsDuringTask.Output, DummyConfig> {
40+
@Procedure(name = "very.strange.procedure", mode = Mode.READ)
41+
public Stream<Output> run(
42+
@Name(value = "graphName") Object graphNameOrConfig,
43+
@Name(value = "configuration", defaultValue = "{}") Map<String, Object> configuration
44+
) {
45+
var result = compute(graphNameOrConfig, configuration);
46+
assert result.result() == null;
47+
return Stream.empty();
48+
}
49+
50+
@Override
51+
protected DummyConfig newConfig(
52+
String username,
53+
Optional<String> graphName,
54+
Optional<GraphCreateConfig> maybeImplicitCreate,
55+
CypherMapWrapper config
56+
) {
57+
return new DummyConfigImpl(graphName, maybeImplicitCreate, username, config);
58+
}
59+
60+
@Override
61+
protected AlgorithmFactory<FailingAlgorithm, DummyConfig> algorithmFactory() {
62+
return new AlgorithmFactory<FailingAlgorithm, DummyConfig>() {
63+
@Override
64+
protected String taskName() {
65+
return "Failing Algorithm";
66+
}
67+
68+
@Override
69+
protected FailingAlgorithm build(
70+
Graph graph,
71+
DummyConfig configuration,
72+
AllocationTracker allocationTracker,
73+
ProgressTracker progressTracker
74+
) {
75+
return new FailingAlgorithm(progressTracker);
76+
}
77+
};
78+
}
79+
80+
public class Output {
81+
public Object out;
82+
}
83+
}

proc/common/src/main/java/org/neo4j/gds/AlgoBaseProc.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,9 @@ protected ComputationResult<ALGO, ALGO_RESULT, CONFIG> compute(
232232
() -> {
233233
try (ProgressTimer ignored = ProgressTimer.start(builder::computeMillis)) {
234234
return algo.compute();
235+
} catch (Exception e) {
236+
algo.progressTracker.fail();
237+
throw e;
235238
} finally {
236239
if (releaseAlgorithm) {
237240
algo.progressTracker.release();
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.test.config;
21+
22+
import org.neo4j.gds.annotation.Configuration;
23+
import org.neo4j.gds.annotation.ValueClass;
24+
import org.neo4j.gds.config.AlgoBaseConfig;
25+
26+
@Configuration
27+
@ValueClass
28+
public interface DummyConfig extends AlgoBaseConfig {
29+
}

0 commit comments

Comments
 (0)