Skip to content

Commit d0e010a

Browse files
committed
Allow dereferencing of Agent values while updating
ensure only one update is running
1 parent b6d7506 commit d0e010a

File tree

2 files changed

+96
-24
lines changed

2 files changed

+96
-24
lines changed

lib/concurrent/agent.rb

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,14 @@ class Agent
6060
# @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
6161
# returning the value returned from the proc
6262
def initialize(initial, opts = {})
63-
@value = initial
64-
@rescuers = []
65-
@validator = Proc.new { |result| true }
66-
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
67-
self.observers = CopyOnWriteObserverSet.new
68-
@executor = OptionsParser::get_executor_from(opts)
63+
@value = initial
64+
@rescuers = []
65+
@validator = Proc.new { |result| true }
66+
@timeout = opts.fetch(:timeout, TIMEOUT).freeze
67+
self.observers = CopyOnWriteObserverSet.new
68+
@executor = OptionsParser::get_executor_from(opts)
69+
@being_executed = false
70+
@stash = []
6971
init_mutex
7072
set_deref_options(opts)
7173
end
@@ -111,7 +113,11 @@ def rescue(clazz = StandardError, &block)
111113
# @yieldparam [Object] value the result of the last update operation
112114
# @yieldreturn [Boolean] true if the value is valid else false
113115
def validate(&block)
114-
@validator = block unless block.nil?
116+
unless block.nil?
117+
mutex.lock
118+
@validator = block
119+
mutex.unlock
120+
end
115121
self
116122
end
117123
alias_method :validates, :validate
@@ -124,8 +130,19 @@ def validate(&block)
124130
# the new value
125131
# @yieldparam [Object] value the current value
126132
# @yieldreturn [Object] the new value
133+
# @return [true, nil] nil when no block is given
127134
def post(&block)
128-
@executor.post{ work(&block) } unless block.nil?
135+
return nil if block.nil?
136+
mutex.lock
137+
post = if @being_executed
138+
@stash << block
139+
false
140+
else
141+
@being_executed = true
142+
end
143+
mutex.unlock
144+
@executor.post { work(&block) } if post
145+
true
129146
end
130147

131148
# Update the current value with the result of the given block operation
@@ -157,22 +174,38 @@ def try_rescue(ex) # :nodoc:
157174
# @!visibility private
158175
def work(&handler) # :nodoc:
159176
begin
160-
161-
should_notify = false
177+
should_notify = false
178+
validator, value = mutex.synchronize { [@validator, @value] }
179+
180+
begin
181+
# FIXME creates second thread
182+
result, valid = Concurrent::timeout(@timeout) do
183+
[result = handler.call(value),
184+
validator.call(result)]
185+
end
186+
rescue Exception => ex
187+
exception = ex
188+
end
162189

163190
mutex.synchronize do
164-
result = Concurrent::timeout(@timeout) do
165-
handler.call(@value)
166-
end
167-
if @validator.call(result)
168-
@value = result
191+
if !exception && valid
192+
@value = result
169193
should_notify = true
170194
end
195+
196+
if (stashed = @stash.shift)
197+
@executor.post { work(&stashed) }
198+
else
199+
@being_executed = false
200+
end
171201
end
172-
time = Time.now
173-
observers.notify_observers{ [time, self.value] } if should_notify
174-
rescue Exception => ex
175-
try_rescue(ex)
202+
203+
if should_notify
204+
time = Time.now
205+
observers.notify_observers { [time, self.value] }
206+
end
207+
208+
try_rescue(exception)
176209
end
177210
end
178211
end

spec/concurrent/agent_spec.rb

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,17 @@ def trigger_observable(observable)
147147
context '#post' do
148148

149149
it 'adds the given block to the queue' do
150-
executor.should_receive(:post).with(no_args).exactly(3).times
151-
subject.post { sleep(100) }
150+
executor.should_receive(:post).with(no_args).exactly(1).times
151+
subject.post { sleep(1) }
152152
subject.post { nil }
153153
subject.post { nil }
154154
sleep(0.1)
155+
subject.instance_variable_get(:@stash).size.should eq 2
155156
end
156157

157158
it 'does not add to the queue when no block is given' do
158-
executor.should_receive(:post).with(no_args).exactly(2).times
159-
subject.post { sleep(100) }
159+
executor.should_receive(:post).with(no_args).exactly(0).times
160160
subject.post
161-
subject.post { nil }
162161
sleep(0.1)
163162
end
164163
end
@@ -365,6 +364,46 @@ def trigger_observable(observable)
365364
end
366365
end
367366

367+
context 'clojure-like behaviour' do
368+
it 'does not block dereferencing when updating the value' do
369+
continue = IVar.new
370+
agent = Agent.new(0, executor: executor)
371+
agent.post { |old| old + continue.value }
372+
sleep 0.1
373+
Concurrent.timeout(0.2) { agent.value.should eq 0 }
374+
continue.set 1
375+
sleep 0.1
376+
end
377+
378+
it 'does not allow to execute two updates at the same time' do
379+
agent = Agent.new(0, executor: executor)
380+
continue1 = IVar.new
381+
continue2 = IVar.new
382+
f1 = f2 = false
383+
agent.post { |old| f1 = true; old + continue1.value }
384+
agent.post { |old| f2 = true; old + continue2.value }
385+
386+
sleep 0.1
387+
f1.should eq true
388+
f2.should eq false
389+
agent.value.should eq 0
390+
391+
continue1.set 1
392+
sleep 0.1
393+
f1.should eq true
394+
f2.should eq true
395+
agent.value.should eq 1
396+
397+
continue2.set 1
398+
sleep 0.1
399+
f1.should eq true
400+
f2.should eq true
401+
agent.value.should eq 2
402+
end
403+
404+
it 'waits with sending functions to other agents until update is done'
405+
end
406+
368407
context 'aliases' do
369408

370409
it 'aliases #deref for #value' do

0 commit comments

Comments
 (0)