Skip to content

Commit 516f8de

Browse files
committed
Adding simple carrier functionality, tests and docs.
1 parent 1a643f0 commit 516f8de

File tree

12 files changed

+269
-48
lines changed

12 files changed

+269
-48
lines changed

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
1313
Mango = "5e49fdec-d473-4d14-b295-7bff2fcf1925"
1414
OSQP = "ab2f91bb-94b4-55e3-9ba0-7f65df51de79"
1515
Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
16+
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
1617

1718
[compat]
1819
AutoHashEquals = "2.2.0"
@@ -23,6 +24,7 @@ LinearAlgebra = "^1.9"
2324
Mango = "^0.5"
2425
OSQP = "0.8.1"
2526
Revise = "3.6.4"
27+
UUIDs = "1.11.0"
2628
julia = "^1.9"
2729

2830
[extras]

docs/src/carrier/mango.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,23 @@
11
To use the distributed algorithms, the role [`DistributedOptimizationRole`](@ref) can be used to integrate an algorithm.
22

3-
Some Distributed Algorithms require a coordinator, in this case another role can be used additionally [`CoordinatorRole`](@ref).
3+
Some distributed algorithms require a coordinator, in this case another role can be used additionally [`CoordinatorRole`](@ref).
4+
5+
```julia
6+
using Mango
7+
using DistributedResourceOptimization
8+
9+
container = create_tcp_container("127.0.0.1", 5555)
10+
11+
agent_one = add_agent_composed_of(container, DistributedOptimizationRole(
12+
create_cohda_participant(1, [[0.0, 1, 2], [1, 2, 3]])))
13+
agent_two = add_agent_composed_of(container, DistributedOptimizationRole(
14+
create_cohda_participant(2, [[0.0, 1, 2], [1, 2, 3]])))
15+
16+
initial_message = create_cohda_start_message([1.2, 2, 3])
17+
18+
auto_assign!(complete_topology(2), container)
19+
20+
activate(container) do
21+
send_message(agent_one, initial_message, address(agent_two))
22+
end
23+
```

docs/src/carrier/simple.md

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,33 @@
1-
TBD
1+
Often a carrier, managing neighbors, communication, message handling, is a big overhead not necessary for simulating the distributed optimization procedure. For this reason DRO.jl implements a simple carrier, which can be used to simplify this part of distributed computing.
2+
3+
The following example demonstrates how to use this simple carrier, here using the example of COHDA.
4+
5+
```julia
6+
using Mango
7+
using DistributedResourceOptimization
8+
9+
container = ActorContainer()
10+
actor_one = SimpleCarrier(container, create_cohda_participant(1, [[0.0, 1, 2], [1, 2, 3]]))
11+
actor_two = SimpleCarrier(container, create_cohda_participant(2, [[0.0, 1, 2], [1, 2, 3]]))
12+
13+
initial_message = create_cohda_start_message([1.2, 2, 3])
14+
15+
send_to_other(actor_one, initial_message, cid(actor_two))
16+
17+
```
18+
19+
To simplify this, it is also possible to completly omit the carrier and container construction, which makes carrier handling to a one-liner using [`start_distributed_optimization`](@ref).
20+
21+
```julia
22+
using Mango
23+
using DistributedResourceOptimization
24+
25+
actor_one = create_cohda_participant(1, [[0.0, 1, 2], [1, 2, 3]])
26+
actor_two = create_cohda_participant(2, [[0.0, 1, 2], [1, 2, 3]])
27+
28+
initial_message = create_cohda_start_message([1.2, 2, 3])
29+
30+
start_distributed_optimization([actor_one, actor_two], coordinator, initial_message)
31+
```
32+
33+
For coordinated optimization, the method [`start_coordinated_optimization`](@ref) can be used.

src/DistributedResourceOptimization.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11

22
module DistributedResourceOptimization
33

