Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,19 @@ struct Request
# ignoring other fields
shutdown::Bool
end
# Report whether the given Request has its `shutdown` flag set.
function is_shutdown(r::Request)
    return r.shutdown
end

# A worker executes a Request and sends back a serialized Response object,
# but *only if* the Request has an id.
struct Response
    # result value; no type constraint is placed on it
    result::Any
    # an Exception, or `nothing`
    error::Union{Exception, Nothing}
    # matches a corresponding Request.id
    id::UInt64
    # if true, the worker is shutting down, so we can stop listening to it
    shutdown::Bool
end
# Three-argument convenience constructor: forwards `a`, `b`, `c` as the first
# three fields and sets the `shutdown` flag to `false`.
function Response(a, b, c)
    return Response(a, b, c, false)
end
# Build a Response whose `shutdown` flag is set: no result, no error, and an id
# drawn with `rand(UInt64)`.
function shutdown_response()
    id = rand(UInt64)
    return Response(nothing, nothing, id, true)
end
# Report whether the given Response has its `shutdown` flag set.
function is_shutdown(r::Response)
    return r.shutdown
end

# simple Future that coordinator can wait on until a Response comes back for a Request
struct Future
Expand Down Expand Up @@ -92,6 +98,7 @@ function terminate!(w::Worker, from::Symbol=:manual)
if !(w.socket.status == Base.StatusUninit || w.socket.status == Base.StatusInit || w.socket.handle === C_NULL)
close(w.socket)
end
@debug "Done cleaning up after terminating worker $(w.pid) from $from"
return
end

Expand All @@ -117,6 +124,7 @@ function Base.close(w::Worker, from::Symbol=:manual)
end
end
wait(w)
@debug "Worker $(w.pid) has closed successfully."
return
end

Expand Down Expand Up @@ -205,6 +213,8 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
try
notify(ev) # notify we've started
while !process_exited(proc) && !w.terminated
# Core.println(getpid(proc))
# Core.println(process_exited(proc))
line = readline(proc)
if !isempty(line)
fn(io, w.pid, line)
Expand Down Expand Up @@ -232,6 +242,14 @@ function process_responses(w::Worker, ev::Threads.Event)
# get the next Response from the worker
r = deserialize(w.socket)
@assert r isa Response "Received invalid response from worker $(w.pid): $(r)"
if is_shutdown(r)
@debug "Received shutdown response from worker $(w.pid). Waiting for shutdown of $(w.process)"
# TODO(PR): SOMEHOW this wait(p) is not getting interrupted when p dies as a zombie <defunct> process.
wait(w.process)
@debug "shutdown"
terminate!(w, :process_responses)
break
end
# println("Received response $(r) from worker $(w.pid)")
@lock lock begin
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)"
Expand Down Expand Up @@ -289,6 +307,7 @@ function startworker()
serve_requests(accept(sock))
finally
close(sock)
@debug "Shutting down worker $(getpid())"
exit(0)
end
end
Expand Down Expand Up @@ -318,7 +337,16 @@ function serve_requests(io)
while true
req = deserialize(io)
@assert req isa Request
req.shutdown && break
if is_shutdown(req)
@debug "Received shutdown request on worker $(getpid())"
resp = shutdown_response()
@lock iolock begin
# println("sending response: $(resp)")
serialize(io, resp)
flush(io)
end
break
end
# println("received request: $(req)")
Threads.@spawn begin
r = $req
Expand Down
3 changes: 2 additions & 1 deletion test/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ using Test
@testset "clean shutdown ($w)" begin
close(w)
@test !process_running(w.process)
@test w.process.termsignal == Base.SIGTERM
@test w.process.termsignal == 0
@test w.process.exitcode == 0
@test !isopen(w.socket)
@test w.terminated
Expand All @@ -41,6 +41,7 @@ using Test

w = Worker()
@testset "remote_eval/remote_fetch ($w)" begin
@info "starting testset remote_eval/remote_fetch ($w)"
expr = quote
global x
x = 101
Expand Down