Skip to content

Commit 574221b

Browse files
Add and clean up monitor tests.
1 parent 7580fed commit 574221b

File tree

8 files changed

+106
-18
lines changed

8 files changed

+106
-18
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "async/container/supervisor/a_server"
7+
8+
module Async
9+
module Container
10+
module Supervisor
11+
AMonitor = Sus::Shared("a monitor") do
12+
include_context AServer
13+
14+
let(:monitors) {[monitor, registration_monitor]}
15+
16+
it "can add and remove connections" do
17+
worker = Worker.new(endpoint: endpoint)
18+
connection = worker.connect
19+
20+
event = registration_monitor.pop
21+
expect(event).to have_attributes(
22+
type: be == :register,
23+
)
24+
25+
connection.close
26+
27+
event = registration_monitor.pop
28+
expect(event).to have_attributes(
29+
type: be == :remove,
30+
)
31+
end
32+
end
33+
end
34+
end
35+
end

fixtures/async/container/supervisor/a_server.rb

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,31 @@ module Async
1313
module Container
1414
module Supervisor
1515
class RegistrationMonitor
16+
Event = Struct.new(:type, :connection)
17+
1618
def initialize
17-
@registrations = []
19+
@registrations = ::Thread::Queue.new
1820
end
1921

2022
attr :registrations
2123

2224
def run
2325
end
2426

27+
def status(call)
28+
call.push(registrations: @registrations.size)
29+
end
30+
31+
def pop(...)
32+
@registrations.pop(...)
33+
end
34+
2535
def register(connection)
26-
@registrations << connection
36+
@registrations << Event.new(:register, connection)
2737
end
2838

2939
def remove(connection)
30-
@registrations.delete(connection)
40+
@registrations << Event.new(:remove, connection)
3141
end
3242
end
3343

lib/async/container/supervisor/server.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ def initialize(monitors: [], endpoint: Supervisor.endpoint)
3939
# @parameter call [Connection::Call] The registration call.
4040
# @parameter call[:state] [Hash] The worker state to merge (e.g. process_id).
4141
def do_register(call)
42-
call.connection.state.merge!(call.message[:state])
42+
if state = call.message[:state]
43+
call.connection.state.merge!(state)
44+
end
4345

4446
connection_id = SecureRandom.uuid
4547
call.connection.state[:connection_id] = connection_id
@@ -52,7 +54,7 @@ def do_register(call)
5254
Console.error(self, "Error while registering process!", monitor: monitor, exception: error)
5355
end
5456
ensure
55-
call.finish
57+
call.finish(connection_id: connection_id)
5658
end
5759

5860
# Forward an operation to a worker connection.

lib/async/container/supervisor/worker.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ def self.run(...)
2525
#
2626
# @parameter state [Hash] The worker state to register with the supervisor.
2727
# @parameter endpoint [IO::Endpoint] The supervisor endpoint to connect to.
28-
def initialize(state, endpoint: Supervisor.endpoint)
28+
def initialize(state = nil, endpoint: Supervisor.endpoint)
29+
super(endpoint: endpoint)
2930
@state = state
30-
@endpoint = endpoint
3131
end
3232

3333
include Dispatchable
@@ -164,6 +164,7 @@ def do_garbage_profile_stop(call)
164164

165165
# Register the worker with the supervisor:
166166
connection.call(do: :register, state: @state)
167+
# We ignore the response (it contains the `connection_id`).
167168
end
168169
end
169170
end
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "async/container/supervisor/memory_monitor"
7+
require "async/container/supervisor/a_monitor"
8+
9+
require "sus/fixtures/console/captured_logger"
10+
11+
describe Async::Container::Supervisor::MemoryMonitor do
12+
include Sus::Fixtures::Console::CapturedLogger
13+
14+
let(:monitor) {subject.new(interval: 1, memory_sample: {duration: 1, timeout: 5})}
15+
it_behaves_like Async::Container::Supervisor::AMonitor
16+
17+
with "#run" do
18+
include Sus::Fixtures::Async::SchedulerContext
19+
20+
it "can run the monitor" do
21+
task = monitor.run
22+
expect(task).to be(:running?)
23+
end
24+
25+
it "can handle failures" do
26+
expect(monitor.cluster).to receive(:check!).and_raise(Errno::ESRCH)
27+
28+
task = monitor.run
29+
expect(task).to be(:running?)
30+
31+
sleep 1
32+
33+
expect(task).to be(:running?)
34+
end
35+
end
36+
end
37+

test/async/container/process_monitor.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
# Copyright, 2025, by Samuel Williams.
55

66
require "async/container/supervisor/process_monitor"
7-
require "async/container/supervisor/connection"
7+
require "async/container/supervisor/a_monitor"
8+
89
require "sus/fixtures/console/captured_logger"
910

1011
describe Async::Container::Supervisor::ProcessMonitor do
1112
include Sus::Fixtures::Console::CapturedLogger
13+
1214
let(:monitor) {subject.new(interval: 10)}
15+
it_behaves_like Async::Container::Supervisor::AMonitor
1316

14-
it "has a ppid" do
17+
it "has a parent process id" do
1518
expect(monitor.ppid).to be == Process.ppid
1619
end
1720

test/async/container/supervised.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,10 @@ def setup(container)
4242
worker = evaluator.make_supervised_worker(state)
4343
worker_task = worker.run
4444

45-
sleep(0.001) until registration_monitor.registrations.any?
45+
# Wait for the worker to register with the supervisor.
46+
event = registration_monitor.pop
47+
connection = event.connection
4648

47-
connection = registration_monitor.registrations.first
4849
expect(connection.state).to have_keys(
4950
process_id: be == ::Process.pid
5051
)

test/async/container/supervisor.rb

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
connection = worker.connect
1818

1919
# Wait for the client to connect to the server:
20-
sleep(0.001) until registration_monitor.registrations.any?
20+
event = registration_monitor.pop
21+
connection = event.connection
2122

22-
connection = registration_monitor.registrations.first
2323
expect(connection.state).to have_keys(
2424
process_id: be == ::Process.pid
2525
)
@@ -32,10 +32,10 @@
3232
worker = Async::Container::Supervisor::Worker.new(state, endpoint: endpoint)
3333
worker_task = worker.run
3434

35-
sleep(0.001) until registration_monitor.registrations.any?
35+
event = registration_monitor.pop
36+
connection = event.connection
3637

3738
path = File.join(@root, "memory.json")
38-
connection = registration_monitor.registrations.first
3939
connection.call(do: :memory_dump, path: path)
4040

4141
expect(File.size(path)).to be > 0
@@ -110,9 +110,8 @@ def reader_target.dispatch(call); end
110110
worker = Async::Container::Supervisor::Worker.new(state, endpoint: endpoint)
111111
worker_task = worker.run
112112

113-
sleep(0.001) until registration_monitor.registrations.any?
114-
115-
connection = registration_monitor.registrations.first
113+
event = registration_monitor.pop
114+
connection = event.connection
116115

117116
# Sample for a short duration (1 second for test speed)
118117
result = connection.call(do: :memory_sample, duration: 1)

0 commit comments

Comments
 (0)