diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index 1707a06..895cb89 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -7,6 +7,14 @@ CurrentModule = DistributedNext This documents notable changes in DistributedNext.jl. The format is based on [Keep a Changelog](https://keepachangelog.com). +## Unreleased + +### Changed +- The internals were completely refactored to move all global variables into a + single struct ([#61]). This should not be a user-visible change, but of course + it's possible that some things slipped through the cracks so please open an + issue if you encounter any bugs. + ## [v1.2.0] - 2026-03-21 ### Added diff --git a/src/DistributedNext.jl b/src/DistributedNext.jl index 27d7bfb..8dc7ebe 100644 --- a/src/DistributedNext.jl +++ b/src/DistributedNext.jl @@ -88,7 +88,7 @@ function _check_distributed_active() return false end - if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && inited[] + if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX.inited[] @warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other." return true else @@ -123,8 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock) Base.trylock(l::Lockable) = trylock(l.lock) Base.unlock(l::Lockable) = unlock(l.lock) -const REF_ID = Threads.Atomic{Int}(1) -next_ref_id() = Threads.atomic_add!(REF_ID, 1) +next_ref_id() = Threads.atomic_add!(CTX.ref_id, 1) struct RRID whence::Int @@ -149,10 +148,69 @@ include("pmap.jl") include("managers.jl") # LocalManager and SSHManager include("precompile.jl") -_stdlib_watcher_timer::Union{Timer, Nothing} = nothing +# Bundles all mutable global state for a distributed cluster into a single +# object. Currently a single global instance (`CTX`) is used, but multiple +# independent clusters could be supported in the future. +@kwdef struct ClusterContext + # Process identity + lproc::LocalProcess = LocalProcess() + role::Ref{Symbol} = Ref{Symbol}(:master) + + # Process group + pgrp::ProcessGroup = ProcessGroup([]) + + # Worker registries + map_pid_wrkr::Lockable{Dict{Int, Union{Worker, LocalProcess}}, ReentrantLock} = Lockable(Dict{Int, Union{Worker, LocalProcess}}()) + map_sock_wrkr::Lockable{IdDict{Any, Any}, ReentrantLock} = Lockable(IdDict()) + map_del_wrkr::Lockable{Set{Int}, ReentrantLock} = Lockable(Set{Int}()) + map_pid_statuses::Lockable{Dict{Int, Any}, ReentrantLock} = Lockable(Dict{Int, Any}()) + + # Lifecycle callbacks + worker_starting_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}() + worker_started_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}() + worker_exiting_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}() + worker_exited_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}() + + # Cluster manager + cluster_manager::Ref{ClusterManager} = Ref{ClusterManager}() + + # Synchronization + worker_lock::ReentrantLock = ReentrantLock() + inited::Threads.Atomic{Bool} = Threads.Atomic{Bool}(false) + next_pid::Threads.Atomic{Int} = Threads.Atomic{Int}(2) # 1 is reserved for the client (always) + + # Remote references + ref_id::Threads.Atomic{Int} = Threads.Atomic{Int}(1) + # Tracks whether a particular `AbstractRemoteRef` (identified by its RRID) + # exists on this worker. The `client_refs` lock is also used to synchronize + # access to `.refs` and associated `clientset` state. + client_refs::WeakKeyDict{AbstractRemoteRef, Nothing} = WeakKeyDict{AbstractRemoteRef, Nothing}() # used as a WeakKeySet + any_gc_flag::Threads.Condition = Threads.Condition() + + # Serialization state + object_numbers::WeakKeyDict = WeakKeyDict() + obj_number_salt::Ref{Int} = Ref(0) + known_object_data::Dict{UInt64, Any} = Dict{UInt64, Any}() + + # Worker pools / macros + default_worker_pool::Ref{Union{AbstractWorkerPool, Nothing}} = Ref{Union{AbstractWorkerPool, Nothing}}(nothing) + next_worker_idx::Threads.Atomic{Int} = Threads.Atomic{Int}(0) + + # Network / SSH + tunnel_counter::Threads.Atomic{Int} = Threads.Atomic{Int}(1) + tunnel_hosts_map::Dict{String, Semaphore} = Dict{String, Semaphore}() + client_port::Ref{UInt16} = Ref{UInt16}(0) + + # Scoped value for exited callback pid + exited_callback_pid::ScopedValue{Int} = ScopedValue(-1) + + # Stdlib watcher + stdlib_watcher_timer::Ref{Union{Timer, Nothing}} = Ref{Union{Timer, Nothing}}(nothing) +end + +const CTX = ClusterContext() function __init__() - global _stdlib_watcher_timer init_parallel() if ccall(:jl_generating_output, Cint, ()) == 0 @@ -161,12 +219,12 @@ function __init__() # cluster cookie has been set, which is most likely to have been done # through Distributed.init_multi() being called by Distributed.addprocs() or # something. - _stdlib_watcher_timer = Timer(0; interval=1) do timer + CTX.stdlib_watcher_timer[] = Timer(0; interval=1) do timer if _check_distributed_active() close(timer) end end - atexit(() -> close(_stdlib_watcher_timer)) + atexit(() -> close(CTX.stdlib_watcher_timer[])) end end diff --git a/src/cluster.jl b/src/cluster.jl index bbbbad5..11d1a76 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -9,11 +9,8 @@ Cluster managers implement how workers can be added, removed and communicated wi """ abstract type ClusterManager end -# cluster_manager is a global constant -const cluster_manager = Ref{ClusterManager}() - function throw_if_cluster_manager_unassigned() - isassigned(cluster_manager) || error("cluster_manager is unassigned") + isassigned(CTX.cluster_manager) || error("cluster_manager is unassigned") return nothing end @@ -147,8 +144,8 @@ mutable struct Worker Worker(id::Int) = Worker(id, nothing) function Worker(id::Int, conn_func) @assert id > 0 - @lock map_pid_wrkr if haskey(map_pid_wrkr[], id) - return map_pid_wrkr[][id] + @lock CTX.map_pid_wrkr if haskey(CTX.map_pid_wrkr[], id) + return CTX.map_pid_wrkr[][id] end w=new(id, Threads.ReentrantLock(), [], [], false, WorkerState_created, Threads.Condition(), time(), conn_func) w.initialized = Event() @@ -176,12 +173,12 @@ end function check_worker_state(w::Worker) if (@atomic w.state) === WorkerState_created if !isclusterlazy() - if PGRP.topology === :all_to_all + if CTX.pgrp.topology === :all_to_all # Since higher pids connect with lower pids, the remote worker # may not have connected to us yet. Wait for some time. wait_for_conn(w) else - error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology)) + error("peer $(w.id) is not connected to $(myid()). Topology : " * string(CTX.pgrp.topology)) end else w.ct_time = time() @@ -273,20 +270,20 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std stderr_to_stdout && redirect_stderr(stdout) init_worker(cookie) - interface = IPv4(LPROC.bind_addr) - if LPROC.bind_port == 0 - (port, sock) = listenany(interface, LPROC.bind_port_hint) - LPROC.bind_port = Int(port) + interface = IPv4(CTX.lproc.bind_addr) + if CTX.lproc.bind_port == 0 + (port, sock) = listenany(interface, CTX.lproc.bind_port_hint) + CTX.lproc.bind_port = Int(port) else - sock = listen(interface, LPROC.bind_port) + sock = listen(interface, CTX.lproc.bind_port) end errormonitor(@async while isopen(sock) client = accept(sock) process_messages(client, client, true) end) print(out, "julia_worker:") # print header - print(out, "$(LPROC.bind_port)#") # print port - print(out, LPROC.bind_addr) + print(out, "$(CTX.lproc.bind_port)#") # print port + print(out, CTX.lproc.bind_addr) print(out, '\n') flush(out) @@ -407,16 +404,16 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. - cluster_manager[] = manager + CTX.cluster_manager[] = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 - @assert isempty(PGRP.refs) - @assert isempty(client_refs) + @assert isempty(CTX.pgrp.refs) + @assert isempty(CTX.client_refs) # System is started in head node mode, cleanup related entries - empty!(PGRP.workers) - @lock map_pid_wrkr empty!(map_pid_wrkr[]) + empty!(CTX.pgrp.workers) + @lock CTX.map_pid_wrkr empty!(CTX.map_pid_wrkr[]) cluster_cookie(cookie) nothing @@ -430,8 +427,6 @@ end # # Only one addprocs can be in progress at any time # -const worker_lock = ReentrantLock() - """ addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers @@ -481,14 +476,14 @@ function addprocs(manager::ClusterManager; kwargs...) # Call worker-starting callbacks warning_interval = params[:callback_warning_interval] - _run_callbacks_concurrently("worker-starting", worker_starting_callbacks, + _run_callbacks_concurrently("worker-starting", CTX.worker_starting_callbacks, warning_interval, [(manager, params)]) # Add new workers - new_workers = @lock worker_lock addprocs_locked(manager::ClusterManager, params) + new_workers = @lock CTX.worker_lock addprocs_locked(manager::ClusterManager, params) # Call worker-started callbacks - _run_callbacks_concurrently("worker-started", worker_started_callbacks, + _run_callbacks_concurrently("worker-started", CTX.worker_started_callbacks, warning_interval, new_workers) return new_workers @@ -497,12 +492,12 @@ end function addprocs_locked(manager::ClusterManager, params) topology(Symbol(params[:topology])) - if PGRP.topology !== :all_to_all + if CTX.pgrp.topology !== :all_to_all params[:lazy] = false end - if PGRP.lazy === nothing || nprocs() == 1 - PGRP.lazy = params[:lazy] + if CTX.pgrp.lazy === nothing || nprocs() == 1 + CTX.pgrp.lazy = params[:lazy] elseif isclusterlazy() != params[:lazy] throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(), ". Cannot set lazy=", params[:lazy]))) @@ -634,7 +629,7 @@ end function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # only node 1 can add new nodes, since nobody else has the full list of address:port - @assert LPROC.id == 1 + @assert CTX.lproc.id == 1 timeout = worker_timeout() # initiate a connect. Does not wait for connection completion in case of TCP. @@ -683,11 +678,11 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # - On master, receiving a JoinCompleteMsg triggers rr_ntfy_join (signifies that worker setup is complete) join_list = [] - if PGRP.topology === :all_to_all + if CTX.pgrp.topology === :all_to_all # need to wait for lower worker pids to have completed connecting, since the numerical value # of pids is relevant to the connection process, i.e., higher pids connect to lower pids and they # require the value of config.connect_at which is set only upon connection completion - for jw in PGRP.workers + for jw in CTX.pgrp.workers if (jw.id != 1) && (jw.id < w.id) lock(jw.c_state) do # wait for wl to join @@ -699,12 +694,12 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) end end - elseif PGRP.topology === :custom + elseif CTX.pgrp.topology === :custom # wait for requested workers to be up before connecting to them. filterfunc(x) = (x.id != 1) && isdefined(x, :config) && (notnothing(x.config.ident) in something(wconfig.connect_idents, [])) - wlist = filter(filterfunc, PGRP.workers) + wlist = filter(filterfunc, CTX.pgrp.workers) waittime = 0 while wconfig.connect_idents !== nothing && length(wlist) < length(wconfig.connect_idents) @@ -713,7 +708,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) end sleep(1.0) waittime += 1 - wlist = filter(filterfunc, PGRP.workers) + wlist = filter(filterfunc, CTX.pgrp.workers) end for wl in wlist @@ -733,7 +728,7 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) join_list) send_connection_hdr(w, true) enable_threaded_blas = something(wconfig.enable_threaded_blas, false) - join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, enable_threaded_blas, isclusterlazy()) + join_message = JoinPGRPMsg(w.id, all_locs, CTX.pgrp.topology, enable_threaded_blas, isclusterlazy()) send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message) errormonitor(@async manage(w.manager, w.id, w.config, :register)) @@ -742,8 +737,8 @@ function create_worker(manager::ClusterManager, wconfig::WorkerConfig) if timedwait(() -> isready(rr_ntfy_join), timeout) === :timed_out error("worker did not connect within $timeout seconds") end - lock(client_refs) do - delete!(PGRP.refs, ntfy_oid) + lock(CTX.client_refs) do + delete!(CTX.pgrp.refs, ntfy_oid) end return w.id @@ -811,7 +806,7 @@ function check_master_connect() errormonitor( Threads.@spawn begin timeout = worker_timeout() - if timedwait(() -> @lock(map_pid_wrkr, haskey(map_pid_wrkr[], 1)), timeout) === :timed_out + if timedwait(() -> @lock(CTX.map_pid_wrkr, haskey(CTX.map_pid_wrkr[], 1)), timeout) === :timed_out print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n") exit(1) end @@ -825,7 +820,7 @@ end Return the cluster cookie. """ -cluster_cookie() = (init_multi(); LPROC.cookie) +cluster_cookie() = (init_multi(); CTX.lproc.cookie) """ cluster_cookie(cookie) -> cookie @@ -840,14 +835,12 @@ function cluster_cookie(cookie) cookie = rpad(cookie, HDR_COOKIE_LEN) - LPROC.cookie = cookie + CTX.lproc.cookie = cookie cookie end -# 1 is reserved for the client (always) -const next_pid = Threads.Atomic{Int}(2) # Note that atomic_add!() returns the old value, which is what we want -get_next_pid() = Threads.atomic_add!(next_pid, 1) +get_next_pid() = Threads.atomic_add!(CTX.next_pid, 1) mutable struct ProcessGroup name::String @@ -858,22 +851,21 @@ mutable struct ProcessGroup ProcessGroup(w::Vector) = new("pg-default", w, Dict(), :all_to_all, nothing) end -const PGRP = ProcessGroup([]) function topology(t) @assert t in [:all_to_all, :master_worker, :custom] - if (PGRP.topology==t) || ((myid()==1) && (nprocs()==1)) || (myid() > 1) - PGRP.topology = t + if (CTX.pgrp.topology==t) || ((myid()==1) && (nprocs()==1)) || (myid() > 1) + CTX.pgrp.topology = t else - error("Workers with Topology $(PGRP.topology) already exist. Requested Topology $(t) cannot be set.") + error("Workers with Topology $(CTX.pgrp.topology) already exist. Requested Topology $(t) cannot be set.") end t end -isclusterlazy() = something(PGRP.lazy, false) +isclusterlazy() = something(CTX.pgrp.lazy, false) get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) -get_bind_addr(w::LocalProcess) = LPROC.bind_addr +get_bind_addr(w::LocalProcess) = CTX.lproc.bind_addr function get_bind_addr(w::Worker) if w.config.bind_addr === nothing if w.id != myid() @@ -883,25 +875,13 @@ function get_bind_addr(w::Worker) w.config.bind_addr end -# globals -const LPROC = LocalProcess() -const LPROCROLE = Ref{Symbol}(:master) const HDR_VERSION_LEN = 16 const HDR_COOKIE_LEN = 16 -const map_pid_wrkr = Lockable(Dict{Int, Union{Worker, LocalProcess}}()) -const map_sock_wrkr = Lockable(IdDict()) -const map_del_wrkr = Lockable(Set{Int}()) -const _exited_callback_pid = ScopedValue{Int}(-1) -const map_pid_statuses = Lockable(Dict{Int, Any}()) -const worker_starting_callbacks = Dict{Any, Base.Callable}() -const worker_started_callbacks = Dict{Any, Base.Callable}() -const worker_exiting_callbacks = Dict{Any, Base.Callable}() -const worker_exited_callbacks = Dict{Any, Base.Callable}() # whether process is a master or worker in a distributed setup -myrole() = LPROCROLE[] +myrole() = CTX.role[] function myrole!(proctype::Symbol) - LPROCROLE[] = proctype + CTX.role[] = proctype end # Callbacks @@ -980,14 +960,14 @@ try to either keep the callbacks fast to execute, or do the actual work asynchronously by spawning a task in the callback (beware of race conditions if you do this). """ -add_worker_starting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_starting_callbacks; +add_worker_starting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_starting_callbacks; arg_types=Tuple{ClusterManager, Dict}) """ remove_worker_starting_callback(key) Remove the callback for `key` that was added with [`add_worker_starting_callback()`](@ref). """ -remove_worker_starting_callback(key) = _remove_callback(key, worker_starting_callbacks) +remove_worker_starting_callback(key) = _remove_callback(key, CTX.worker_starting_callbacks) """ add_worker_started_callback(f::Base.Callable; key=nothing) -> key @@ -1005,14 +985,14 @@ try to either keep the callbacks fast to execute, or do the actual initialization asynchronously by spawning a task in the callback (beware of race conditions if you do this). """ -add_worker_started_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_started_callbacks) +add_worker_started_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_started_callbacks) """ remove_worker_started_callback(key) Remove the callback for `key` that was added with [`add_worker_started_callback()`](@ref). """ -remove_worker_started_callback(key) = _remove_callback(key, worker_started_callbacks) +remove_worker_started_callback(key) = _remove_callback(key, CTX.worker_started_callbacks) """ add_worker_exiting_callback(f::Base.Callable; key=nothing) -> key @@ -1026,14 +1006,14 @@ All worker-exiting callbacks will be executed concurrently and if they don't all finish before the `callback_timeout` passed to `rmprocs()` then the worker will be removed anyway. """ -add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exiting_callbacks) +add_worker_exiting_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_exiting_callbacks) """ remove_worker_exiting_callback(key) Remove the callback for `key` that was added with [`add_worker_exiting_callback()`](@ref). """ -remove_worker_exiting_callback(key) = _remove_callback(key, worker_exiting_callbacks) +remove_worker_exiting_callback(key) = _remove_callback(key, CTX.worker_exiting_callbacks) """ add_worker_exited_callback(f::Base.Callable; key=nothing) -> key @@ -1051,7 +1031,7 @@ of `WorkerState_exterminated` means the worker died unexpectedly. All worker-exited callbacks will be executed concurrently. If a callback throws an exception it will be caught and printed. """ -add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, worker_exited_callbacks; +add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key, CTX.worker_exited_callbacks; arg_types=Tuple{Int, WorkerState}) """ @@ -1059,7 +1039,7 @@ add_worker_exited_callback(f::Base.Callable; key=nothing) = _add_callback(f, key Remove the callback for `key` that was added with [`add_worker_exited_callback()`](@ref). """ -remove_worker_exited_callback(key) = _remove_callback(key, worker_exited_callbacks) +remove_worker_exited_callback(key) = _remove_callback(key, CTX.worker_exited_callbacks) # cluster management related API """ @@ -1076,7 +1056,7 @@ julia> remotecall_fetch(() -> myid(), 4) 4 ``` """ -myid() = LPROC.id +myid() = CTX.lproc.id """ nprocs() @@ -1095,17 +1075,17 @@ julia> workers() ``` """ function nprocs() - if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) - n = length(PGRP.workers) + if myid() == 1 || (CTX.pgrp.topology === :all_to_all && !isclusterlazy()) + n = length(CTX.pgrp.workers) # filter out workers in the process of being setup/shutdown. - for jw in PGRP.workers + for jw in CTX.pgrp.workers if !isa(jw, LocalProcess) && ((@atomic jw.state) !== WorkerState_connected) n = n - 1 end end return n else - return length(PGRP.workers) + return length(CTX.pgrp.workers) end end @@ -1150,11 +1130,11 @@ julia> procs() ``` """ function procs() - if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) + if myid() == 1 || (CTX.pgrp.topology === :all_to_all && !isclusterlazy()) # filter out workers in the process of being setup/shutdown. - return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] + return Int[x.id for x in CTX.pgrp.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] else - return Int[x.id for x in PGRP.workers] + return Int[x.id for x in CTX.pgrp.workers] end end @@ -1167,14 +1147,14 @@ current worker is filtered out. other_procs() = filter(!=(myid()), procs()) function id_in_procs(id) # faster version of `id in procs()` - if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy()) - for x in PGRP.workers + if myid() == 1 || (CTX.pgrp.topology === :all_to_all && !isclusterlazy()) + for x in CTX.pgrp.workers if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === WorkerState_connected) return true end end else - for x in PGRP.workers + for x in CTX.pgrp.workers if (x.id::Int) == id return true end @@ -1193,8 +1173,8 @@ See also [`other_procs()`](@ref). """ function procs(pid::Integer) if myid() == 1 - all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] - if (pid == 1) || (isa(@lock(map_pid_wrkr, map_pid_wrkr[][pid].manager), LocalManager)) + all_workers = [x for x in CTX.pgrp.workers if isa(x, LocalProcess) || ((@atomic x.state) === WorkerState_connected)] + if (pid == 1) || (isa(@lock(CTX.map_pid_wrkr, CTX.map_pid_wrkr[][pid].manager), LocalManager)) Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)] else ipatpid = get_bind_addr(pid) @@ -1290,8 +1270,8 @@ function setstatus!(x, mod::Module, pid::Int=myid()) end if myid() == 1 - @lock map_pid_statuses begin - statuses = get!(map_pid_statuses[], pid, Dict{Module, Any}()) + @lock CTX.map_pid_statuses begin + statuses = get!(CTX.map_pid_statuses[], pid, Dict{Module, Any}()) statuses[mod] = x end else @@ -1300,8 +1280,8 @@ function setstatus!(x, mod::Module, pid::Int=myid()) end function _getstatus(pid, mod) - @lock map_pid_statuses begin - statuses = get(map_pid_statuses[], pid, nothing) + @lock CTX.map_pid_statuses begin + statuses = get(CTX.map_pid_statuses[], pid, nothing) isnothing(statuses) ? nothing : get(statuses, mod, nothing) end end @@ -1335,7 +1315,7 @@ function getstatus(mod::Module, pid::Int=myid()) # During the worker-exited callbacks this function may be called, at which # point it will not exist in procs(). Thus we check whether the function is # being called for an exited worker and allow it if so. - if !id_in_procs(pid) && _exited_callback_pid[] != pid + if !id_in_procs(pid) && CTX.exited_callback_pid[] != pid throw(ArgumentError("Worker $(pid) does not exist, cannot get its status")) end @@ -1403,12 +1383,12 @@ function rmprocs(pids...; waitfor=typemax(Int), callback_timeout=10) end function _rmprocs(pids, waitfor, callback_timeout) - lock(worker_lock) + lock(CTX.worker_lock) try # Run the callbacks callback_tasks = Tuple{Any, Task}[] for pid in pids - for (name, callback) in worker_exiting_callbacks + for (name, callback) in CTX.worker_exiting_callbacks push!(callback_tasks, (name, Threads.@spawn callback(pid))) end end @@ -1424,7 +1404,7 @@ function _rmprocs(pids, waitfor, callback_timeout) if p == 1 @warn "rmprocs: process 1 not removed" else - w = @lock map_pid_wrkr get(map_pid_wrkr[], p, nothing) + w = @lock CTX.map_pid_wrkr get(CTX.map_pid_wrkr[], p, nothing) if !isnothing(w) set_worker_state(w, WorkerState_terminating) kill(w.manager, p, w.config) @@ -1445,7 +1425,7 @@ function _rmprocs(pids, waitfor, callback_timeout) throw(ErrorException(estr)) end finally - unlock(worker_lock) + unlock(CTX.worker_lock) end end @@ -1463,19 +1443,19 @@ end # No-arg constructor added for compatibility with Julia 1.0 & 1.1, should be deprecated in the future ProcessExitedException() = ProcessExitedException(-1) -worker_from_id(i) = worker_from_id(PGRP, i) +worker_from_id(i) = worker_from_id(CTX.pgrp, i) function worker_from_id(pg::ProcessGroup, i) - @lock map_del_wrkr if !isempty(map_del_wrkr[]) && in(i, map_del_wrkr[]) + @lock CTX.map_del_wrkr if !isempty(CTX.map_del_wrkr[]) && in(i, CTX.map_del_wrkr[]) throw(ProcessExitedException(i)) end - w = @lock map_pid_wrkr get(map_pid_wrkr[], i, nothing) + w = @lock CTX.map_pid_wrkr get(CTX.map_pid_wrkr[], i, nothing) if w === nothing if myid() == 1 error("no process with id $i exists") end w = Worker(i) - @lock map_pid_wrkr map_pid_wrkr[][i] = w + @lock CTX.map_pid_wrkr CTX.map_pid_wrkr[][i] = w else w = w::Union{Worker, LocalProcess} end @@ -1491,7 +1471,7 @@ This is useful when writing custom [`serialize`](@ref) methods for a type, which optimizes the data written out depending on the receiving process id. """ function worker_id_from_socket(s) - w = @lock map_sock_wrkr get(map_sock_wrkr[], s, nothing) + w = @lock CTX.map_sock_wrkr get(CTX.map_sock_wrkr[], s, nothing) if isa(w,Worker) if s === w.r_stream || s === w.w_stream return w.id @@ -1505,30 +1485,30 @@ function worker_id_from_socket(s) end -register_worker(w) = register_worker(PGRP, w) +register_worker(w) = register_worker(CTX.pgrp, w) function register_worker(pg, w) push!(pg.workers, w) - @lock map_pid_wrkr map_pid_wrkr[][w.id] = w + @lock CTX.map_pid_wrkr CTX.map_pid_wrkr[][w.id] = w end function register_worker_streams(w) - @lock map_sock_wrkr begin - map_sock_wrkr[][w.r_stream] = w - map_sock_wrkr[][w.w_stream] = w + @lock CTX.map_sock_wrkr begin + CTX.map_sock_wrkr[][w.r_stream] = w + CTX.map_sock_wrkr[][w.w_stream] = w end end -deregister_worker(pid) = deregister_worker(PGRP, pid) +deregister_worker(pid) = deregister_worker(CTX.pgrp, pid) function deregister_worker(pg, pid) pg.workers = filter(x -> !(x.id == pid), pg.workers) - w = @lock map_pid_wrkr pop!(map_pid_wrkr[], pid, nothing) + w = @lock CTX.map_pid_wrkr pop!(CTX.map_pid_wrkr[], pid, nothing) if isa(w, Worker) if isdefined(w, :r_stream) - @lock map_sock_wrkr begin - pop!(map_sock_wrkr[], w.r_stream, nothing) + @lock CTX.map_sock_wrkr begin + pop!(CTX.map_sock_wrkr[], w.r_stream, nothing) if w.r_stream != w.w_stream - pop!(map_sock_wrkr[], w.w_stream, nothing) + pop!(CTX.map_sock_wrkr[], w.w_stream, nothing) end end end @@ -1536,7 +1516,7 @@ function deregister_worker(pg, pid) if myid() == 1 && (myrole() === :master) && isdefined(w, :config) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) - if PGRP.topology !== :all_to_all || isclusterlazy() + if CTX.pgrp.topology !== :all_to_all || isclusterlazy() for rpid in other_workers() try remote_do(deregister_worker, rpid, pid) @@ -1546,12 +1526,12 @@ function deregister_worker(pg, pid) end end end - @lock map_del_wrkr push!(map_del_wrkr[], pid) + @lock CTX.map_del_wrkr push!(CTX.map_del_wrkr[], pid) # delete this worker from our remote reference client sets ids = [] tonotify = [] - lock(client_refs) do + lock(CTX.client_refs) do for (id, rv) in pg.refs if in(pid, rv.clientset) push!(ids, id) @@ -1580,13 +1560,13 @@ function deregister_worker(pg, pid) # pid check. We go to some effort to make sure this works after # deregistering the worker because if it's called beforehand the worker # will incorrectly be shown in e.g. procs(). - @with _exited_callback_pid => pid begin - _run_callbacks_concurrently("worker-exited", worker_exited_callbacks, + @with CTX.exited_callback_pid => pid begin + _run_callbacks_concurrently("worker-exited", CTX.worker_exited_callbacks, warning_interval, [(pid, w.state)]; catch_exceptions=true) end # Delete its statuses - @lock map_pid_statuses delete!(map_pid_statuses[], pid) + @lock CTX.map_pid_statuses delete!(CTX.map_pid_statuses[], pid) end return @@ -1595,7 +1575,7 @@ end function interrupt(pid::Integer) @assert myid() == 1 - w = @lock map_pid_wrkr map_pid_wrkr[][pid] + w = @lock CTX.map_pid_wrkr CTX.map_pid_wrkr[][pid] if isa(w, Worker) manage(w.manager, w.id, w.config, :interrupt) end @@ -1635,11 +1615,11 @@ function check_same_host(pids) # We checkfirst if all test pids have been started using the local manager, # else we check for the same bind_to addr. This handles the special case # where the local ip address may change - as during a system sleep/awake - @lock map_pid_wrkr if all(p -> (p==1) || (isa(map_pid_wrkr[][p].manager, LocalManager)), pids) + @lock CTX.map_pid_wrkr if all(p -> (p==1) || (isa(CTX.map_pid_wrkr[][p].manager, LocalManager)), pids) return true else - first_bind_addr = notnothing(wp_bind_addr(map_pid_wrkr[][pids[1]])) - return all(p -> notnothing(wp_bind_addr(map_pid_wrkr[][p])) == first_bind_addr, pids[2:end]) + first_bind_addr = notnothing(wp_bind_addr(CTX.map_pid_wrkr[][pids[1]])) + return all(p -> notnothing(wp_bind_addr(CTX.map_pid_wrkr[][p])) == first_bind_addr, pids[2:end]) end end end @@ -1707,18 +1687,16 @@ function init_bind_addr() end end - global LPROC - LPROC.bind_addr = bind_addr - LPROC.bind_port = bind_port - LPROC.bind_port_hint = bind_port_hint + CTX.lproc.bind_addr = bind_addr + CTX.lproc.bind_port = bind_port + CTX.lproc.bind_port_hint = bind_port_hint end using Random: randstring # do initialization that's only needed when there is more than 1 processor -const inited = Threads.Atomic{Bool}(false) function init_multi() - if !Threads.atomic_cas!(inited, false, true) + if !Threads.atomic_cas!(CTX.inited, false, true) push!(Base.package_callbacks, _require_callback) atexit(terminate_all_workers) init_bind_addr() @@ -1731,11 +1709,9 @@ function init_parallel() start_gc_msgs_task() # start in "head node" mode, if worker, will override later. - global PGRP - global LPROC - LPROC.id = 1 - @assert isempty(PGRP.workers) - register_worker(LPROC) + CTX.lproc.id = 1 + @assert isempty(CTX.pgrp.workers) + register_worker(CTX.lproc) end write_cookie(io::IO) = print(io.in, string(cluster_cookie(), "\n")) diff --git a/src/clusterserialize.jl b/src/clusterserialize.jl index 347ca9c..9f96abe 100644 --- a/src/clusterserialize.jl +++ b/src/clusterserialize.jl @@ -27,32 +27,27 @@ mutable struct ClusterSerializer{I<:IO} <: AbstractSerializer end ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io) -const object_numbers = WeakKeyDict() -const obj_number_salt = Ref(0) function object_number(s::ClusterSerializer, @nospecialize(l)) - global obj_number_salt, object_numbers - if haskey(object_numbers, l) - return object_numbers[l] + if haskey(CTX.object_numbers, l) + return CTX.object_numbers[l] end # a hash function that always gives the same number to the same # object on the same machine, and is unique over all machines. - ln = obj_number_salt[]+(UInt64(myid())<<44) - obj_number_salt[] += 1 - object_numbers[l] = ln + ln = CTX.obj_number_salt[]+(UInt64(myid())<<44) + CTX.obj_number_salt[] += 1 + CTX.object_numbers[l] = ln return ln::UInt64 end -const known_object_data = Dict{UInt64,Any}() - function lookup_object_number(s::ClusterSerializer, n::UInt64) - return get(known_object_data, n, nothing) + return get(CTX.known_object_data, n, nothing) end function remember_object(s::ClusterSerializer, @nospecialize(o), n::UInt64) - known_object_data[n] = o - if isa(o, Core.TypeName) && !haskey(object_numbers, o) + CTX.known_object_data[n] = o + if isa(o, Core.TypeName) && !haskey(CTX.object_numbers, o) # set up reverse mapping for serialize - object_numbers[o] = n + CTX.object_numbers[o] = n end return nothing end diff --git a/src/macros.jl b/src/macros.jl index b227e12..30f87a8 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -1,8 +1,7 @@ # This file is a part of Julia. License is MIT: https://julialang.org/license -const nextidx = Threads.Atomic{Int}(0) function nextproc() - idx = Threads.atomic_add!(nextidx, 1) + idx = Threads.atomic_add!(CTX.next_worker_idx, 1) return workers()[(idx % nworkers()) + 1] end diff --git a/src/managers.jl b/src/managers.jl index 1da53c0..ce9f917 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -414,9 +414,8 @@ function manage(manager::SSHManager, id::Integer, config::WorkerConfig, op::Symb end end -const tunnel_counter = Threads.Atomic{Int}(1) # This is defined such that the port numbers start at 9201 and wrap around at 32,000 -next_tunnel_port() = (Threads.atomic_add!(tunnel_counter, 1) % 22_800) + 9200 +next_tunnel_port() = (Threads.atomic_add!(CTX.tunnel_counter, 1) % 22_800) + 9200 """ @@ -496,7 +495,7 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi dir = params[:dir] exename = params[:exename] exeflags = params[:exeflags] - bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)` + bind_to = manager.restrict ? `127.0.0.1` : `$(CTX.lproc.bind_addr)` env = Dict{String,String}(params[:env]) # TODO: Maybe this belongs in base/initdefs.jl as a package_environment() function @@ -576,7 +575,6 @@ manage struct DefaultClusterManager <: ClusterManager end -const tunnel_hosts_map = Dict{String, Semaphore}() """ connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO) @@ -624,10 +622,10 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) end if tunnel - if !haskey(tunnel_hosts_map, pubhost) - tunnel_hosts_map[pubhost] = Semaphore(something(config.max_parallel, typemax(Int))) + if !haskey(CTX.tunnel_hosts_map, pubhost) + CTX.tunnel_hosts_map[pubhost] = Semaphore(something(config.max_parallel, typemax(Int))) end - sem = tunnel_hosts_map[pubhost] + sem = CTX.tunnel_hosts_map[pubhost] sshflags = notnothing(config.sshflags) multiplex = something(config.multiplex, false) @@ -664,7 +662,6 @@ function connect_w2w(pid::Int, config::WorkerConfig) (s,s) end -const client_port = Ref{UInt16}(0) function socket_reuse_port(iptype) if ccall(:jl_has_so_reuseport, Int32, ()) == 1 @@ -693,9 +690,9 @@ end function bind_client_port(sock::TCPSocket, iptype) bind_host = iptype(0) - if Sockets.bind(sock, bind_host, client_port[]) + if Sockets.bind(sock, bind_host, CTX.client_port[]) _addr, port = getsockname(sock) - client_port[] = port + CTX.client_port[] = port end return sock end diff --git a/src/messages.jl b/src/messages.jl index fe63a7d..dcd473d 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -193,7 +193,7 @@ end function flush_gc_msgs() try - for w in (PGRP::ProcessGroup).workers + for w in (CTX.pgrp::ProcessGroup).workers if isa(w,Worker) && ((@atomic w.state) == WorkerState_connected) && w.gcflag flush_gc_msgs(w) end @@ -209,7 +209,7 @@ function send_connection_hdr(w::Worker, cookie=true) # else when we initiate a connection we first send the cookie followed by our version. # The remote side validates the cookie. if cookie - write(w.w_stream, LPROC.cookie) + write(w.w_stream, CTX.lproc.cookie) end write(w.w_stream, rpad(VERSION_STRING, HDR_VERSION_LEN)[1:HDR_VERSION_LEN]) end diff --git a/src/process_messages.jl b/src/process_messages.jl index 0f2750a..094caab 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -81,9 +81,9 @@ function run_work_thunk_remotevalue(rv::RemoteValue, thunk) end function schedule_call(rid, thunk) - return lock(client_refs) do + return lock(CTX.client_refs) do rv = RemoteValue(def_rv_channel()) - (PGRP::ProcessGroup).refs[rid] = rv + (CTX.pgrp::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) errormonitor(@async run_work_thunk_remotevalue(rv, thunk)) return rv @@ -220,7 +220,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) if wpid < 1 println(stderr, e, CapturedException(e, catch_backtrace())) println(stderr, "Process($(myid())) - Unknown remote, closing connection.") - elseif @lock(map_del_wrkr, !(wpid in map_del_wrkr[])) + elseif @lock(CTX.map_del_wrkr, !(wpid in CTX.map_del_wrkr[])) werr = worker_from_id(wpid) oldstate = @atomic werr.state set_worker_state(werr, oldstate != WorkerState_terminating ? WorkerState_exterminated : WorkerState_terminated) @@ -318,24 +318,24 @@ function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) throw_if_cluster_manager_unassigned() # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker + w = Worker(msg.from_pid, r_stream, w_stream, CTX.cluster_manager[]; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) end function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, version) - w = @lock map_sock_wrkr map_sock_wrkr[][r_stream] + w = @lock CTX.map_sock_wrkr CTX.map_sock_wrkr[][r_stream] w.version = version end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) throw_if_cluster_manager_unassigned() - LPROC.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker + CTX.lproc.id = msg.self_pid + controller = Worker(1, r_stream, w_stream, CTX.cluster_manager[]; version=version)::Worker notify(controller.initialized) - register_worker(LPROC) + register_worker(CTX.lproc) topology(msg.topology) if !msg.enable_threaded_blas @@ -343,7 +343,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) end lazy = msg.lazy - PGRP.lazy = lazy + CTX.pgrp.lazy = lazy @sync for (connect_at, rpid) in msg.other_workers wconfig = WorkerConfig() @@ -352,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. - Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig)) + Worker(rpid, ()->connect_to_peer(CTX.cluster_manager[], rpid, wconfig)) else - @async connect_to_peer(cluster_manager[], rpid, wconfig) + @async connect_to_peer(CTX.cluster_manager[], rpid, wconfig) end end end @@ -378,7 +378,7 @@ function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConf end function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version) - w = @lock map_sock_wrkr map_sock_wrkr[][r_stream] + w = @lock CTX.map_sock_wrkr CTX.map_sock_wrkr[][r_stream] environ = something(w.config.environ, Dict()) environ[:cpu_threads] = msg.cpu_threads w.config.environ = environ diff --git a/src/remotecall.jl b/src/remotecall.jl index a1cbb16..ca02791 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -4,16 +4,6 @@ import Base: eltype abstract type AbstractRemoteRef end -""" - client_refs - -Tracks whether a particular `AbstractRemoteRef` -(identified by its RRID) exists on this worker. - -The `client_refs` lock is also used to synchronize access to `.refs` and associated `clientset` state. -""" -const client_refs = WeakKeyDict{AbstractRemoteRef, Nothing}() # used as a WeakKeySet - """ Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing) @@ -67,7 +57,7 @@ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef end function test_existing_ref(r::AbstractRemoteRef) - found = getkey(client_refs, r, nothing) + found = getkey(CTX.client_refs, r, nothing) if found !== nothing @assert r.where > 0 if isa(r, Future) @@ -85,16 +75,16 @@ function test_existing_ref(r::AbstractRemoteRef) return found::typeof(r) end - client_refs[r] = nothing + CTX.client_refs[r] = nothing finalizer(finalize_ref, r) return r end function finalize_ref(r::AbstractRemoteRef) if r.where > 0 # Handle the case of the finalizer having been called manually - if trylock(client_refs.lock) # trylock doesn't call wait which causes yields + if trylock(CTX.client_refs.lock) # trylock doesn't call wait which causes yields try - delete!(client_refs.ht, r) # direct removal avoiding locks + delete!(CTX.client_refs.ht, r) # direct removal avoiding locks if isa(r, RemoteChannel) send_del_client_no_lock(r) else @@ -105,7 +95,7 @@ function finalize_ref(r::AbstractRemoteRef) end r.where = 0 finally - unlock(client_refs.lock) + unlock(CTX.client_refs.lock) end else finalizer(finalize_ref, r) @@ -170,8 +160,8 @@ A low-level API which returns the backing `AbstractChannel` for an `id` returned The call is valid only on the node where the backing channel exists. """ function channel_from_id(id) - rv = lock(client_refs) do - return get(PGRP.refs, id, false) + rv = lock(CTX.client_refs) do + return get(CTX.pgrp.refs, id, false) end if rv === false throw(ErrorException("Local instance of remote reference not found")) @@ -179,9 +169,9 @@ function channel_from_id(id) return rv.c end -lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(PGRP, rrid, f) +lookup_ref(rrid::RRID, f=def_rv_channel) = lookup_ref(CTX.pgrp, rrid, f) function lookup_ref(pg, rrid, f) - return lock(client_refs) do + return lock(CTX.client_refs) do rv = get(pg.refs, rrid, false) if rv === false # first we've heard of this ref @@ -240,9 +230,9 @@ end del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid()) -del_client(id, client) = del_client(PGRP, id, client) +del_client(id, client) = del_client(CTX.pgrp, id, client) function del_client(pg, id, client) - lock(client_refs) do + lock(CTX.client_refs) do _del_client(pg, id, client) end nothing @@ -271,14 +261,13 @@ end # and `send_add_client`. # XXX: Is this worth the additional complexity? # `flush_gc_msgs` has to iterate over all connected workers. -const any_gc_flag = Threads.Condition() function start_gc_msgs_task() errormonitor( Threads.@spawn begin while true - lock(any_gc_flag) do + lock(CTX.any_gc_flag) do # this might miss events - wait(any_gc_flag) + wait(CTX.any_gc_flag) end # Use invokelatest() so that custom message transport streams # for workers can be defined in a newer world age than the Task @@ -301,7 +290,7 @@ end function send_del_client_no_lock(rr) # for gc context to avoid yields if rr.where == myid() - _del_client(PGRP, remoteref_id(rr), myid()) + _del_client(CTX.pgrp, remoteref_id(rr), myid()) elseif id_in_procs(rr.where) # process only if a valid worker process_worker(rr) end @@ -312,8 +301,8 @@ function publish_del_msg!(w::Worker, msg) push!(w.del_msgs, msg) @atomic w.gcflag = true end - lock(any_gc_flag) do - notify(any_gc_flag) + lock(CTX.any_gc_flag) do + notify(CTX.any_gc_flag) end end @@ -331,7 +320,7 @@ function process_worker(rr) end function add_client(id, client) - lock(client_refs) do + lock(CTX.client_refs) do rv = lookup_ref(id) push!(rv.clientset, client) end @@ -356,8 +345,8 @@ function send_add_client(rr::AbstractRemoteRef, i) push!(w.add_msgs, (remoteref_id(rr), i)) @atomic w.gcflag = true end - lock(any_gc_flag) do - notify(any_gc_flag) + lock(CTX.any_gc_flag) do + notify(CTX.any_gc_flag) end end end @@ -459,8 +448,8 @@ function remotecall_fetch(f, w::Worker, args...; kwargs...) rv.waitingfor = w.id send_msg(w, MsgHeader(RRID(0,0), oid), CallMsg{:call_fetch}(f, args, kwargs)) v = take!(rv) - lock(client_refs) do - delete!(PGRP.refs, oid) + lock(CTX.client_refs) do + delete!(CTX.pgrp.refs, oid) end return isa(v, RemoteException) ? throw(v) : v end @@ -501,8 +490,8 @@ function remotecall_wait(f, w::Worker, args...; kwargs...) rr = Future(w) send_msg(w, MsgHeader(remoteref_id(rr), prid), CallWaitMsg(f, args, kwargs)) v = fetch(rv.c) - lock(client_refs) do - delete!(PGRP.refs, prid) + lock(CTX.client_refs) do + delete!(CTX.pgrp.refs, prid) end isa(v, RemoteException) && throw(v) return rr diff --git a/src/workerpool.jl b/src/workerpool.jl index db96820..ed7abc6 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -322,7 +322,6 @@ oversubscription). """ remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remote_do, f, pool, args...; kwargs...) -const _default_worker_pool = Ref{Union{AbstractWorkerPool, Nothing}}(nothing) """ default_worker_pool() @@ -342,14 +341,14 @@ WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), function default_worker_pool()::AbstractWorkerPool # On workers retrieve the default worker pool from the master when accessed # for the first time - if _default_worker_pool[] === nothing + if CTX.default_worker_pool[] === nothing if myid() == 1 - _default_worker_pool[] = WorkerPool() + CTX.default_worker_pool[] = WorkerPool() else - _default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) + CTX.default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) end end - return _default_worker_pool[]::AbstractWorkerPool + return CTX.default_worker_pool[]::AbstractWorkerPool end """ @@ -358,7 +357,7 @@ end Set a [`AbstractWorkerPool`](@ref) to be used by `remote(f)` and [`pmap`](@ref) (by default). """ function default_worker_pool!(pool::AbstractWorkerPool) - _default_worker_pool[] = pool + CTX.default_worker_pool[] = pool end """ diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index baa6fb1..7ff66d0 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -170,21 +170,21 @@ function test_futures_dgc(id) fid = remoteref_id(f) # remote value should be deleted after a fetch - @test remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, fid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid) == true @test f.v === nothing @test fetch(f) == id @test f.v !== nothing yield(); # flush gc msgs - @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, fid)) + @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid)) # if unfetched, it should be deleted after a finalize f = remotecall(myid, id) fid = remoteref_id(f) - @test remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, fid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid) == true @test f.v === nothing finalize(f) yield(); # flush gc msgs - @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, fid)) + @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, fid)) end @testset "GC tests for Futures" begin @@ -204,23 +204,23 @@ end put!(fstore, f) @test fetch(f) == wid1 - @test remotecall_fetch(k->haskey(DistributedNext.PGRP.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == true remotecall_fetch(r->(fetch(fetch(r)); yield()), wid2, fstore) sleep(0.5) # to ensure that wid2 gc messages have been executed on wid1 - @test remotecall_fetch(k->haskey(DistributedNext.PGRP.refs, k), wid1, fid) == false + @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == false # put! should release remote reference since it would have been cached locally f = Future(wid1) fid = remoteref_id(f) # should not be created remotely till accessed - @test remotecall_fetch(k->haskey(DistributedNext.PGRP.refs, k), wid1, fid) == false + @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == false # create it remotely isready(f) - @test remotecall_fetch(k->haskey(DistributedNext.PGRP.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == true put!(f, :OK) - @test remotecall_fetch(k->haskey(DistributedNext.PGRP.refs, k), wid1, fid) == false + @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == false @test fetch(f) === :OK # RemoteException should be thrown on a put! when another process has set the value @@ -231,7 +231,7 @@ end put!(fstore, f) # send f to wid2 put!(f, :OK) # set value from master - @test remotecall_fetch(k->haskey(DistributedNext.PGRP.refs, k), wid1, fid) == true + @test remotecall_fetch(k->haskey(DistributedNext.CTX.pgrp.refs, k), wid1, fid) == true testval = remotecall_fetch(wid2, fstore) do x try @@ -256,14 +256,14 @@ end f = remotecall_wait(identity, id_other, ones(10)) rrid = DistributedNext.RRID(f.whence, f.id) remotecall_fetch(f25847, id_other, f) - @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.PGRP.refs[rrid].clientset, id_other) + @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.CTX.pgrp.refs[rrid].clientset, id_other) remotecall_fetch(f25847, id_other, f) - @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.PGRP.refs[rrid].clientset, id_other) + @test BitSet([id_me]) == remotecall_fetch(()->DistributedNext.CTX.pgrp.refs[rrid].clientset, id_other) finalize(f) yield() # flush gc msgs - @test poll_while(() -> remotecall_fetch(chk_rrid->(yield(); haskey(DistributedNext.PGRP.refs, chk_rrid)), id_other, rrid)) + @test poll_while(() -> remotecall_fetch(chk_rrid->(yield(); haskey(DistributedNext.CTX.pgrp.refs, chk_rrid)), id_other, rrid)) end @testset "GC tests for RemoteChannels" begin @@ -273,12 +273,12 @@ end rrid = remoteref_id(rr) # remote value should be deleted after finalizing the ref - @test remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, rrid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, rrid) == true @test fetch(rr) === :OK - @test remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, rrid) == true + @test remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, rrid) == true finalize(rr) yield(); # flush gc msgs - @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.PGRP.refs, k)), id, rrid)) + @test poll_while(() -> remotecall_fetch(k->(yield();haskey(DistributedNext.CTX.pgrp.refs, k)), id, rrid)) end test_remoteref_dgc(id_me) test_remoteref_dgc(id_other) @@ -291,13 +291,13 @@ end fstore = RemoteChannel(wid2) put!(fstore, rr) - @test timedwait(() -> remotecall_fetch(k -> haskey(DistributedNext.PGRP.refs, k), wid1, rrid), 10) == :ok + @test timedwait(() -> remotecall_fetch(k -> haskey(DistributedNext.CTX.pgrp.refs, k), wid1, rrid), 10) == :ok finalize(rr) # finalize locally yield() # flush gc msgs - @test remotecall_fetch(k -> haskey(DistributedNext.PGRP.refs, k), wid1, rrid) == true + @test remotecall_fetch(k -> haskey(DistributedNext.CTX.pgrp.refs, k), wid1, rrid) == true remotecall_fetch(r -> (finalize(take!(r)); yield(); nothing), wid2, fstore) # finalize remotely sleep(0.5) # to ensure that wid2 messages have been executed on wid1 - @test poll_while(() -> remotecall_fetch(k -> haskey(DistributedNext.PGRP.refs, k), wid1, rrid)) + @test poll_while(() -> remotecall_fetch(k -> haskey(DistributedNext.CTX.pgrp.refs, k), wid1, rrid)) end end @@ -813,7 +813,7 @@ if DoFullTest all_w = workers() # Test sending fake data to workers. The worker processes will print an # error message but should not terminate. - for w in DistributedNext.PGRP.workers + for w in DistributedNext.CTX.pgrp.workers if isa(w, DistributedNext.Worker) local s = connect(w.config.host, w.config.port) write(s, randstring(32)) @@ -1120,7 +1120,7 @@ end end function test_blas_config(pid, expected) - for worker in DistributedNext.PGRP.workers + for worker in DistributedNext.CTX.pgrp.workers if worker.id == pid @test worker.config.enable_threaded_blas == expected return @@ -1628,7 +1628,7 @@ function launch(manager::WorkerArgTester, params::Dict, launched::Array, c::Cond exename = params[:exename] exeflags = params[:exeflags] - cmd = `$exename $exeflags --bind-to $(DistributedNext.LPROC.bind_addr) $(manager.worker_opt)` + cmd = `$exename $exeflags --bind-to $(DistributedNext.CTX.lproc.bind_addr) $(manager.worker_opt)` cmd = pipeline(detach(setenv(cmd, dir=dir))) io = open(cmd, "r+") manager.write_cookie && DistributedNext.write_cookie(io) @@ -1671,7 +1671,7 @@ nprocs()>1 && rmprocs(workers()) exeflags = params[:exeflags] jlcmd = "using DistributedNext; start_worker(\"\"; close_stdin=$(manager.close_stdin), stderr_to_stdout=$(manager.stderr_to_stdout));" - cmd = detach(setenv(`$exename $exeflags --bind-to $(DistributedNext.LPROC.bind_addr) -e $jlcmd`, dir=dir)) + cmd = detach(setenv(`$exename $exeflags --bind-to $(DistributedNext.CTX.lproc.bind_addr) -e $jlcmd`, dir=dir)) proc = open(cmd, "r+") wconfig = WorkerConfig() @@ -1716,7 +1716,7 @@ end remotecall_fetch(p) do ports_lower = [] # ports of pids lower than myid() ports_higher = [] # ports of pids higher than myid() - for w in DistributedNext.PGRP.workers + for w in DistributedNext.CTX.pgrp.workers w.id == myid() && continue port = Sockets._sockname(w.r_stream, true)[2] if (w.id == 1) @@ -2015,7 +2015,7 @@ end # status to have been deleted. Only works if the worker has a status of # course. function wait_for_deregistration(pid) - statuses = DistributedNext.map_pid_statuses + statuses = DistributedNext.CTX.map_pid_statuses @test timedwait(() -> @lock(statuses, !haskey(statuses[], pid)), 10) == :ok end diff --git a/test/distributed_stdlib_detection.jl b/test/distributed_stdlib_detection.jl index f3fc14e..fa0fb34 100644 --- a/test/distributed_stdlib_detection.jl +++ b/test/distributed_stdlib_detection.jl @@ -5,18 +5,20 @@ return String(take!(stderr_buf)) end + warning_msg = "DistributedNext has detected that the Distributed stdlib may be in use" + # Just loading Distributed should do nothing cmd = `$test_exename $test_exeflags -e 'using Distributed, DistributedNext; @assert !DistributedNext._check_distributed_active()'` - @test isempty(get_stderr(cmd)) + @test !contains(get_stderr(cmd), warning_msg) # Only one of the two being active should also do nothing cmd = `$test_exename $test_exeflags -e 'using Distributed, DistributedNext; Distributed.init_multi(); @assert !DistributedNext._check_distributed_active()'` - @test isempty(get_stderr(cmd)) + @test !contains(get_stderr(cmd), warning_msg) cmd = `$test_exename $test_exeflags -e 'using Distributed, DistributedNext; DistributedNext.init_multi(); @assert !DistributedNext._check_distributed_active()'` - @test isempty(get_stderr(cmd)) + @test !contains(get_stderr(cmd), warning_msg) # But both being active at the same time should trigger a warning cmd = `$test_exename $test_exeflags -e 'using Distributed, DistributedNext; Distributed.init_multi(); DistributedNext.init_multi(); @assert DistributedNext._check_distributed_active()'` - @test contains(get_stderr(cmd), "DistributedNext has detected that the Distributed stdlib may be in use") + @test contains(get_stderr(cmd), warning_msg) end diff --git a/test/topology.jl b/test/topology.jl index 5426dcc..448be5d 100644 --- a/test/topology.jl +++ b/test/topology.jl @@ -44,7 +44,7 @@ using Random exename = params[:exename] exeflags = params[:exeflags] - cmd = `$exename $exeflags --bind-to $(DistributedNext.LPROC.bind_addr) $(DistributedNext.get_worker_arg())` + cmd = `$exename $exeflags --bind-to $(DistributedNext.CTX.lproc.bind_addr) $(DistributedNext.get_worker_arg())` cmd = pipeline(detach(setenv(cmd, dir=dir))) for i in 1:manager.np io = open(cmd, "r+") @@ -101,7 +101,7 @@ using Random @everywhere if !isdefined(Main, :count_connected_workers) function count_connected_workers() count(x -> isa(x, DistributedNext.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream), - DistributedNext.PGRP.workers) + DistributedNext.CTX.pgrp.workers) end end end