Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ CurrentModule = DistributedNext
This documents notable changes in DistributedNext.jl. The format is based on
[Keep a Changelog](https://keepachangelog.com).

## Unreleased

### Changed
- The internals were completely refactored to move all global variables into a
single struct ([#61]). This should not be a user-visible change, but some things
may have slipped through the cracks, so please open an issue if you encounter
any bugs.

## [v1.2.0] - 2026-03-21

### Added
Expand Down
72 changes: 65 additions & 7 deletions src/DistributedNext.jl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ function _check_distributed_active()
return false
end

if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && inited[]
if isdefined(Base.loaded_modules[distributed_pkgid].LPROC, :cookie) && CTX.inited[]
@warn "DistributedNext has detected that the Distributed stdlib may be in use. Be aware that these libraries are not compatible, you should use either one or the other."
return true
else
Expand Down Expand Up @@ -123,8 +123,7 @@ Base.lock(l::Lockable) = lock(l.lock)
Base.trylock(l::Lockable) = trylock(l.lock)
Base.unlock(l::Lockable) = unlock(l.lock)

const REF_ID = Threads.Atomic{Int}(1)
next_ref_id() = Threads.atomic_add!(REF_ID, 1)
# Reserve the next remote-reference id by atomically adding 1 to `CTX.ref_id`.
# The result of `Threads.atomic_add!` is returned as-is; per the Julia docs
# that is the counter's value from *before* the increment.
next_ref_id() = Threads.atomic_add!(CTX.ref_id, 1)

struct RRID
whence::Int
Expand All @@ -149,10 +148,69 @@ include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
include("precompile.jl")

_stdlib_watcher_timer::Union{Timer, Nothing} = nothing
# Bundles all mutable global state for a distributed cluster into a single
# object. Currently a single global instance (`CTX`) is used, but multiple
# independent clusters could be supported in the future.
#
# The struct itself is immutable; all mutation happens through the mutable
# containers it holds (refs, atomics, dicts, locks).
#
# Every field is declared with a *concrete* type so that accesses such as
# `CTX.role[]` are inferrable. In particular `Ref{T}` is an abstract type and a
# bare `WeakKeyDict` is a `UnionAll`, so the concrete types that the defaults
# actually produce (`Base.RefValue{T}` and `WeakKeyDict{Any, Any}`) are spelled
# out instead.
@kwdef struct ClusterContext
    # Process identity
    lproc::LocalProcess = LocalProcess()
    role::Base.RefValue{Symbol} = Ref{Symbol}(:master)

    # Process group
    pgrp::ProcessGroup = ProcessGroup([])

    # Worker registries
    map_pid_wrkr::Lockable{Dict{Int, Union{Worker, LocalProcess}}, ReentrantLock} = Lockable(Dict{Int, Union{Worker, LocalProcess}}())
    map_sock_wrkr::Lockable{IdDict{Any, Any}, ReentrantLock} = Lockable(IdDict())
    map_del_wrkr::Lockable{Set{Int}, ReentrantLock} = Lockable(Set{Int}())
    map_pid_statuses::Lockable{Dict{Int, Any}, ReentrantLock} = Lockable(Dict{Int, Any}())

    # Lifecycle callbacks
    worker_starting_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
    worker_started_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
    worker_exiting_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()
    worker_exited_callbacks::Dict{Any, Base.Callable} = Dict{Any, Base.Callable}()

    # Cluster manager (the ref starts out unassigned)
    cluster_manager::Base.RefValue{ClusterManager} = Ref{ClusterManager}()

    # Synchronization
    worker_lock::ReentrantLock = ReentrantLock()
    inited::Threads.Atomic{Bool} = Threads.Atomic{Bool}(false)
    next_pid::Threads.Atomic{Int} = Threads.Atomic{Int}(2) # 1 is reserved for the client (always)

    # Remote references
    ref_id::Threads.Atomic{Int} = Threads.Atomic{Int}(1)
    # Tracks whether a particular `AbstractRemoteRef` (identified by its RRID)
    # exists on this worker. The `client_refs` lock is also used to synchronize
    # access to `.refs` and associated `clientset` state.
    client_refs::WeakKeyDict{AbstractRemoteRef, Nothing} = WeakKeyDict{AbstractRemoteRef, Nothing}() # used as a WeakKeySet
    any_gc_flag::Threads.Condition = Threads.Condition()

    # Serialization state
    # `WeakKeyDict()` constructs a `WeakKeyDict{Any, Any}`; declare exactly that.
    object_numbers::WeakKeyDict{Any, Any} = WeakKeyDict()
    obj_number_salt::Base.RefValue{Int} = Ref(0)
    known_object_data::Dict{UInt64, Any} = Dict{UInt64, Any}()

    # Worker pools / macros
    default_worker_pool::Base.RefValue{Union{AbstractWorkerPool, Nothing}} = Ref{Union{AbstractWorkerPool, Nothing}}(nothing)
    next_worker_idx::Threads.Atomic{Int} = Threads.Atomic{Int}(0)

    # Network / SSH
    tunnel_counter::Threads.Atomic{Int} = Threads.Atomic{Int}(1)
    tunnel_hosts_map::Dict{String, Semaphore} = Dict{String, Semaphore}()
    client_port::Base.RefValue{UInt16} = Ref{UInt16}(0)

    # Scoped value for exited callback pid
    exited_callback_pid::ScopedValue{Int} = ScopedValue(-1)

    # Stdlib watcher (`nothing` until a timer is stored here)
    stdlib_watcher_timer::Base.RefValue{Union{Timer, Nothing}} = Ref{Union{Timer, Nothing}}(nothing)
end

const CTX = ClusterContext()

function __init__()
global _stdlib_watcher_timer
init_parallel()

if ccall(:jl_generating_output, Cint, ()) == 0
Expand All @@ -161,12 +219,12 @@ function __init__()
# cluster cookie has been set, which is most likely to have been done
# through Distributed.init_multi() being called by Distributed.addprocs() or
# something.
_stdlib_watcher_timer = Timer(0; interval=1) do timer
CTX.stdlib_watcher_timer[] = Timer(0; interval=1) do timer
if _check_distributed_active()
close(timer)
end
end
atexit(() -> close(_stdlib_watcher_timer))
atexit(() -> close(CTX.stdlib_watcher_timer[]))
end
end

Expand Down
Loading
Loading