Skip to content

Commit 7c643cc

Browse files
committed
Merge pull request #66 from pitr-ch/actor
Actor improvements
2 parents b6d7506 + 199314f commit 7c643cc

File tree

5 files changed

+255
-91
lines changed

5 files changed

+255
-91
lines changed

lib/concurrent/actor/simple_actor_ref.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class SimpleActorRef
1111
def initialize(actor, opts = {})
1212
@actor = actor
1313
@mutex = Mutex.new
14-
@executor = SingleThreadExecutor.new
14+
@executor = OneByOne.new OptionsParser::get_executor_from(opts)
1515
@stop_event = Event.new
1616
@reset_on_error = opts.fetch(:reset_on_error, true)
1717
@exception_class = opts.fetch(:rescue_exception, false) ? Exception : StandardError
@@ -51,7 +51,6 @@ def post!(timeout, *msg)
5151
def shutdown
5252
@mutex.synchronize do
5353
return if shutdown?
54-
@executor.shutdown
5554
@actor.on_shutdown
5655
@stop_event.set
5756
end

lib/concurrent/agent.rb

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class Agent
4040
# is given at initialization
4141
TIMEOUT = 5
4242

43-
attr_reader :timeout
43+
attr_reader :timeout, :executor
4444

4545
# Initialize a new Agent with the given initial value and provided options.
4646
#
@@ -60,12 +60,12 @@ class Agent
6060
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161
# returning the value returned from the proc
6262
def initialize(initial, opts = {})
63-
@value = initial
64-
@rescuers = []
65-
@validator = Proc.new { |result| true }
66-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
63+
@value = initial
64+
@rescuers = []
65+
@validator = Proc.new { |result| true }
66+
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
6767
self.observers = CopyOnWriteObserverSet.new
68-
@executor = OptionsParser::get_executor_from(opts)
68+
@executor = OneByOne.new OptionsParser::get_executor_from(opts)
6969
init_mutex
7070
set_deref_options(opts)
7171
end
@@ -111,7 +111,11 @@ def rescue(clazz = StandardError, &block)
111111
# @yieldparam [Object] value the result of the last update operation
112112
# @yieldreturn [Boolean] true if the value is valid else false
113113
def validate(&block)
114-
@validator = block unless block.nil?
114+
unless block.nil?
115+
mutex.lock
116+
@validator = block
117+
mutex.unlock
118+
end
115119
self
116120
end
117121
alias_method :validates, :validate
@@ -124,8 +128,11 @@ def validate(&block)
124128
# the new value
125129
# @yieldparam [Object] value the current value
126130
# @yieldreturn [Object] the new value
131+
# @return [true, nil] nil when no block is given
127132
def post(&block)
128-
@executor.post{ work(&block) } unless block.nil?
133+
return nil if block.nil?
134+
@executor.post { work(&block) }
135+
true
129136
end
130137

131138
# Update the current value with the result of the given block operation
@@ -139,6 +146,16 @@ def <<(block)
139146
self
140147
end
141148

149+
# Waits/blocks until all the updates sent before this call are done.
150+
#
151+
# @param [Numeric] timeout the maximum time in second to wait.
152+
# @return [Boolean] false on timeout, true otherwise
153+
def await(timeout = nil)
154+
done = Event.new
155+
post { done.set }
156+
done.wait timeout
157+
end
158+
142159
private
143160

144161
# @!visibility private
@@ -147,33 +164,41 @@ def <<(block)
147164
# @!visibility private
148165
def try_rescue(ex) # :nodoc:
149166
rescuer = mutex.synchronize do
150-
@rescuers.find{|r| ex.is_a?(r.clazz) }
167+
@rescuers.find { |r| ex.is_a?(r.clazz) }
151168
end
152169
rescuer.block.call(ex) if rescuer
153170
rescue Exception => ex
171+
# puts "#{ex} (#{ex.class})\n#{ex.backtrace.join("\n")}"
154172
# supress
155173
end
156174

157175
# @!visibility private
158176
def work(&handler) # :nodoc:
177+
validator, value = mutex.synchronize { [@validator, @value] }
178+
159179
begin
180+
# FIXME creates second thread
181+
result, valid = Concurrent::timeout(@timeout) do
182+
[result = handler.call(value),
183+
validator.call(result)]
184+
end
185+
rescue Exception => ex
186+
exception = ex
187+
end
160188

161-
should_notify = false
189+
mutex.lock
190+
should_notify = if !exception && valid
191+
@value = result
192+
true
193+
end
194+
mutex.unlock
162195

163-
mutex.synchronize do
164-
result = Concurrent::timeout(@timeout) do
165-
handler.call(@value)
166-
end
167-
if @validator.call(result)
168-
@value = result
169-
should_notify = true
170-
end
171-
end
196+
if should_notify
172197
time = Time.now
173-
observers.notify_observers{ [time, self.value] } if should_notify
174-
rescue Exception => ex
175-
try_rescue(ex)
198+
observers.notify_observers { [time, self.value] }
176199
end
200+
201+
try_rescue(exception)
177202
end
178203
end
179204
end

lib/concurrent/executor/one_by_one.rb

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
module Concurrent
2+
3+
# Ensures that jobs are passed to the underlying executor one by one,
4+
# never running at the same time.
5+
class OneByOne
6+
7+
attr_reader :executor
8+
9+
Job = Struct.new(:args, :block) do
10+
def call
11+
block.call *args
12+
end
13+
end
14+
15+
# @param [Executor] executor
16+
def initialize(executor)
17+
@executor = executor
18+
@being_executed = false
19+
@stash = []
20+
@mutex = Mutex.new
21+
end
22+
23+
# Submit a task to the executor for asynchronous processing.
24+
#
25+
# @param [Array] args zero or more arguments to be passed to the task
26+
#
27+
# @yield the asynchronous task to perform
28+
#
29+
# @return [Boolean] `true` if the task is queued, `false` if the executor
30+
# is not running
31+
#
32+
# @raise [ArgumentError] if no task is given
33+
def post(*args, &task)
34+
return nil if task.nil?
35+
job = Job.new args, task
36+
@mutex.lock
37+
post = if @being_executed
38+
@stash << job
39+
false
40+
else
41+
@being_executed = true
42+
end
43+
@mutex.unlock
44+
@executor.post { work(job) } if post
45+
true
46+
end
47+
48+
# Submit a task to the executor for asynchronous processing.
49+
#
50+
# @param [Proc] task the asynchronous task to perform
51+
#
52+
# @return [self] returns itself
53+
def <<(task)
54+
post(&task)
55+
self
56+
end
57+
58+
private
59+
60+
# ensures next job is executed if any is stashed
61+
def work(job)
62+
job.call
63+
ensure
64+
@mutex.lock
65+
job = @stash.shift || (@being_executed = false)
66+
@mutex.unlock
67+
@executor.post { work(job) } if job
68+
end
69+
70+
end
71+
end

lib/concurrent/executors.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
require 'concurrent/executor/single_thread_executor'
77
require 'concurrent/executor/thread_pool_executor'
88
require 'concurrent/executor/timer_set'
9+
require 'concurrent/executor/one_by_one'

0 commit comments

Comments
 (0)