diff --git a/Project.toml b/Project.toml index 3d7afa6..bf694b7 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "ParallelTestRunner" uuid = "d3525ed8-44d0-4b2c-a655-542cee43accc" authors = ["Valentin Churavy "] -version = "2.5.1" +version = "2.6.0" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" diff --git a/docs/src/api.md b/docs/src/api.md index 9890f75..47cbe3d 100644 --- a/docs/src/api.md +++ b/docs/src/api.md @@ -39,6 +39,20 @@ addworkers default_njobs ``` +## Custom Records + +Per-test data is captured in an [`AbstractTestRecord`](@ref). The default +[`TestRecord`](@ref) stores timing and memory statistics; subtypes can wrap it +to collect additional data (e.g. GPU metrics) by dispatching [`execute`](@ref) +on the new type and reading the baseline through [`parent`](@ref). + +```@docs +AbstractTestRecord +TestRecord +execute +parent(::ParallelTestRunner.AbstractTestRecord) +``` + ## Internal Types These are internal types, not subject to semantic versioning contract (could be changed or removed at any point without notice), not intended for consumption by end-users. diff --git a/src/ParallelTestRunner.jl b/src/ParallelTestRunner.jl index eada6f9..e8c97f7 100644 --- a/src/ParallelTestRunner.jl +++ b/src/ParallelTestRunner.jl @@ -79,8 +79,27 @@ if VERSION >= v"1.13.0-DEV.1044" using Base.ScopedValues end +""" + AbstractTestRecord + +Abstract supertype for per-test result records. [`TestRecord`](@ref) is the +default concrete subtype, carrying the captured test set and baseline timing / +memory statistics. Custom subtypes can attach extra per-test data (e.g. GPU +statistics) by carrying a `base::TestRecord` field and dispatching +[`execute`](@ref) on the new type. See the `RecordType` argument of +[`runtests`](@ref) for how to plug a custom record type into a run. +""" abstract type AbstractTestRecord end +""" + TestRecord <: AbstractTestRecord + +Default per-test record. Holds the captured `DefaultTestSet` alongside the +baseline timing and memory statistics that [`runtests`](@ref) prints and +persists. Custom [`AbstractTestRecord`](@ref) subtypes wrap a `TestRecord` in a +`base` field; [`parent`](@ref) returns that baseline so the default `print_*` +methods work unchanged. +""" struct TestRecord <: AbstractTestRecord value::DefaultTestSet @@ -93,12 +112,24 @@ struct TestRecord <: AbstractTestRecord total_time::Float64 end -function memory_usage(rec::TestRecord) - return rec.rss +""" + parent(rec::AbstractTestRecord) -> TestRecord + +Return the [`TestRecord`](@ref) baseline that a custom record type wraps. By +default, subtypes of `AbstractTestRecord` are expected to carry a +`base::TestRecord` field; override `parent` for a different layout. The default +`print_*` methods read baseline fields through `parent`, so wrapped types +inherit the standard output unchanged. +""" +Base.parent(rec::AbstractTestRecord) = rec.base +Base.parent(rec::TestRecord) = rec + +function memory_usage(rec::AbstractTestRecord) + return parent(rec).rss end -function Base.getindex(rec::TestRecord) - return rec.value +function Base.getindex(rec::AbstractTestRecord) + return parent(rec).value end @@ -121,7 +152,7 @@ struct TestIOContext rss_align::Int end -function test_IOContext(stdout::IO, stderr::IO, lock::ReentrantLock, name_align::Int, verbose::Bool) +function test_IOContext(::Type{<:AbstractTestRecord}, stdout::IO, stderr::IO, lock::ReentrantLock, name_align::Int, verbose::Bool) elapsed_align = textwidth("time (s)") compile_align = textwidth("Compile") gc_align = textwidth("GC (s)") @@ -137,7 +168,7 @@ function test_IOContext(stdout::IO, stderr::IO, lock::ReentrantLock, name_align: ) end -function print_header(ctx::TestIOContext, testgroupheader, workerheader) +function print_header(::Type{<:AbstractTestRecord}, ctx::TestIOContext, testgroupheader, workerheader) lock(ctx.lock) try # header top @@ -160,7 +191,7 @@ function print_header(ctx::TestIOContext, testgroupheader, workerheader) end end -function print_test_started(wrkr, test, ctx::TestIOContext) +function print_test_started(::Type{<:AbstractTestRecord}, wrkr, test, ctx::TestIOContext) lock(ctx.lock) try printstyled(ctx.stdout, test, lpad("($wrkr)", ctx.name_align - textwidth(test) + 1, " "), " │", color = :white) @@ -174,35 +205,36 @@ function print_test_started(wrkr, test, ctx::TestIOContext) end end -function print_test_finished(record::TestRecord, wrkr, test, ctx::TestIOContext) +function print_test_finished(record::AbstractTestRecord, wrkr, test, ctx::TestIOContext) + base = parent(record) lock(ctx.lock) try printstyled(ctx.stdout, test, color = :white) printstyled(ctx.stdout, lpad("($wrkr)", ctx.name_align - textwidth(test) + 1, " "), " │ ", color = :white) - time_str = @sprintf("%7.2f", record.time) + time_str = @sprintf("%7.2f", base.time) printstyled(ctx.stdout, lpad(time_str, ctx.elapsed_align, " "), " │ ", color = :white) if ctx.verbose # pre-testset time - init_time_str = @sprintf("%7.2f", record.total_time - record.time) + init_time_str = @sprintf("%7.2f", base.total_time - base.time) printstyled(ctx.stdout, lpad(init_time_str, ctx.elapsed_align, " "), " │ ", color = :white) # compilation time if VERSION >= v"1.11" - init_time_str = @sprintf("%7.2f", Float64(100*record.compile_time/record.time)) + init_time_str = @sprintf("%7.2f", Float64(100*base.compile_time/base.time)) printstyled(ctx.stdout, lpad(init_time_str, ctx.compile_align, " "), " │ ", color = :white) end end - gc_str = @sprintf("%5.2f", record.gctime) + gc_str = @sprintf("%5.2f", base.gctime) printstyled(ctx.stdout, lpad(gc_str, ctx.gc_align, " "), " │ ", color = :white) - percent_str = @sprintf("%4.1f", 100 * record.gctime / record.time) + percent_str = @sprintf("%4.1f", 100 * base.gctime / base.time) printstyled(ctx.stdout, lpad(percent_str, ctx.percent_align, " "), " │ ", color = :white) - alloc_str = @sprintf("%5.2f", record.bytes / 2^20) + alloc_str = @sprintf("%5.2f", base.bytes / 2^20) printstyled(ctx.stdout, lpad(alloc_str, ctx.alloc_align, " "), " │ ", color = :white) - rss_str = @sprintf("%5.2f", record.rss / 2^20) + rss_str = @sprintf("%5.2f", memory_usage(record) / 2^20) printstyled(ctx.stdout, lpad(rss_str, ctx.rss_align, " "), " │\n", color = :white) flush(ctx.stdout) @@ -211,7 +243,8 @@ function print_test_finished(record::TestRecord, wrkr, test, ctx::TestIOContext) end end -function print_test_failed(record::TestRecord, wrkr, test, ctx::TestIOContext) +function print_test_failed(record::AbstractTestRecord, wrkr, test, ctx::TestIOContext) + base = parent(record) lock(ctx.lock) try printstyled(ctx.stderr, test, color = :red) @@ -221,11 +254,11 @@ function print_test_failed(record::TestRecord, wrkr, test, ctx::TestIOContext) , color = :red ) - time_str = @sprintf("%7.2f", record.time) + time_str = @sprintf("%7.2f", base.time) printstyled(ctx.stderr, lpad(time_str, ctx.elapsed_align + 1, " "), " │", color = :red) if ctx.verbose - init_time_str = @sprintf("%7.2f", record.total_time - record.time) + init_time_str = @sprintf("%7.2f", base.total_time - base.time) printstyled(ctx.stdout, lpad(init_time_str, ctx.elapsed_align + 1, " "), " │ ", color = :red) end @@ -243,7 +276,7 @@ function print_test_failed(record::TestRecord, wrkr, test, ctx::TestIOContext) end end -function print_test_crashed(wrkr, test, ctx::TestIOContext) +function print_test_crashed(::Type{<:AbstractTestRecord}, wrkr, test, ctx::TestIOContext) lock(ctx.lock) try printstyled(ctx.stderr, test, color = :red) @@ -313,7 +346,59 @@ function Test.finish(ts::WorkerTestSet) return ts.wrapped_ts end -function runtest(f, name, init_code, start_time) +""" + execute(::Type{R}, mod::Module, f, name, start_time, custom_args) where {R<:AbstractTestRecord} + +Run the test expression `f` inside the sandbox module `mod` and return an +`R <: AbstractTestRecord`. This is the extension point for custom record +types: dispatch `execute(::Type{MyRecord}, …)` to collect additional per-test +statistics without re-implementing the sandbox scaffolding. + +The default method for [`TestRecord`](@ref) wraps the test set in a +[`WorkerTestSet`](@ref) placeholder (so `DefaultTestSet` doesn't swallow +results at the top level), captures `@timed` stats, and records `Sys.maxrss()`. +Custom implementations commonly call `execute(TestRecord, mod, f, name, +start_time, custom_args)` to reuse that baseline and wrap the returned record +in a new record type. + +Arguments: + +- `mod` — the per-test sandbox module; the test expression `f` is evaluated + into it via `@eval mod`. +- `f` — the test expression from the `testsuite` dictionary. +- `name` — the test name (used as the top-level `@testset` name). +- `start_time` — wall-clock time at which the scheduler picked up this test; + subtract from `time()` to get total elapsed time including worker wait. +- `custom_args` — the `custom_args` value forwarded from [`runtests`](@ref) + (arbitrary, typically a `NamedTuple`). +""" +function execute(::Type{TestRecord}, mod::Module, f, name, start_time, custom_args) + data = @eval mod begin + GC.gc(true) + Random.seed!(1) + + # @testset CustomTestRecord switches the all lower-level testset to our custom testset, + # so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`. + # This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`. + stats = @timed @testset WorkerTestSet "placeholder" begin + @testset DefaultTestSet $name begin + $f + end + end + + compile_time = @static VERSION >= v"1.11" ? stats.compile_time : 0.0 + (; testset=stats.value, stats.time, stats.bytes, stats.gctime, compile_time) + end + + # process results + rss = Sys.maxrss() + record = TestRecord(data..., rss, time() - start_time) + + GC.gc(true) + return record +end + +function runtest(RecordType::Type{<:AbstractTestRecord}, f, name, init_code, start_time, custom_args) function inner() # generate a temporary module to execute the tests in mod = @eval(Main, module $(gensym(name)) end) @@ -325,29 +410,7 @@ function runtest(f, name, init_code, start_time) Core.eval(mod, init_code) - data = @eval mod begin - GC.gc(true) - Random.seed!(1) - - # @testset CustomTestRecord switches the all lower-level testset to our custom testset, - # so we need to have two layers here such that the user-defined testsets are using `DefaultTestSet`. - # This also guarantees our invariant about `WorkerTestSet` containing a single `DefaultTestSet`. - stats = @timed @testset WorkerTestSet "placeholder" begin - @testset DefaultTestSet $name begin - $f - end - end - - compile_time = @static VERSION >= v"1.11" ? stats.compile_time : 0.0 - (; testset=stats.value, stats.time, stats.bytes, stats.gctime, compile_time) - end - - # process results - rss = Sys.maxrss() - record = TestRecord(data..., rss, time() - start_time) - - GC.gc(true) - return record + return execute(RecordType, mod, f, name, start_time, custom_args) end @static if VERSION >= v"1.13.0-DEV.1044" @@ -510,10 +573,16 @@ function addworker(; exeflags = exe[2:end] end - push!(env, "JULIA_NUM_THREADS" => "1") + # don't mutate the caller's vector; multiple workers may share a default + worker_env = copy(env) + push!(worker_env, "JULIA_NUM_THREADS" => "1") # Malt already sets OPENBLAS_NUM_THREADS to 1 - push!(env, "OPENBLAS_NUM_THREADS" => "1") - wrkr = PTRWorker(; exename, exeflags, env) + push!(worker_env, "OPENBLAS_NUM_THREADS" => "1") + wrkr = PTRWorker(; exename, exeflags, env = worker_env) + # make ParallelTestRunner available to `init_worker_code`; users commonly + # need it to reference `AbstractTestRecord`, `execute`, etc. when defining + # custom record types. + Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) if init_worker_code != :() Malt.remote_eval_wait(Main, wrkr.w, init_worker_code) end @@ -699,6 +768,11 @@ end init_code = :(), init_worker_code = :(), test_worker = Returns(nothing), + RecordType::Type{<:AbstractTestRecord} = TestRecord, + custom_args = (;), + exename = nothing, + exeflags = nothing, + env = Vector{Pair{String, String}}(), stdout = Base.stdout, stderr = Base.stderr, max_worker_rss = get_max_worker_rss()) @@ -723,6 +797,25 @@ Several keyword arguments are also supported: - `init_worker_code`: Code use to initialize each worker. This is run only once per worker instead of once per test. - `test_worker`: Optional function that takes a test name and `init_worker_code` if `init_worker_code` is defined and returns a specific worker. When returning `nothing`, the test will be assigned to any available default worker. +- `RecordType`: Concrete subtype of [`AbstractTestRecord`](@ref) used to collect + per-test statistics. Defaults to [`TestRecord`](@ref). To extend the default + record with extra data, define `struct MyRecord <: AbstractTestRecord; + base::TestRecord; …; end` and dispatch [`execute`](@ref) on the new type — + typically by calling `execute(TestRecord, mod, f, name, start_time, + custom_args)` and wrapping the result. The default `print_*` methods read + baseline fields through [`parent`](@ref), so wrapped types inherit the + standard output; override `print_*` only when you need different layout. + The record type must be defined on both the main process and all workers + (e.g. via `init_worker_code`) since it crosses the Malt serialization + boundary. +- `custom_args`: Arbitrary value (typically a `NamedTuple`) forwarded to + [`execute`](@ref). Lets callers thread per-run configuration into a custom + `RecordType`'s `execute` method without going through `init_code`. +- `exename`, `exeflags`, `env`: Forwarded to every internal `addworker` call, so + they affect all default-pool workers (and any respawns). `exename` may be a + `String` or a `Cmd` — passing a `Cmd` lets callers wrap the julia invocation + with a tool such as `compute-sanitizer`. Custom workers created from inside a + `test_worker` hook are the caller's responsibility. - `stdout` and `stderr`: I/O streams to write to (default: `Base.stdout` and `Base.stderr`) - `max_worker_rss`: RSS threshold where a worker will be restarted once it is reached. @@ -800,6 +893,11 @@ issues during long test runs. The memory limit is set based on system architectu function runtests(mod::Module, args::ParsedArgs; testsuite::Dict{String,Expr} = find_tests(pwd()), init_code = :(), init_worker_code = :(), test_worker = Returns(nothing), + RecordType::Type{<:AbstractTestRecord} = TestRecord, + custom_args = (;), + exename = nothing, + exeflags = nothing, + env = Vector{Pair{String, String}}(), stdout = Base.stdout, stderr = Base.stderr, max_worker_rss = get_max_worker_rss()) # # set-up @@ -874,8 +972,8 @@ function runtests(mod::Module, args::ParsedArgs; stderr.lock = print_lock end - io_ctx = test_IOContext(stdout, stderr, print_lock, name_align, !isnothing(args.verbose)) - print_header(io_ctx, testgroupheader, workerheader) + io_ctx = test_IOContext(RecordType, stdout, stderr, print_lock, name_align, !isnothing(args.verbose)) + print_header(RecordType, io_ctx, testgroupheader, workerheader) status_lines_visible = Ref(0) @@ -975,7 +1073,7 @@ function runtests(mod::Module, args::ParsedArgs; # Optionally print verbose started message if args.verbose !== nothing clear_status() - print_test_started(wrkr, test_name, io_ctx) + print_test_started(RecordType, wrkr, test_name, io_ctx) end elseif msg_type == :finished @@ -992,7 +1090,7 @@ function runtests(mod::Module, args::ParsedArgs; test_name, wrkr = msg[2], msg[3] clear_status() - print_test_crashed(wrkr, test_name, io_ctx) + print_test_crashed(RecordType, wrkr, test_name, io_ctx) end end @@ -1056,7 +1154,8 @@ function runtests(mod::Module, args::ParsedArgs; end # if a worker failed, spawn a new one if wrkr === nothing || !Malt.isrunning(wrkr) - wrkr = p = addworker(; init_worker_code, io_ctx.color) + wrkr = p = addworker(; init_worker_code, io_ctx.color, + exename, exeflags, env) end # run the test @@ -1064,7 +1163,8 @@ function runtests(mod::Module, args::ParsedArgs; result = try Malt.remote_eval_wait(Main, wrkr.w, :(import ParallelTestRunner)) Malt.remote_call_fetch(invokelatest, wrkr.w, runtest, - testsuite[test], test, init_code, test_t0) + RecordType, testsuite[test], test, + init_code, test_t0, custom_args) catch ex if isa(ex, InterruptException) # the worker got interrupted, signal other tasks to stop @@ -1169,7 +1269,7 @@ function runtests(mod::Module, args::ParsedArgs; for (testname, result, output, _start, _stop) in results if !isempty(output) print(io_ctx.stdout, "\nOutput generated during execution of '") - if result isa Exception || anynonpass(result.value) + if result isa Exception || anynonpass(result[]) printstyled(io_ctx.stdout, testname; color=:red) else printstyled(io_ctx.stdout, testname; color=:normal) diff --git a/test/runtests.jl b/test/runtests.jl index 0a0efb6..7b4ce92 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -123,6 +123,55 @@ end @test contains(str, "SUCCESS") end +@testset "custom record type" begin + # Custom record wraps the default `TestRecord` and adds one field. It must + # be defined on both the main process and every worker (via + # init_worker_code) because the record crosses the Malt serialization + # boundary. + init_worker_code = quote + using ParallelTestRunner: TestRecord + struct MyRecord <: ParallelTestRunner.AbstractTestRecord + base::TestRecord + extra::String + end + function ParallelTestRunner.execute( + ::Type{MyRecord}, mod::Module, f, name, start_time, custom_args, + ) + base = ParallelTestRunner.execute(TestRecord, mod, f, name, start_time, custom_args) + MyRecord(base, custom_args.tag) + end + function ParallelTestRunner.print_test_finished( + record::MyRecord, wrkr, test, ctx::ParallelTestRunner.TestIOContext, + ) + lock(ctx.lock) + try + println(ctx.stdout, "EXTRA[$test]=$(record.extra)") + flush(ctx.stdout) + finally + unlock(ctx.lock) + end + end + end + + eval(init_worker_code) # also define on the main process so the record deserializes + + testsuite = Dict( + "custom" => quote + @test 1 + 1 == 2 + end, + ) + + io = IOBuffer() + runtests(ParallelTestRunner, ["--verbose"]; testsuite, + init_worker_code, RecordType = MyRecord, + custom_args = (; tag = "hello"), + stdout = io, stderr = io) + str = String(take!(io)) + + @test contains(str, "EXTRA[custom]=hello") + @test contains(str, "SUCCESS") +end + @testset "custom worker" begin function test_worker(name) if name == "needs env var" @@ -158,6 +207,30 @@ end @test contains(str, "SUCCESS") end +@testset "global worker kwargs" begin + # `exename`/`exeflags`/`env` on runtests should propagate to every + # default-pool worker. We verify via an environment variable propagated + # through `env`, Julia flags threaded through `exeflags`, and `exename` + # supplied as a `Cmd` prefixing the julia binary (what CUDA.jl uses to + # wrap julia with `compute-sanitizer`). + testsuite = Dict( + "env var" => quote + @test ENV["GLOBAL_WORKER_TEST"] == "yes" + end, + "threads" => quote + @test Base.Threads.nthreads() == 2 + end, + ) + io = IOBuffer() + runtests(ParallelTestRunner, ["--verbose"]; testsuite, + env = ["GLOBAL_WORKER_TEST" => "yes"], + exeflags = ["--threads=2"], + exename = `$(Base.julia_cmd()[1])`, # trivial Cmd wrapping julia + stdout = io, stderr = io) + str = String(take!(io)) + @test contains(str, "SUCCESS") +end + @testset "custom worker with `init_worker_code`" begin init_worker_code = quote should_be_defined() = true