-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathnodeprocess.nim
175 lines (141 loc) · 4.75 KB
/
nodeprocess.nim
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import pkg/questionable
import pkg/questionable/results
import pkg/confutils
import pkg/chronicles
import pkg/chronos/asyncproc
import pkg/libp2p
import std/os
import std/strutils
import codex/conf
import codex/utils/exceptions
import codex/utils/trackedfutures
import ./codexclient
export codexclient
export chronicles
# Module-level chronicles log scope: sets the `topics` property for log
# statements in this module.
logScope:
  topics = "integration testing node process"
type
  NodeProcess* = ref object of RootObj
    ## Handle for a node that is run as a child process. The base `method`s
    ## declared for this type only assert, so concrete node types are
    ## expected to override them.
    process*: AsyncProcessRef ## child process; assigned by `start`, reset to nil by `stop`
    arguments*: seq[string] ## arguments passed to the executable by `start`
    debug: bool ## when true, the start command and each captured output line are echoed
    trackedFutures*: TrackedFutures ## futures that `stop` cancels before terminating the process
    name*: string ## attached to log statements as `nodeName`

  NodeProcessError* = object of CatchableError ## raised by `waitUntilStarted` when the node does not report that it has started
method workingDir(node: NodeProcess): string {.base, gcsafe.} =
  ## Working directory handed to `startProcess` when the node is launched.
  ## The base implementation only asserts; concrete node types must override
  ## it.
  raiseAssert "not implemented"
method executable(node: NodeProcess): string {.base, gcsafe.} =
  ## Command handed to `startProcess` as the program to run.
  ## The base implementation only asserts; concrete node types must override
  ## it.
  raiseAssert "not implemented"
method startedOutput(node: NodeProcess): string {.base, gcsafe.} =
  ## Text that `waitUntilStarted` looks for in the process output to decide
  ## that the node has started.
  ## The base implementation only asserts; concrete node types must override
  ## it.
  raiseAssert "not implemented"
method processOptions(node: NodeProcess): set[AsyncProcessOption] {.base, gcsafe.} =
  ## Process options used when launching the node; `start` adds
  ## `StdErrToStdOut` to whatever is returned here.
  ## The base implementation only asserts; concrete node types must override
  ## it.
  raiseAssert "not implemented"
method outputLineEndings(node: NodeProcess): string {.base, gcsafe.} =
  ## Line separator that `captureOutput` passes to `readLine` when reading
  ## the process output.
  ## The base implementation only asserts; concrete node types must override
  ## it.
  raiseAssert "not implemented"
method onOutputLineCaptured(node: NodeProcess, line: string) {.base, gcsafe.} =
  ## Hook that `captureOutput` calls with every non-empty line it reads from
  ## the process output.
  ## The base implementation only asserts; concrete node types must override
  ## it.
  raiseAssert "not implemented"
method start*(node: NodeProcess) {.base, async.} =
  ## Launches the node's executable as a child process and stores the handle
  ## in `node.process`. Stderr is always merged into stdout, and stdout is
  ## piped so the output can be read back.
  ##
  ## Cancellation is re-raised. Any other failure to launch is only logged,
  ## in which case `node.process` is left as it was.
  logScope:
    nodeName = node.name

  let launchOptions =
    node.processOptions + {AsyncProcessOption.StdErrToStdOut}

  trace "starting node",
    args = node.arguments,
    executable = node.executable,
    workingDir = node.workingDir,
    processOptions = launchOptions

  try:
    if node.debug:
      echo "starting codex node with args: ", node.arguments.join(" ")
    let spawned = await startProcess(
      node.executable,
      node.workingDir,
      node.arguments,
      options = launchOptions,
      stdoutHandle = AsyncProcess.Pipe
    )
    node.process = spawned
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as failure:
    error "failed to start node process", error = failure.msg