4+
include("misc/util.jl")
45
include("carrier/core.jl")
56
include("algorithm/core.jl")
67
include("algorithm/heuristic/cohda/core.jl")
@@ -12,5 +13,6 @@ include("algorithm/admm/consensus_admm.jl")
1213
include("algorithm/admm/sharing_admm.jl")
1314

1415
include("carrier/mango.jl")
16+
include("carrier/simple.jl")
1517

1618
end

src/algorithm/heuristic/cohda/core.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ function process_exchange_message(algorithm_data::COHDAAlgorithmData, messages::
270270
# act
271271
for other in others(carrier, "$(algorithm_data.participant_id)")
272272
wm = act(algorithm_data, sysconf, candidate)
273-
send(carrier, wm, other)
273+
send_to_other(carrier, wm, other)
274274
end
275275
end
276276
end

src/carrier/core.jl

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,14 @@
1-
export Carrier, send_to_other, reply_to_other, send_and_wait_for_answers, send_awaitable, schedule_using, others
1+
export Carrier, send_to_other, reply_to_other, send_awaitable, schedule_using, others
2+
3+
mutable struct EventWithValue
4+
event::Base.Event
5+
value::Any
6+
end
7+
8+
function Base.wait(event::EventWithValue)
9+
wait(event.event)
10+
return event.value
11+
end
212

313
"""
414
Carrier
@@ -49,27 +59,6 @@ function reply_to_other(carrier::Carrier, content_data::Any, meta::Any)
4959
end
5060

5161

52-
"""
53-
send_and_wait_for_answers(carrier::Carrier, content_data::Any, receivers::Any)
54-
55-
Sends `content_data` using `carrier` to the given `receivers` and waits for their responses.
56-
57-
# Arguments
58-
- `carrier::Carrier`: The carrier object responsible for sending the data.
59-
- `content_data::Any`: The data to be sent to the receivers.
60-
- `receivers::Any`: The target receivers to which the data will be sent.
61-
62-
# Returns
63-
Returns the responses received from the receivers after sending the data.
64-
65-
# Notes
66-
This function blocks execution until all expected answers are received from the receivers.
67-
"""
68-
function send_and_wait_for_answers(carrier::Carrier, content_data::Any, receivers::Any)
69-
throw("NotImplemented")
70-
end
71-
72-
7362
"""
7463
send_awaitable(carrier::Carrier, content_data::Any, receiver::Any)
7564

src/carrier/mango.jl

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -116,16 +116,6 @@ function send_to_other(carrier::MangoCarrier, content::Any, receiver::AgentAddre
116116
return send_message(carrier.parent, content, receiver, optimization_message=true)
117117
end
118118

119-
mutable struct EventWithValue
120-
event::Base.Event
121-
value::Any
122-
end
123-
124-
function Base.wait(event::EventWithValue)
125-
wait(event.event)
126-
return event.value
127-
end
128-
129119
function Base.wait(carrier::MangoCarrier, event::EventWithValue)
130120
wait(carrier.parent.context.agent.scheduler, event.event)
131121
return event.value
@@ -144,17 +134,6 @@ function reply_to_other(carrier::MangoCarrier, content_data::Any, meta::Any)
144134
reply_to(carrier.parent, content_data, meta)
145135
end
146136

147-
function send_and_wait_for_answers(carrier::MangoCarrier, content_data::Any, receivers::Vector{AgentAddress})
148-
role = carrier.parent
149-
event = EventWithValue(Base.Event(), nothing)
150-
send_and_handle_answers(role, content_data, receivers, optimization_message=true) do _, answers, _
151-
event.value = answers
152-
notify(event.event)
153-
end
154-
wait(event.event)
155-
return event.value
156-
end
157-
158137
function schedule_using(carrier::MangoCarrier, to_be_scheduled::Function, delay_s::Float64)
159138
schedule(carrier.parent, to_be_scheduled, AwaitableTaskData(Timer(delay_s)))
160139
end

src/carrier/simple.jl

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
export SimpleCarrier, ActorContainer, cid, start_distributed_optimization, start_coordinated_optimization
2+
3+
4+
using UUIDs
5+
6+
abstract type AbstractSimpleCarrier <: Carrier end
7+
8+
"""
9+
ActorContainer
10+
11+
A container to manage multiple `SimpleCarrier`.
12+
"""
13+
struct ActorContainer
14+
actors::Vector{AbstractSimpleCarrier}
15+
function ActorContainer()
16+
return new(Vector{AbstractSimpleCarrier}())
17+
end
18+
end
19+
20+
function register(actor_container::ActorContainer, carrier::AbstractSimpleCarrier)
21+
push!(actor_container.actors, carrier)
22+
carrier.aid = length(actor_container.actors)
23+
end
24+
25+
"""
26+
SimpleCarrier
27+
28+
A concrete implementation of the `Carrier` type representing a simple carrier for distributed resource optimization.
29+
This carrier facilitates communication and scheduling among distributed algorithms within an `ActorContainer`.
30+
"""
31+
mutable struct SimpleCarrier <: AbstractSimpleCarrier
32+
container::ActorContainer
33+
actor::Union{<:Coordinator, <:DistributedAlgorithm}
34+
aid::Real
35+
uuid_to_handler::Dict{UUID, Function}
36+
function SimpleCarrier(container::ActorContainer, actor::Union{<:Coordinator, <:DistributedAlgorithm})
37+
carrier = new(container, actor, -1, Dict{UUID, Function}())
38+
register(container, carrier)
39+
return carrier
40+
end
41+
end
42+
43+
"""
44+
cid(carrier::SimpleCarrier)
45+
return the carrier id
46+
"""
47+
function cid(carrier::SimpleCarrier)
48+
return carrier.aid
49+
end
50+
51+
function Base.wait(carrier::SimpleCarrier, event::EventWithValue)
52+
wait(event.event)
53+
return event.value
54+
end
55+
56+
function _dispatch_to(carrier, content, meta)
57+
if haskey(meta, :message_id) && haskey(carrier.uuid_to_handler, meta[:message_id])
58+
carrier.uuid_to_handler[meta[:message_id]](carrier, content, meta)
59+
else
60+
on_exchange_message(carrier.actor, carrier, content, meta)
61+
end
62+
end
63+
64+
function send_to_other(carrier::SimpleCarrier, content::Any, receiver::Real; meta::Dict{Symbol,Any}=Dict{Symbol,Any}())
65+
other_carrier::Carrier = carrier.container.actors[receiver]
66+
main_meta = Dict(:sender => carrier.aid, :message_id => uuid4())
67+
union_meta = merge(main_meta, meta) # important: meta can override main_meta entries
68+
return @spawnlog begin
69+
_dispatch_to(other_carrier, content, union_meta)
70+
end
71+
end
72+
73+
function reply_to_other(carrier::SimpleCarrier, content_data::Any, meta)
74+
return send_to_other(carrier, content_data, meta[:sender], meta=merge(meta, Dict(:reply => true)))
75+
end
76+
77+
function send_awaitable(carrier::SimpleCarrier, content::Any, receiver::Real; meta::Dict{Symbol,Any}=Dict{Symbol,Any}())
78+
other_carrier::Carrier = carrier.container.actors[receiver]
79+
main_meta = Dict(:sender => carrier.aid, :message_id => uuid4())
80+
union_meta = merge(main_meta, meta) # important: meta can override main_meta entries
81+
event = EventWithValue(Base.Event(), nothing)
82+
carrier.uuid_to_handler[union_meta[:message_id]] = function(other_carrier::Carrier, content::Any, union_meta::Dict{Symbol,Any})
83+
event.value = content
84+
notify(event.event)
85+
end
86+
@spawnlog begin
87+
_dispatch_to(other_carrier, content, union_meta)
88+
end
89+
return event
90+
end
91+
92+
function schedule_using(carrier::SimpleCarrier, to_be_scheduled::Function, delay_s::Float64)
93+
@spawnlog begin
94+
sleep(delay_s)
95+
to_be_scheduled()
96+
end
97+
end
98+
99+
function others(carrier::SimpleCarrier, id::String)
100+
return setdiff!(collect(range(1, length(carrier.container.actors))), [cid(carrier)])
101+
end
102+
103+
"""
104+
start_distributed_optimization(actors::Vector{<:DistributedAlgorithm}, start_message::Any)
105+
106+
Start a distributed optimization process among the provided `actors` using the given `start_message`.
107+
Return a waitable object that can be used to monitor the progress of the optimization.
108+
"""
109+
function start_distributed_optimization(actors::Vector{<:DistributedAlgorithm}, start_message::Any)
110+
actor_container = ActorContainer()
111+
carriers = [SimpleCarrier(actor_container, actor) for actor in actors]
112+
return send_to_other(carriers[1], start_message, cid(carriers[2]))
113+
end
114+
115+
"""
116+
start_coordinated_optimization(actors::Vector{<:DistributedAlgorithm}, coordinator::Coordinator, start_message::Any)
117+
118+
Start a coordinated optimization process among the provided `actors` and a `coordinator` using the given `start_message`.
119+
Return the result of the optimization process. The coordinator manages the overall optimization flow.
120+
"""
121+
function start_coordinated_optimization(actors::Vector{<:DistributedAlgorithm}, coordinator::Coordinator, start_message::Any)
122+
actor_container = ActorContainer()
123+
carriers = [SimpleCarrier(actor_container, actor) for actor in actors]
124+
coordinator_carrier = SimpleCarrier(actor_container, coordinator)
125+
return start_optimization(coordinator, coordinator_carrier, start_message, Dict())
126+
end

src/misc/util.jl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
function log_exception(e, backtrace=nothing)
3+
bt = catch_backtrace()
4+
if !isnothing(backtrace)
5+
bt = backtrace
6+
end
7+
msg = sprint(io -> begin
8+
println(io, "Exception occurred in thread ", Threads.threadid())
9+
showerror(io, e)
10+
println(io)
11+
Base.show_backtrace(io, bt)
12+
end)
13+
@error msg
14+
end
15+
16+
macro spawnlog(expr)
17+
quote
18+
Threads.@spawn try
19+
$(esc(expr))
20+
catch ex
21+
log_exception(ex)
22+
rethrow(ex)
23+
end
24+
end
25+
end

test/admm/sharing_admm_tests.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,4 +225,21 @@ end
225225
@test handle.got_it
226226
@test handle2.got_it
227227
@test handle3.got_it
228+
end
229+
230+
231+
@testset "TestFlexADMMAWithSimpleCarrierExpress" begin
232+
flex_actor = create_admm_flex_actor_one_to_many(10, [0.1, 0.5, -1])
233+
flex_actor2 = create_admm_flex_actor_one_to_many(15, [0.1, 0.5, -1])
234+
flex_actor3 = create_admm_flex_actor_one_to_many(10, [-1.0, 0.0, 1.0])
235+
236+
coordinator = create_sharing_target_distance_admm_coordinator()
237+
238+
admm_start = create_admm_start(create_admm_sharing_data([-4, 0, 6], [5,1,1]))
239+
240+
start_coordinated_optimization([flex_actor, flex_actor2, flex_actor3], coordinator, admm_start)
241+
242+
@test isapprox(flex_actor.x, [0.00018920502370025307, -6.385117748282918e-5, 0.0001192856645796826], atol=1e-3)
243+
@test isapprox(flex_actor2.x, [0.00018947342781542052, -6.398798323709664e-5, 0.00011955174808086514], atol=1e-3)
244+
@test isapprox(flex_actor3.x, [-3.9830282351073403, -7.203077063324391e-7, 3.98302823714599], atol=1e-3)
228245
end

0 commit comments

Comments
 (0)