Skip to content

Commit 63c498c

Browse files
verifying whether MW clusters are supported in deploying
1 parent d2999e9 commit 63c498c

File tree

2 files changed

+58
-29
lines changed

2 files changed

+58
-29
lines changed

src/deploy.jl

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,25 @@ function cluster_deploy(contract_handle, config_args...)
5858
try
5959
pids = cluster_deploy(cluster_type, cluster_handle, cluster_features, instance_type)
6060
catch e
61-
@warn "some error deploying cluster $cluster_handle ($e)"
61+
@info isa(e, RemoteException)
62+
@info e.captured.ex
63+
@info e.captured.ex.msg
64+
if isa(e, RemoteException) && isa(e.captured.ex, ErrorException) && e.captured.ex.msg == "Only process 1 can add and remove workers"
65+
@error "MW clusters are not supported."
66+
@info "To support MW clusters, the extended version of Distributed.jl must be installed in the cluster's entry node. See the README or documentation."
67+
@warn "the cluster will be terminated"
68+
cluster_terminate(cluster_handle)
69+
return :unsupported_mwcluster
70+
else
71+
@error "Some error deploying cluster $cluster_handle ($e)"
72+
@warn "the cluster will be terminated"
73+
cluster_terminate(cluster_handle)
74+
return nothing
75+
end
6276
end
6377

64-
if !isnothing(pids)
65-
cluster_deploy_info[cluster_handle][:pids] = pids
66-
return cluster_handle
67-
else
68-
@warn "error launching processes -- cluster will be terminated"
69-
cluster_terminate(cluster_handle)
70-
@error "deployment failed due to an unrecoverable error in launching processes"
71-
return nothing
72-
end
78+
cluster_deploy_info[cluster_handle][:pids] = pids
79+
return cluster_handle
7380

7481
end
7582

@@ -377,8 +384,11 @@ function cluster_interrupt(cluster_handle)
377384
node_provider = cluster_features[:node_provider]
378385
!can_interrupt(node_provider, cluster_handle) && error("cluster $cluster_handle cannot be interrupted")
379386
cluster_type = cluster_features[:cluster_type]
380-
kill_processes(cluster_handle, cluster_type, cluster_features)
381-
interrupt_cluster(node_provider, cluster_handle)
387+
try
388+
kill_processes(cluster_handle, cluster_type, cluster_features)
389+
finally
390+
interrupt_cluster(node_provider, cluster_handle)
391+
end
382392
#@info "the cluster $cluster_handle has been interrupted"
383393
catch e
384394
println(e)
@@ -424,10 +434,14 @@ function cluster_terminate(cluster_handle)
424434
check_cluster_handle(cluster_handle)
425435
cluster_features = cluster_deploy_info[cluster_handle][:features]
426436
node_provider = cluster_features[:node_provider]
427-
cluster_isrunning(node_provider, cluster_handle) && kill_processes(cluster_handle, cluster_features[:cluster_type], cluster_features)
428-
terminate_cluster(node_provider, cluster_handle)
429-
terminated_cluster[cluster_handle] = cluster_deploy_info[cluster_handle]
430-
delete!(cluster_deploy_info, cluster_handle)
437+
try
438+
cluster_isrunning(node_provider, cluster_handle) && kill_processes(cluster_handle, cluster_features[:cluster_type], cluster_features)
439+
sleep(1)
440+
finally
441+
terminate_cluster(node_provider, cluster_handle)
442+
terminated_cluster[cluster_handle] = cluster_deploy_info[cluster_handle]
443+
delete!(cluster_deploy_info, cluster_handle)
444+
end
431445
#@info "the cluster $cluster_handle has been terminated"
432446
catch e
433447
println(e)
@@ -440,7 +454,7 @@ function kill_processes(cluster_handle, _::Type{<:ManagerWorkers}, cluster_featu
440454
pids = cluster_deploy_info[cluster_handle][:pids]
441455
if !isempty(pids)
442456
wids = remotecall_fetch(workers, pids[1], role=:master)
443-
remotecall_fetch(rmprocs, pids[1], wids)
457+
!in(1, wids) && remotecall_fetch(rmprocs, pids[1], wids)
444458
#@fetchfrom pids[1] rmprocs(workers(role=:master))
445459
rmprocs(pids)
446460
end
@@ -465,11 +479,13 @@ function cluster_restart(cluster_handle::Symbol)
465479
cluster_provider = cluster_features[:node_provider]
466480
!cluster_isrunning(cluster_provider, cluster_handle) && error("cluster is not running")
467481
cluster_type = cluster_features[:cluster_type]
468-
kill_processes(cluster_handle, cluster_type, cluster_features)
469-
ips = get_ips(cluster_provider, cluster_handle)
470-
471-
pids = launch_processes(cluster_provider, cluster_type, cluster_handle, ips)
472-
cluster_deploy_info[cluster_handle][:pids] = pids
482+
try
483+
kill_processes(cluster_handle, cluster_type, cluster_features)
484+
finally
485+
ips = get_ips(cluster_provider, cluster_handle)
486+
pids = launch_processes(cluster_provider, cluster_type, cluster_handle, ips)
487+
cluster_deploy_info[cluster_handle][:pids] = pids
488+
end
473489
catch e
474490
println(e)
475491
return :fail

test/localcluster.jl

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,32 @@ cluster_nodes(cluster_instance_1) |> length == np
4141

4242
# deploy cluster and test node processes
4343
cluster_instance_2 = @deploy cluster_contract_2
44+
if cluster_instance_2 == :unsupported_mwcluster
45+
mwcluster_support = false
46+
cluster_instance_2 = @deploy cluster_contract_1
47+
else
48+
mwcluster_support = true
49+
end
50+
4451
@test !isnothing(cluster_instance_2)
4552
@test isa(cluster_instance_2, Symbol)
4653
cluster_nodes(cluster_instance_2) |> length == 1
47-
pid_manager = @nodes(cluster_instance_2) |> first
48-
nw = @fetchfrom pid_manager workers(role=:master)
49-
@test length(nw) == np
54+
55+
if mwcluster_support
56+
pid_manager = @nodes(cluster_instance_2) |> first
57+
nw = @fetchfrom pid_manager workers(role=:master)
58+
@test length(nw) == np
59+
end
5060

5161
# check cluster nodes after restarting
5262
@test @restart(cluster_instance_2) == :success
53-
cluster_nodes(cluster_instance_2) |> length == 1
54-
pid_manager = @nodes(cluster_instance_2) |> first
55-
nw = @fetchfrom pid_manager workers(role=:master)
56-
@test length(nw) == np
63+
64+
if mwcluster_support
65+
cluster_nodes(cluster_instance_2) |> length == 1
66+
pid_manager = @nodes(cluster_instance_2) |> first
67+
nw = @fetchfrom pid_manager workers(role=:master)
68+
@test length(nw) == np
69+
end
5770

5871
# interrrupt fails for local clusters
5972
@test @interrupt(cluster_instance_1, cluster_instance_2) |> r -> all(map(x -> x == :fail, r))

0 commit comments

Comments
 (0)