From b6f05914c70897b8800389304595057f6f22c189 Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Tue, 9 Apr 2024 16:52:24 -0600 Subject: [PATCH 1/3] Attempt to introduce graceful close from workers back to coordinator --- src/workers.jl | 14 +++++++++++++- test/workers.jl | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/workers.jl b/src/workers.jl index 2b116514..1ffb3054 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -33,13 +33,17 @@ struct Request # ignoring other fields shutdown::Bool end +is_shutdown(r::Request) = r.shutdown # worker executes Request and returns a serialized Response object *if* Request has an id struct Response result error::Union{Nothing, Exception} id::UInt64 # matches a corresponding Request.id + # if true, worker is shutting down, so we can stop listening to it. + shutdown::Bool end +is_shutdown(r::Response) = r.shutdown # simple Future that coordinator can wait on until a Response comes back for a Request struct Future @@ -232,6 +236,7 @@ function process_responses(w::Worker, ev::Threads.Event) # get the next Response from the worker r = deserialize(w.socket) @assert r isa Response "Received invalid response from worker $(w.pid): $(r)" + is_shutdown(r) && break # println("Received response $(r) from worker $(w.pid)") @lock lock begin @assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)" @@ -318,7 +323,14 @@ function serve_requests(io) while true req = deserialize(io) @assert req isa Request - req.shutdown && break + if is_shutdown(req) + resp = Response(nothing, nothing, rand(UInt64), true) + @lock iolock begin + # println("sending response: $(resp)") + serialize(io, resp) + flush(io) + end + end # println("received request: $(req)") Threads.@spawn begin r = $req diff --git a/test/workers.jl b/test/workers.jl index 35dac221..05692f79 100644 --- a/test/workers.jl +++ b/test/workers.jl @@ -17,7 +17,7 @@ using Test @testset "clean shutdown ($w)" begin close(w) @test !process_running(w.process) - @test w.process.termsignal == Base.SIGTERM + @test w.process.termsignal == 0 @test w.process.exitcode == 0 @test !isopen(w.socket) @test w.terminated From 2f3e31a29ca27afc9e53551c9652a166bb690434 Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Thu, 18 Apr 2024 13:44:14 -0600 Subject: [PATCH 2/3] Fixup bug: forgot to break the new if-statement --- src/workers.jl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/workers.jl b/src/workers.jl index 1ffb3054..117abf4e 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -121,6 +121,7 @@ function Base.close(w::Worker, from::Symbol=:manual) end end wait(w) + @debug "Worker $(w.pid) has closed successfully." return end @@ -324,12 +325,14 @@ function serve_requests(io) req = deserialize(io) @assert req isa Request if is_shutdown(req) + @debug "Received shutdown request on worker $(getpid())" resp = Response(nothing, nothing, rand(UInt64), true) @lock iolock begin # println("sending response: $(resp)") serialize(io, resp) flush(io) end + break end # println("received request: $(req)") Threads.@spawn begin From 2a8768b7cfbc049d7daf345e51d5f56d6583c8e2 Mon Sep 17 00:00:00 2001 From: Nathan Daly Date: Thu, 18 Apr 2024 18:28:44 -0600 Subject: [PATCH 3/3] WIP: Trying to prevent preemptively terminating the process in response to its shutdown signal. But encountering a bug: https://github.com/JuliaLang/julia/issues/54145 --- src/workers.jl | 17 +++++++++++++++-- test/workers.jl | 1 + 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/workers.jl b/src/workers.jl index 117abf4e..14f0be6c 100644 --- a/src/workers.jl +++ b/src/workers.jl @@ -43,6 +43,8 @@ struct Response # if true, worker is shutting down, so we can stop listening to it. shutdown::Bool end +Response(a, b, c) = Response(a, b, c, false) +shutdown_response() = Response(nothing, nothing, rand(UInt64), true) is_shutdown(r::Response) = r.shutdown # simple Future that coordinator can wait on until a Response comes back for a Request @@ -96,6 +98,7 @@ function terminate!(w::Worker, from::Symbol=:manual) if !(w.socket.status == Base.StatusUninit || w.socket.status == Base.StatusInit || w.socket.handle === C_NULL) close(w.socket) end + @debug "Done cleaning up after terminating worker $(w.pid) from $from" return end @@ -210,6 +213,8 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event) try notify(ev) # notify we've started while !process_exited(proc) && !w.terminated + # Core.println(getpid(proc)) + # Core.println(process_exited(proc)) line = readline(proc) if !isempty(line) fn(io, w.pid, line) @@ -237,7 +242,14 @@ function process_responses(w::Worker, ev::Threads.Event) # get the next Response from the worker r = deserialize(w.socket) @assert r isa Response "Received invalid response from worker $(w.pid): $(r)" - is_shutdown(r) && break + if is_shutdown(r) + @debug "Received shutdown response from worker $(w.pid). Waiting for shutdown of $(w.process)" + # TODO(PR): SOMEHOW this wait(p) is not getting interrupted when p dies as a zombie process. + wait(w.process) + @debug "shutdown" + terminate!(w, :process_responses) + break + end # println("Received response $(r) from worker $(w.pid)") @lock lock begin @assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)" @@ -295,6 +307,7 @@ function startworker() serve_requests(accept(sock)) finally close(sock) + @debug "Shutting down worker $(getpid())" exit(0) end end @@ -326,7 +339,7 @@ function serve_requests(io) @assert req isa Request if is_shutdown(req) @debug "Received shutdown request on worker $(getpid())" - resp = Response(nothing, nothing, rand(UInt64), true) + resp = shutdown_response() @lock iolock begin # println("sending response: $(resp)") serialize(io, resp) diff --git a/test/workers.jl b/test/workers.jl index 05692f79..aa2f46e0 100644 --- a/test/workers.jl +++ b/test/workers.jl @@ -41,6 +41,7 @@ using Test w = Worker() @testset "remote_eval/remote_fetch ($w)" begin + @info "starting testset remote_eval/remote_fetch ($w)" expr = quote global x x = 101