Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Dates = "1"
Logging = "1"
LoggingExtras = "1"
Pkg = "1"
Profile = "1"
Random = "1"
Serialization = "1"
Sockets = "1"
Expand All @@ -30,9 +31,10 @@ DeepDiffs = "ab62b9b5-e342-54a8-a765-a90f495de1a6"
IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Profile = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
XMLDict = "228000da-037f-5747-90a9-8195ccbf91a5"

[targets]
test = ["AutoHashEquals", "DeepDiffs", "IOCapture", "Logging", "Pkg", "Random", "Test", "XMLDict"]
test = ["AutoHashEquals", "DeepDiffs", "IOCapture", "Logging", "Pkg", "Profile", "Random", "Test", "XMLDict"]
46 changes: 34 additions & 12 deletions src/ReTestItems.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ will be run.
`paths` passed to it cannot contain test files, either because the path doesn't exist or
the path points to a file which is not a test file. Default is `false`.
Can also be set using the `RETESTITEMS_VALIDATE_PATHS` environment variable.
- `timeout_profile_wait::Real=0`: When non-zero, a worker that times-out will trigger a CPU profile
for which we will wait `timeout_profile_wait` seconds before terminating the worker.
Zero means no profile will be taken. Can also be set using the `RETESTITEMS_TIMEOUT_PROFILE_WAIT`
environment variable. See the [Profile documentation](https://docs.julialang.org/en/v1/stdlib/Profile/#Triggered-During-Execution)
for more information on triggered profiles. Note you can use `worker_init_expr` to tweak the profile settings on workers.
"""
function runtests end

Expand Down Expand Up @@ -237,14 +242,16 @@ function runtests(
verbose_results::Bool=(logs !== :issues && isinteractive()),
test_end_expr::Expr=Expr(:block),
validate_paths::Bool=parse(Bool, get(ENV, "RETESTITEMS_VALIDATE_PATHS", "false")),
timeout_profile_wait::Real=parse(Int, get(ENV, "RETESTITEMS_TIMEOUT_PROFILE_WAIT", "0")),
)
nworker_threads = _validated_nworker_threads(nworker_threads)
paths′ = _validated_paths(paths, validate_paths)

logs in LOG_DISPLAY_MODES || throw(ArgumentError("`logs` must be one of $LOG_DISPLAY_MODES, got $(repr(logs))"))
report && logs == :eager && throw(ArgumentError("`report=true` is not compatible with `logs=:eager`"))
(0 ≤ memory_threshold ≤ 1) || throw(ArgumentError("`memory_threshold` must be between 0 and 1, got $(repr(memory_threshold))"))
testitem_timeout > 0 || throw(ArgumentError("`testitem_timeout` must be a postive number, got $(repr(testitem_timeout))"))
testitem_timeout > 0 || throw(ArgumentError("`testitem_timeout` must be a positive number, got $(repr(testitem_timeout))"))
timeout_profile_wait >= 0 || throw(ArgumentError("`timeout_profile_wait` must be a non-negative number, got $(repr(timeout_profile_wait))"))
# If we were given paths but none were valid, then nothing to run.
!isempty(paths) && isempty(paths′) && return nothing
shouldrun_combined(ti) = shouldrun(ti) && _shouldrun(name, ti.name) && _shouldrun(tags, ti.tags)
Expand All @@ -253,13 +260,15 @@ function runtests(
nworkers = max(0, nworkers)
retries = max(0, retries)
timeout = ceil(Int, testitem_timeout)
timeout_profile_wait = ceil(Int, timeout_profile_wait)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will throw an InexactError if you pass Inf. Maybe that's fine?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah Infs, NaNs and negative values will throw at some point, but I don't think this is an issue in practice.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a check for the negative numbers as we do the same thing for timeout

(timeout_profile_wait > 0 && Sys.iswindows()) && @warn "CPU profiles on timeout is not supported on Windows, ignoring `timeout_profile_wait`"
debuglvl = Int(debug)
if debuglvl > 0
LoggingExtras.withlevel(LoggingExtras.Debug; verbosity=debuglvl) do
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs, timeout_profile_wait)
end
else
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs, timeout_profile_wait)
end
end

Expand All @@ -273,7 +282,7 @@ end
# By tracking and reusing test environments, we can avoid this issue.
const TEST_ENVS = Dict{String, String}()

function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol)
function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, timeout_profile_wait::Int)
# Don't recursively call `runtests` e.g. if we `include` a file which calls it.
# So we ignore the `runtests(...)` call in `test/runtests.jl` when `runtests(...)`
# was called from the command line.
Expand All @@ -293,7 +302,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
if is_running_test_runtests_jl(proj_file)
# Assume this is `Pkg.test`, so test env already active.
@debugv 2 "Running in current environment `$(Base.active_project())`"
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs, timeout_profile_wait)
else
@debugv 1 "Activating test environment for `$proj_file`"
orig_proj = Base.active_project()
Expand All @@ -306,7 +315,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
testenv = TestEnv.activate()
TEST_ENVS[proj_file] = testenv
end
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs, timeout_profile_wait)
finally
Base.set_active_project(orig_proj)
end
Expand All @@ -317,6 +326,7 @@ end
function _runtests_in_current_env(
shouldrun, paths, projectfile::String, nworkers::Int, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
testitem_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol,
timeout_profile_wait::Int,
)
start_time = time()
proj_name = something(Pkg.Types.read_project(projectfile).name, "")
Expand Down Expand Up @@ -381,7 +391,7 @@ function _runtests_in_current_env(
ti = starting[i]
@spawn begin
with_logger(original_logger) do
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs)
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs, $timeout_profile_wait)
end
end
end
Expand Down Expand Up @@ -492,7 +502,7 @@ end

function manage_worker(
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
default_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol
default_timeout::Int, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol, timeout_profile_wait::Int
)
ntestitems = length(testitems.testitems)
run_number = 1
Expand Down Expand Up @@ -551,23 +561,35 @@ function manage_worker(
end
catch e
@debugv 2 "Error" exception=e
println(DEFAULT_STDOUT[])
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
# Handle the exception
if e isa TimeoutException
@debugv 1 "Test item $(repr(testitem.name)) timed out. Terminating worker $worker"
terminate!(worker)
if timeout_profile_wait > 0
@warn "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \
A CPU profile will be triggered on the worker and then it will be terminated."
trigger_profile(worker, timeout_profile_wait, :timeout)
end
terminate!(worker, :timeout)
wait(worker)
# TODO: We print the captured logs after the worker is terminated,
# which means that we include an annoying stackrace from the worker termination,
# but the profiles don't seem to get flushed properly if we don't do this.
# This is not an issue with eager logs, but when going through a file, this seems to help.
println(DEFAULT_STDOUT[])
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
@error "$worker timed out running test item $(repr(testitem.name)) after $timeout seconds. \
Recording test error."
record_timeout!(testitem, run_number, timeout)
elseif e isa WorkerTerminatedException
println(DEFAULT_STDOUT[])
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
@error "$worker died running test item $(repr(testitem.name)). \
Recording test error."
record_worker_terminated!(testitem, worker, run_number)
else
# We don't expect any other kind of error, so rethrow, which will propagate
# back up to the main coordinator task and throw to the user
println(DEFAULT_STDOUT[])
_print_captured_logs(DEFAULT_STDOUT[], testitem, run_number)
rethrow()
end
# Handle retries
Expand Down
4 changes: 3 additions & 1 deletion src/log_capture.jl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ If target is String it is assumed it is a file path.
_redirect_logs(f, path::String) = open(io->_redirect_logs(f, io), path, "w")
function _redirect_logs(f, target::IO)
target === DEFAULT_STDOUT[] && return f()
colored_io = IOContext(target, :color => get(DEFAULT_STDOUT[], :color, false))
# If we're not doing :eager logs, make sure the displaysize is large so we don't truncate
# CPU profiles.
colored_io = IOContext(target, :color => get(DEFAULT_STDOUT[], :color, false), :displaysize => (10000,10000))
# In case the default logger was changed by the user, we need to make sure the new logstate
# is poinitng to the new stderr.
# Adapted from https://github.com/JuliaIO/Suppressor.jl/blob/cbfc46f1450b03d6b69dad4c35de739290ff0aff/src/Suppressor.jl#L158-L161
Expand Down
20 changes: 20 additions & 0 deletions src/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Workers
using Sockets, Serialization

export Worker, remote_eval, remote_fetch, terminate!, WorkerTerminatedException
export trigger_profile

function try_with_timeout(f, timeout)
cond = Threads.Condition()
Expand Down Expand Up @@ -105,6 +106,25 @@ function watch_and_terminate!(w::Worker, ev::Threads.Event)
true
end

# Send signal to the given `Worker` process to trigger a profile.
# Users can customise this profiling in the usual way, e.g. via
# `JULIA_PROFILE_PEEK_HEAP_SNAPSHOT`, but `Profile.set_peek_duration`, `Profile.peek_report[]`
# would have to be modified in the worker process.
# See https://docs.julialang.org/en/v1/stdlib/Profile/#Triggered-During-Execution
# Called when timeout_profile_wait is non-zero.
function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual)
if !Sys.iswindows()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not windows? because there isn't a SIGINFO equivalent?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question! Looks like i did this because upstream Julia doesn't support this functionality on Windows... but idk why not

@debug "sending profile request to worker $(w.pid) from $from"
if Sys.islinux()
kill(w.process, 10) # SIGUSR1
elseif Sys.isbsd()
kill(w.process, 29) # SIGINFO
end
sleep(timeout_profile_wait) # Leave time for it to print the profile.
end
return nothing
end

# gracefully terminate a worker by sending a shutdown message
# and waiting for the other tasks to perform worker shutdown
function Base.close(w::Worker)
Expand Down
73 changes: 72 additions & 1 deletion test/integrationtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ end
@test err.value == string(ErrorException("Timed out after 4s running test item \"Test item takes 60 seconds\" (run=1)"))

for t in (0, -1.1)
expected = ArgumentError("`testitem_timeout` must be a postive number, got $t")
expected = ArgumentError("`testitem_timeout` must be a positive number, got $t")
@test_throws expected runtests(file; nworkers, testitem_timeout=t)
end
end
Expand Down Expand Up @@ -819,6 +819,77 @@ end
@test ts.time_end - ts.time_start ≈ timeout
end

@testset "CPU profile timeout trigger" begin
using Profile
# We're only testing that the signal was registered and that the stacktrace was printed.
# We also tried testing that the CPU profile was displayed here, but that was too flaky in CI.
function capture_timeout_profile(f, timeout_profile_wait; kwargs...)
logs = mktemp() do path, io
redirect_stdio(stdout=io, stderr=io, stdin=devnull) do
encased_testset() do
if isnothing(timeout_profile_wait)
runtests(joinpath(TEST_FILES_DIR, "_timeout_tests.jl"); nworkers=1, testitem_timeout=3, kwargs...)
else
runtests(joinpath(TEST_FILES_DIR, "_timeout_tests.jl"); nworkers=1, testitem_timeout=3, timeout_profile_wait, kwargs...)
end
end
end
flush(io)
close(io)
read(path, String)
end
f(logs)
@assert occursin("timed out running test item \"Test item takes 60 seconds\" after 3 seconds", logs)
return logs
end

@testset "timeout_profile_wait=0 means no CPU profile" begin
capture_timeout_profile(0) do logs
@test !occursin("Information request received", logs)
end
end


default_peektime = Profile.get_peek_duration()
@testset "non-zero timeout_profile_wait means we collect a CPU profile" begin
capture_timeout_profile(5) do logs
@test occursin("Information request received. A stacktrace will print followed by a $(default_peektime) second profile", logs)
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
@test occursin("Profile collected.", logs)
end
end


@testset "`set_peek_duration` is respected in `worker_init_expr`" begin
capture_timeout_profile(5, worker_init_expr=:(using Profile; Profile.set_peek_duration($default_peektime + 1.0))) do logs
@test occursin("Information request received. A stacktrace will print followed by a $(default_peektime + 1.0) second profile", logs)
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
@test occursin("Profile collected.", logs)
end
end


# The RETESTITEMS_TIMEOUT_PROFILE_WAIT environment variable can be used to set the timeout_profile_wait.
@testset "RETESTITEMS_TIMEOUT_PROFILE_WAIT environment variable" begin
withenv("RETESTITEMS_TIMEOUT_PROFILE_WAIT" => "5") do
capture_timeout_profile(nothing) do logs
@test occursin("Information request received", logs)
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
@test occursin("Profile collected.", logs)
end
end
end

# The profile is collected for each worker thread.
@testset "CPU profile with $(repr(log_capture))" for log_capture in (:eager, :batched)
capture_timeout_profile(5, nworker_threads=VERSION >= v"1.9" ? "3,2" : "3", logs=log_capture) do logs
@test occursin("Information request received", logs)
@test count(r"pthread_cond_wait|__psych_cvwait", logs) > 0 # the stacktrace was printed (will fail on Windows)
@test occursin("Profile collected.", logs)
end
end
end

@testset "worker always crashes immediately" begin
file = joinpath(TEST_FILES_DIR, "_happy_tests.jl")

Expand Down
24 changes: 24 additions & 0 deletions test/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,28 @@ using Test
close(w)
end

@testset "CPU profile" begin
logs = mktemp() do path, io
w = Worker(threads=VERSION >= v"1.9" ? "3,2" : "3", worker_redirect_io=io)
fut = remote_eval(w, :(sleep(5), yield()))
sleep(0.5)
trigger_profile(w, 1, :test)
fetch(fut)
close(w)
flush(io)
close(io)
return read(path, String)
end

@test occursin(r"Thread 1 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
@test occursin(r"Thread 2 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
@test occursin(r"Thread 3 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
if VERSION >= v"1.9"
@test occursin(r"Thread 4 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
@test occursin(r"Thread 5 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
@test !occursin(r"Thread 6 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
else
@test !occursin(r"Thread 4 Task 0x\w+ Total snapshots: \d+. Utilization: \d+%", logs)
end
end
end # workers.jl testset