Skip to content

Commit 8724c16

Browse files
committed
Port improvements from ParallelProcessingTools
1 parent b3e8cb2 commit 8724c16

File tree

1 file changed

+25
-5
lines changed

1 file changed

+25
-5
lines changed

src/elastic.jl

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,28 @@ struct ElasticManager <: Distributed.ClusterManager
1717
terminated::Set{Int} # terminated worker ids
1818
topology::Symbol
1919
sockname
20+
manage_callback
2021
printing_kwargs
2122

22-
function ElasticManager(;addr=Sockets.IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all, printing_kwargs=())
23+
function ElasticManager(;
24+
addr=IPv4("127.0.0.1"), port=9009, cookie=nothing,
25+
topology=:all_to_all, manage_callback=elastic_no_op_callback, printing_kwargs=()
26+
)
2327
Distributed.init_multi()
2428
cookie !== nothing && Distributed.cluster_cookie(cookie)
2529

2630
# Automatically check for the IP address of the local machine
2731
if addr == :auto
2832
try
29-
addr = Sockets.getipaddr(Distributed.IPv4)
33+
addr = Sockets.getipaddr(Sockets.IPv4)
3034
catch
3135
error("Failed to automatically get host's IP address. Please specify `addr=` explicitly.")
3236
end
3337
end
3438

3539
l_sock = Distributed.listen(addr, port)
3640

37-
lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), printing_kwargs)
41+
lman = new(Dict{Int, Distributed.WorkerConfig}(), Channel{Sockets.TCPSocket}(typemax(Int)), Set{Int}(), topology, Sockets.getsockname(l_sock), manage_callback, printing_kwargs)
3842

3943
t1 = @async begin
4044
while true
@@ -57,8 +61,10 @@ ElasticManager(port) = ElasticManager(;port=port)
5761
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
5862
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)
5963

64+
elastic_no_op_callback(::ElasticManager, ::Integer, ::Symbol) = nothing
6065

6166
function process_worker_conn(mgr::ElasticManager, s::Sockets.TCPSocket)
67+
@debug "ElasticManager got new worker connection"
6268
# Socket is the worker's STDOUT
6369
wc = Distributed.WorkerConfig()
6470
wc.io = s
@@ -94,6 +100,7 @@ end
94100
function Distributed.launch(mgr::ElasticManager, params::Dict, launched::Array, c::Condition)
95101
# The workers have already been started.
96102
while isready(mgr.pending)
103+
@debug "ElasticManager.launch new worker"
97104
wc=Distributed.WorkerConfig()
98105
wc.io = take!(mgr.pending)
99106
push!(launched, wc)
@@ -104,8 +111,12 @@ end
104111

105112
function Distributed.manage(mgr::ElasticManager, id::Integer, config::Distributed.WorkerConfig, op::Symbol)
106113
if op == :register
114+
@debug "ElasticManager registering process id $id"
107115
mgr.active[id] = config
116+
mgr.manage_callback(mgr, id, op)
108117
elseif op == :deregister
118+
@debug "ElasticManager deregistering process id $id"
119+
mgr.manage_callback(mgr, id, op)
109120
delete!(mgr.active, id)
110121
push!(mgr.terminated, id)
111122
end
@@ -138,9 +149,18 @@ function Base.show(io::IO, mgr::ElasticManager)
138149
end
139150

140151
# Does not return. If executing from a REPL try
141-
# @async connect_to_cluster(.....)
152+
# @async elastic_worker(.....)
142153
# addr, port that a ElasticManager on the master processes is listening on.
143-
function elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)
154+
function elastic_worker(
155+
cookie::AbstractString, addr::AbstractString="127.0.0.1", port::Integer = 9009;
156+
stdout_to_master::Bool = true,
157+
Base.@nospecialize(env::AbstractVector = [],)
158+
)
159+
@debug "ElasticManager.elastic_worker(cookie, $addr, $port; stdout_to_master=$stdout_to_master, env=$env)"
160+
for (k, v) in env
161+
ENV[k] = v
162+
end
163+
144164
c = connect(addr, port)
145165
write(c, rpad(cookie, HDR_COOKIE_LEN)[1:HDR_COOKIE_LEN])
146166
stdout_to_master && redirect_stdout(c)

0 commit comments

Comments
 (0)