Skip to content

Commit 7a8d26a

Browse files
committed
Avoid torn writes on notify pipe.
1 parent 997d280 commit 7a8d26a

File tree

6 files changed

+30
-12
lines changed

6 files changed

+30
-12
lines changed

lib/async/container/channel.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ module Container
1010
# Provides a basic multi-thread/multi-process uni-directional communication channel.
1111
class Channel
1212
# Initialize the channel using a pipe.
13-
def initialize
13+
def initialize(timeout: 1.0)
1414
@in, @out = ::IO.pipe
15+
@in.timeout = timeout
1516
end
1617

1718
# The input end of the pipe.
@@ -43,12 +44,11 @@ def close
4344
# @returns [Hash]
4445
def receive
4546
if data = @in.gets
46-
begin
47-
return JSON.parse(data, symbolize_names: true)
48-
rescue
49-
return {line: data}
50-
end
47+
return JSON.parse(data, symbolize_names: true)
5148
end
49+
rescue => error
50+
Console.error(self, "Error during channel receive!", error)
51+
return nil
5252
end
5353
end
5454
end

lib/async/container/forked.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,8 @@ def self.spawn(*arguments, name: nil, **options)
136136

137137
# Initialize the process.
138138
# @parameter name [String] The name to use for the child process.
139-
def initialize(name: nil)
140-
super()
139+
def initialize(name: nil, **options)
140+
super(**options)
141141

142142
@name = name
143143
@status = nil

lib/async/container/notify/pipe.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ def before_spawn(arguments, options)
6363
# Formats the message using JSON and sends it to the parent controller.
6464
# This is suitable for use with {Channel}.
6565
def send(**message)
66-
data = ::JSON.dump(message)
66+
data = ::JSON.dump(message) << "\n"
6767

68-
@io.puts(data)
68+
@io.write(data)
6969
@io.flush
7070
end
7171

lib/async/container/threaded.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ def self.fork(**options)
124124
# Initialize the thread.
125125
#
126126
# @parameter name [String] The name to use for the child thread.
127-
def initialize(name: nil)
128-
super()
127+
def initialize(name: nil, **options)
128+
super(**options)
129129

130130
@status = nil
131131

test/async/container/channel.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,12 @@
2929

3030
expect(channel.receive).to be == {line: "Hello, World!\n"}
3131
end
32+
33+
with "timeout" do
34+
let(:channel) {subject.new(timeout: 0.001)}
35+
36+
it "fails gracefully on timeout" do
37+
expect(channel.receive).to be_nil
38+
end
39+
end
3240
end

test/async/container/notify/pipe.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,14 @@
3030

3131
expect(container.statistics).to have_attributes(failures: be == 0)
3232
end
33+
34+
it "writes data with one write call" do
35+
output = StringIO.new
36+
expect(output).to receive(:write).with("{\"ready\":true,\"status\":\"All systems go!\"}\n")
37+
38+
client = Async::Container::Notify::Pipe.new(output)
39+
client.send(ready: true, status: "All systems go!")
40+
41+
expect(output.string).to be == "{\"ready\":true,\"status\":\"All systems go!\"}\n"
42+
end
3343
end

0 commit comments

Comments
 (0)