proc captureOutput(
    node: NodeProcess,
    output: string,
    started: Future[void]
) {.async.} =
  ## Reads the process output line by line for as long as the process
  ## reports that it is running. Every non-empty line is echoed when
  ## `node.debug` is set and then passed to `onOutputLineCaptured`. The first
  ## line containing `output` completes `started`, provided a future was
  ## supplied and it has not finished yet.
  ##
  ## Stream read errors are logged instead of being raised.
  logScope:
    nodeName = node.name

  trace "waiting for output", output

  let reader = node.process.stdoutStream

  try:
    while node.process.running.option == some(true):
      while true:
        let line = await reader.readLine(0, node.outputLineEndings)
        if line == "":
          # nothing (more) to read right now; go back to the liveness check
          break

        if node.debug:
          # raw echo: chronicles does not re-parse or colourise these lines
          echo line

        let stillWaiting = not started.isNil and not started.finished
        if stillWaiting and output in line:
          started.complete()

        node.onOutputLineCaptured(line)

        await sleepAsync(1.millis)
      await sleepAsync(1.millis)
  except AsyncStreamReadError as readFailure:
    error "error reading output stream", error = readFailure.msgDetail
proc startNode*[T: NodeProcess](
    _: type T,
    args: seq[string],
    debug: string | bool = false,
    name: string
): Future[T] {.async.} =
  ## Creates a node of type `T` with the given arguments and name, starts
  ## its process and returns it.
  ##
  ## `debug` may be a bool or a string; anything whose string form is not
  ## "false" turns on echoing of the node's output.
  let echoOutput = $debug != "false"
  let created = T(
    arguments: @args,
    debug: echoOutput,
    trackedFutures: TrackedFutures.new(),
    name: name
  )

  await created.start()
  return created
method stop*(node: NodeProcess) {.base, async.} =
  ## Cancels the node's tracked futures and, if a process is attached,
  ## terminates it, waits up to 3 seconds for it to exit and closes its
  ## streams. `node.process` is reset to nil afterwards, whether or not
  ## shutting down succeeded.
  ##
  ## Cancellation is re-raised; other errors are only logged.
  logScope:
    nodeName = node.name

  await node.trackedFutures.cancelTracked()

  if node.process.isNil:
    # no process attached, nothing further to shut down
    return

  try:
    trace "terminating node process..."
    if code =? node.process.terminate().errorOption:
      error "failed to terminate process", errCode = $code

    trace "waiting for node process to exit"
    let exitCode = await node.process.waitForExit(3.seconds)
    if exitCode > 0:
      error "failed to exit process, check for zombies", exitCode

    trace "closing node process' streams"
    await node.process.closeWait()
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as failure:
    error "error stopping node process", error = failure.msg
  finally:
    # drop the handle even when shutdown failed
    node.process = nil

  trace "node stopped"
proc waitUntilStarted*(node: NodeProcess) {.async.} =
  ## Waits until the node process prints `node.startedOutput`.
  ##
  ## Raises `NodeProcessError` when there is no process to read from, or when
  ## the expected output does not appear within 35 seconds. In the timeout
  ## case the node is stopped before the error is raised.
  logScope:
    nodeName = node.name

  if node.process.isNil:
    # `start` only logs a failed launch and `stop` resets the handle, so the
    # process can be nil here. `captureOutput` dereferences it immediately,
    # which would crash instead of letting callers shut other nodes down, so
    # fail with a catchable error up front.
    raise newException(
      NodeProcessError,
      "node process is not running, cannot wait for '" &
        node.startedOutput & "'"
    )

  trace "waiting until node started"

  let started = newFuture[void]()
  try:
    discard node.captureOutput(node.startedOutput, started).track(node)
    await started.wait(35.seconds) # allow enough time for proof generation
  except AsyncTimeoutError:
    # attempt graceful shutdown in case node was partially started, prevent
    # zombies
    await node.stop()
    # raise error here so that all nodes (not just this one) can be
    # shutdown gracefully
    raise newException(NodeProcessError, "node did not output '" &
      node.startedOutput & "'")
proc restart*(node: NodeProcess) {.async.} =
  ## Stops the node, starts it again and waits until it reports that it has
  ## started. Errors from any of the three steps propagate to the caller.
  await node.stop()
  await node.start()
  await node.waitUntilStarted()
method removeDataDir*(node: NodeProcess) {.base.} =
  ## The base implementation only asserts; concrete node types must override
  ## it.
  ## NOTE(review): judging by the name, overrides are expected to delete the
  ## node's data directory - confirm against the overriding types.
  raiseAssert "[removeDataDir] not implemented"