Skip to content

Commit

Permalink
Merge pull request #2168 from actonlang/acton-chase-true-test
Browse files Browse the repository at this point in the history
Improve error message on list test
  • Loading branch information
plajjan authored Feb 13, 2025
2 parents eaa7fcd + 45ffe10 commit 73a7de8
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 125 deletions.
52 changes: 38 additions & 14 deletions base/src/process.act
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class ProcessCap():

actor Process(cap: ProcessCap,
cmd: list[str],
on_stdout: action(Process, bytes) -> None,
on_stderr: action(Process, bytes) -> None,
on_stdout: action(Process, ?bytes) -> None,
on_stderr: action(Process, ?bytes) -> None,
on_exit: action(Process, int, int) -> None,
on_error: action(Process, str) -> None,
workdir: ?str=None,
Expand Down Expand Up @@ -104,31 +104,55 @@ actor RunProcess(cap: ProcessCap,
"""
var out_buf = b""
var err_buf = b""

def _on_stdout(p: Process, data: bytes):
out_buf += data

def _on_stderr(p: Process, data: bytes):
err_buf += data
var _out_done = False
var _err_done = False
var _exited = False
var _exit_code = 0
var _signal = 0
var _process: ?Process = None


def _on_stdout(p: Process, data: ?bytes):
if data is not None:
out_buf += data
else:
_out_done = True
_check_done()

def _on_stderr(p: Process, data: ?bytes):
if data is not None:
err_buf += data
else:
_err_done = True
_check_done()

def _on_exit(p: Process, exit_code: int, signal: int):
on_exit(p, exit_code, signal, out_buf, err_buf)
_exited = True
_exit_code = exit_code
_signal = signal
_check_done()

def _check_done():
if _process is not None:
if _out_done and _err_done and _exited:
on_exit(_process, _exit_code, _signal, out_buf, err_buf)

p = Process(cap, cmd, _on_stdout, _on_stderr, _on_exit, on_error, workdir, env, timeout)
_p = Process(cap, cmd, _on_stdout, _on_stderr, _on_exit, on_error, workdir, env, timeout)
_process = _p

def signal(sig: int):
"""Send signal to process"""
p.signal(sig)
_p.signal(sig)

def kill():
"""Abrubtly kill process by sending SIGKILL
"""
p.kill()
_p.kill()

def terminate():
"""Stop process by sending SIGTERM
"""
p.terminate()
_p.terminate()

def stop():
"""Stop process
Expand All @@ -137,4 +161,4 @@ actor RunProcess(cap: ProcessCap,
Unix system. After kill_after seconds (defaults to 1), SIGKILL is sent
to ensure the process is stopped.
"""
p.stop()
_p.stop()
16 changes: 10 additions & 6 deletions base/src/process.ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,36 +37,40 @@ void exit_handler(uv_process_t *req, int64_t exit_status, int term_signal) {
}

void read_stderr(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
struct process_data *process_data = (struct process_data *)stream->data;
processQ_Process self = process_data->process;
$action2 f = ($action2)process_data->on_stderr;

if (nread < 0) {
if (nread == UV_EOF) {
uv_close((uv_handle_t *)stream, NULL);
f->$class->__asyn__(f, self, B_None);
} else {
// Log and handle read error
log_warn("Error reading from stderr");
}
} else if (nread > 0) {
if (stream->data) {
struct process_data *process_data = (struct process_data *)stream->data;
processQ_Process self = process_data->process;
$action2 f = ($action2)process_data->on_stderr;
f->$class->__asyn__(f, self, to$bytesD_len(buf->base, nread));
}
}
}

void read_stdout(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
struct process_data *process_data = (struct process_data *)stream->data;
processQ_Process self = process_data->process;
$action2 f = ($action2)process_data->on_stdout;

if (nread < 0) {
if (nread == UV_EOF) {
uv_close((uv_handle_t *)stream, NULL);
f->$class->__asyn__(f, self, B_None);
} else {
// Log and handle read error
log_warn("Error reading from stdout");
}
} else if (nread > 0) {
if (stream->data) {
struct process_data *process_data = (struct process_data *)stream->data;
processQ_Process self = process_data->process;
$action2 f = ($action2)process_data->on_stdout;
f->$class->__asyn__(f, self, to$bytesD_len(buf->base, nread));
}
}
Expand Down
75 changes: 42 additions & 33 deletions cli/src/acton.act
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,24 @@ actor CompilerRunner(process_cap, env, args, wdir=None, on_exit: ?action(int, in
"""
var std_out_buf = b""
var std_err_buf = b""
var std_out_done = False
var std_err_done = False

def on_actonc_std_err(p, data):
std_err_buf += data
if print_output:
print(data.decode(), end="")
def on_actonc_std_err(p, data: ?bytes):
if data is not None:
std_err_buf += data
if print_output:
print(data.decode(), end="")
else:
std_err_done = True

def on_actonc_std_out(p, data):
std_out_buf += data
if print_output:
print(data.decode(), end="")
def on_actonc_std_out(p, data: ?bytes):
if data is not None:
std_out_buf += data
if print_output:
print(data.decode(), end="")
else:
std_out_done = True

def on_actonc_exit(p, exit_code, term_signal):
if on_exit is not None:
Expand Down Expand Up @@ -427,28 +435,30 @@ actor RunModuleTest(process_cap, modname, test_cmd, on_json_output, on_test_erro
var std_err_buf = b""
var captured_std_err = b""

def on_std_out(p, data: bytes):
captured_std_out += data
def on_std_out(p, data: ?bytes):
if data is not None:
captured_std_out += data

def on_std_err(p, data):
def on_std_err(p, data: ?bytes):
# TODO: we shouldn't sent test data in-band on std_err - should be separate pipe
std_err_buf += data
lines = std_err_buf.splitlines(True)
std_err_buf = b""
for i, line in enumerate(lines):
if not line.endswith(b"\n"):
std_err_buf = b"".join(lines[i:])
break

# We have a complete line, check if it is valid JSON
try:
upd = json.decode(line.decode())
# We have a complete JSON object, pass it to the callback
on_json_output(self, upd, captured_std_out, captured_std_err)
except ValueError:
# Not a complete JSON object, append to captured_std_err
if line != b"\n":
captured_std_err += line
if data is not None:
std_err_buf += data
lines = std_err_buf.splitlines(True)
std_err_buf = b""
for i, line in enumerate(lines):
if not line.endswith(b"\n"):
std_err_buf = b"".join(lines[i:])
break

# We have a complete line, check if it is valid JSON
try:
upd = json.decode(line.decode())
# We have a complete JSON object, pass it to the callback
on_json_output(self, upd, captured_std_out, captured_std_err)
except ValueError:
# Not a complete JSON object, append to captured_std_err
if line != b"\n":
captured_std_err += line

def on_exit(p, exit_code, term_signal):
if exit_code != 0 or term_signal != 0:
Expand Down Expand Up @@ -535,7 +545,7 @@ actor ListTests(process_cap, test_modules: list[str], on_done: action(?str, ?dic
else:
on_done("Error listing tests for module %s: no 'tests' key in JSON" % module_name)
except ValueError as e:
on_done("Error parsing JSON from module %s: %s" % (module_name, str(e)))
on_done("Error parsing JSON from module %s: %s input: %s" % (module_name, str(e), std_err_buf.decode()))
else:
on_done("Error listing tests for module %s, exited with code %d and term signal %d: %s" % (module_name, exit_code, term_signal, std_err_buf.decode()))

Expand Down Expand Up @@ -683,13 +693,12 @@ actor CmdTest(env: Env, args, perf_mode: bool=False):

ListTests(process_cap, list_modules, _on_list_output)

def _on_list_output(err, tests: dict[str, dict[str, testing.TestInfo]]={}):
if err is not None:
print(err, err=True)
def _on_list_output(error, tests: dict[str, dict[str, testing.TestInfo]]={}):
if error is not None:
print("Error when listing tests", error, err=True)
env.exit(1)
return


expected_modules = set()
name_filter = set(args.get_strlist("name"))
for module_name in sorted(tests.keys()):
Expand Down
6 changes: 4 additions & 2 deletions test/rts_db/test_db_app.act
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ actor Tester(env, verbose, port_chunk):
combined_output += psp[paid] + " " + prefix + ": " + line + "\n"

def on_stderr(p, data):
log_process_output(p, "ERR", data)
if data is not None:
log_process_output(p, "ERR", data)

def on_stdout(p, data):
log_process_output(p, "OUT", data)
if data is not None:
log_process_output(p, "OUT", data)


def db_on_exit(p, exit_code, term_signal):
Expand Down
26 changes: 15 additions & 11 deletions test/rts_db/test_tcp_client.act
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,18 @@ actor Tester(env, verbose, port_chunk):

report()

def log_process_output(p, prefix, data):
def log_process_output(p, prefix, data: bytes):
paid = p.aid()
for line in data.decode().splitlines(False):
combined_output += psp[paid] + " " + prefix + ": " + line + "\n"

def on_stderr(p, data):
log_process_output(p, "ERR", data)
if data is not None:
log_process_output(p, "ERR", data)

def on_stdout(p, data):
log_process_output(p, "OUT", data)
if data is not None:
log_process_output(p, "OUT", data)

def db_on_exit(p, exit_code, term_signal):
log("DB Process exited with code: %d terminated with signal: %d" % (exit_code, term_signal))
Expand Down Expand Up @@ -158,7 +160,8 @@ actor Tester(env, verbose, port_chunk):

# TCP server
def app_server_on_stdout(p, data):
log_process_output(p, "OUT: state " + str(state), data)
if data is not None:
log_process_output(p, "OUT: state " + str(state), data)

def app_server_on_exit(p, exit_code, term_signal):
log("Server app exited in state %d with exit code %d and termination signal %d" % (state, exit_code, term_signal))
Expand Down Expand Up @@ -206,13 +209,14 @@ actor Tester(env, verbose, port_chunk):

def app_client_on_stdout(p, data):
on_stdout(p, data)
if state == 0:
if data == b"Got a 0, doing noooothing...\n":
state = 1
log("YAY, got expected data, terminating client, current p_alive: " + str(p_alive))
p.terminate()
else:
error("Got unexpected data from client over stdout in state %d: %s" % (state, str(data)))
if data is not None:
if state == 0:
if data == b"Got a 0, doing noooothing...\n":
state = 1
log("YAY, got expected data, terminating client, current p_alive: " + str(p_alive))
p.terminate()
else:
error("Got unexpected data from client over stdout in state %d: %s" % (state, str(data)))


def app_client_on_exit(p, exit_code, term_signal):
Expand Down
37 changes: 20 additions & 17 deletions test/rts_db/test_tcp_server.act
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ actor Tester(env, verbose, port_chunk, app_wthreads=4):
combined_output += psp[paid] + " " + prefix + ": " + line + "\n"

def on_stderr(p, data):
log_process_output(p, "ERR", data)
if data is not None:
log_process_output(p, "ERR", data)

def on_stdout(p, data):
log_process_output(p, "OUT", data)
if data is not None:
log_process_output(p, "OUT", data)

def db_on_exit(p, exit_code, term_signal):
log("DB Process exited with code: %d terminated with signal: %d" % (exit_code, term_signal))
Expand Down Expand Up @@ -187,21 +189,22 @@ actor Tester(env, verbose, port_chunk, app_wthreads=4):
error("Client ERR: %s" % (msg))

def app_server_on_stdout(p, data):
log_process_output(p, "OUT: state " + str(state), data)
if state == 0:
if data.find(b"NOW LISTENING", None, None) > -1:
log("Server app listening in state %d, starting TCP client" % state)
state = 1
tcp_client = net.TCPConnection(connect_cap, "127.0.0.1", port, tcpc_on_connect, tcpc_on_receive, tcpc_on_error)
else:
log("In state " + str(state) + ", read unexpected output from server app:" + str(data))
elif state == 6:
if data.find(b"NOW LISTENING", None, None) > -1:
log("Server app listening in state %d, starting TCP client" % state)
state = 7
tcp_client = net.TCPConnection(connect_cap, "127.0.0.1", port, tcpc_on_connect, tcpc_on_receive, tcpc_on_error)
else:
log("In state " + str(state) + ", read unexpected output from server app:" + str(data))
if data is not None:
log_process_output(p, "OUT: state " + str(state), data)
if state == 0:
if data.find(b"NOW LISTENING", None, None) > -1:
log("Server app listening in state %d, starting TCP client" % state)
state = 1
tcp_client = net.TCPConnection(connect_cap, "127.0.0.1", port, tcpc_on_connect, tcpc_on_receive, tcpc_on_error)
else:
log("In state " + str(state) + ", read unexpected output from server app:" + str(data))
elif state == 6:
if data.find(b"NOW LISTENING", None, None) > -1:
log("Server app listening in state %d, starting TCP client" % state)
state = 7
tcp_client = net.TCPConnection(connect_cap, "127.0.0.1", port, tcpc_on_connect, tcpc_on_receive, tcpc_on_error)
else:
log("In state " + str(state) + ", read unexpected output from server app:" + str(data))

log("Running server test")
dbc_start()
Expand Down
24 changes: 13 additions & 11 deletions test/stdlib_auto/test_process.act
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ import process

actor main(env):
def on_stderr(p, data):
print("Received output on stderr:", data)
env.exit(0)

def on_stdout(p, data):
print("Received output on stdout:", data.decode())
if data == b"HELLO\n":
print("All good, exiting..")
await async env.exit(0)
else:
print("Unexpected output, blargh")
await async env.exit(1)
if data is not None:
print("Received output on stderr:", data)
env.exit(0)

def on_stdout(p, data: ?bytes):
if data is not None:
print("Received output on stdout:", data.decode())
if data == b"HELLO\n":
print("All good, exiting..")
await async env.exit(0)
else:
print("Unexpected output, blargh")
await async env.exit(1)

def on_exit(p, exit_code, term_signal):
print("Process exited with code: ", exit_code, " terminated with signal:", term_signal)
Expand Down
Loading

0 comments on commit 73a7de8

Please sign in to comment.