Skip to content

Commit 026641c

Browse files
committed
Extract OneByOne behavior from Agent
it can be also later used for Actors
1 parent 39b650a commit 026641c

File tree

4 files changed

+104
-44
lines changed

4 files changed

+104
-44
lines changed

lib/concurrent/agent.rb

Lines changed: 28 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Agent
4040
# is given at initialization
4141
TIMEOUT = 5
4242

43-
attr_reader :timeout
43+
attr_reader :timeout, :executor
4444

4545
# Initialize a new Agent with the given initial value and provided options.
4646
#
@@ -60,14 +60,12 @@ class Agent
6060
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161
# returning the value returned from the proc
6262
def initialize(initial, opts = {})
63-
@value = initial
64-
@rescuers = []
65-
@validator = Proc.new { |result| true }
66-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
67-
self.observers = CopyOnWriteObserverSet.new
68-
@executor = OptionsParser::get_executor_from(opts)
69-
@being_executed = false
70-
@stash = []
63+
@value = initial
64+
@rescuers = []
65+
@validator = Proc.new { |result| true }
66+
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
67+
self.observers = CopyOnWriteObserverSet.new
68+
@executor = OneByOne.new OptionsParser::get_executor_from(opts)
7169
init_mutex
7270
set_deref_options(opts)
7371
end
@@ -133,15 +131,7 @@ def validate(&block)
133131
# @return [true, nil] nil when no block is given
134132
def post(&block)
135133
return nil if block.nil?
136-
mutex.lock
137-
post = if @being_executed
138-
@stash << block
139-
false
140-
else
141-
@being_executed = true
142-
end
143-
mutex.unlock
144-
@executor.post { work(&block) } if post
134+
@executor.post { work(&block) }
145135
true
146136
end
147137

@@ -184,36 +174,31 @@ def try_rescue(ex) # :nodoc:
184174

185175
# @!visibility private
186176
def work(&handler) # :nodoc:
177+
validator, value = mutex.synchronize { [@validator, @value] }
178+
187179
begin
188-
validator, value = mutex.synchronize { [@validator, @value] }
189-
190-
begin
191-
# FIXME creates second thread
192-
result, valid = Concurrent::timeout(@timeout) do
193-
[result = handler.call(value),
194-
validator.call(result)]
195-
end
196-
rescue Exception => ex
197-
exception = ex
180+
# FIXME creates second thread
181+
result, valid = Concurrent::timeout(@timeout) do
182+
[result = handler.call(value),
183+
validator.call(result)]
198184
end
185+
rescue Exception => ex
186+
exception = ex
187+
end
199188

200-
mutex.lock
201-
should_notify = if !exception && valid
202-
@value = result
203-
true
204-
end
205-
stashed = @stash.shift || (@being_executed = false)
206-
mutex.unlock
207-
208-
@executor.post { work(&stashed) } if stashed
209-
210-
if should_notify
211-
time = Time.now
212-
observers.notify_observers { [time, self.value] }
213-
end
189+
mutex.lock
190+
should_notify = if !exception && valid
191+
@value = result
192+
true
193+
end
194+
mutex.unlock
214195

215-
try_rescue(exception)
196+
if should_notify
197+
time = Time.now
198+
observers.notify_observers { [time, self.value] }
216199
end
200+
201+
try_rescue(exception)
217202
end
218203
end
219204
end

lib/concurrent/executor/one_by_one.rb

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
module Concurrent
2+
3+
# Ensures that jobs are passed to the underlying executor one by one,
4+
# never running at the same time.
5+
class OneByOne
6+
7+
attr_reader :executor
8+
9+
Job = Struct.new(:args, :block) do
10+
def call
11+
block.call *args
12+
end
13+
end
14+
15+
# @param [Executor] executor
16+
def initialize(executor)
17+
@executor = executor
18+
@being_executed = false
19+
@stash = []
20+
@mutex = Mutex.new
21+
end
22+
23+
# Submit a task to the executor for asynchronous processing.
24+
#
25+
# @param [Array] args zero or more arguments to be passed to the task
26+
#
27+
# @yield the asynchronous task to perform
28+
#
29+
# @return [Boolean] `true` if the task is queued, `false` if the executor
30+
# is not running
31+
#
32+
# @raise [ArgumentError] if no task is given
33+
def post(*args, &task)
34+
return nil if task.nil?
35+
job = Job.new args, task
36+
@mutex.lock
37+
post = if @being_executed
38+
@stash << job
39+
false
40+
else
41+
@being_executed = true
42+
end
43+
@mutex.unlock
44+
@executor.post { work(job) } if post
45+
true
46+
end
47+
48+
# Submit a task to the executor for asynchronous processing.
49+
#
50+
# @param [Proc] task the asynchronous task to perform
51+
#
52+
# @return [self] returns itself
53+
def <<(task)
54+
post(&task)
55+
self
56+
end
57+
58+
private
59+
60+
# ensures next job is executed if any is stashed
61+
def work(job)
62+
job.call
63+
ensure
64+
@mutex.lock
65+
job = @stash.shift || (@being_executed = false)
66+
@mutex.unlock
67+
@executor.post { work(job) } if job
68+
end
69+
70+
end
71+
end

lib/concurrent/executors.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
require 'concurrent/executor/single_thread_executor'
77
require 'concurrent/executor/thread_pool_executor'
88
require 'concurrent/executor/timer_set'
9+
require 'concurrent/executor/one_by_one'

spec/concurrent/agent_spec.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,10 @@ def trigger_observable(observable)
152152
subject.post { nil }
153153
subject.post { nil }
154154
sleep(0.1)
155-
subject.instance_variable_get(:@stash).size.should eq 2
155+
subject.
156+
executor.
157+
instance_variable_get(:@stash).
158+
size.should eq 2
156159
end
157160

158161
it 'does not add to the queue when no block is given' do

0 commit comments

Comments
 (0)