Skip to content

Commit db7aa11

Browse files
committed
Adds a helper for declaring heartbeat subscriptions
At TaskRabbit, we use the heartbeat to schedule regular work. This has resulted in a fairly fragmented implementation of the modulo and matching logic. By wrapping this in a helper function we can more easily search and grep for these subscriptions.
1 parent 6454df3 commit db7aa11

File tree

4 files changed

+178
-0
lines changed

4 files changed

+178
-0
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
66

77
## [Unreleased]
88

9+
### Added
10+
11+
- Adds `Dispatch#on_heartbeatch` which is a helper function for specifying heartbeat subscriptions.
12+
913
## [0.12.0]
1014

1115
### Changed

README.mdown

+10
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ QueueBus.dispatch("app_b") do
6363
subscribe "my_key", { "user_id" => :present, "page" => "homepage"} do
6464
Mixpanel.homepage_action!(attributes["action"])
6565
end
66+
67+
# You may also declare a subscription to heartbeat events. This is a helper function
68+
# that works along with subscribe to make scheduling regular events easier.
69+
#
70+
# minute_interval: Executes every n minutes
71+
# hour_interval: Executes every n hours
72+
# minute: Executes on this minute
73+
# hour: Executes on this hour
74+
on_heartbeat "my_heartbeat_event", minute_interval: 5 do |attributes|
75+
end
6676
end
6777
```
6878

lib/queue_bus/dispatch.rb

+33
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,39 @@ def size
1616
@subscriptions.size
1717
end
1818

19+
def on_heartbeat(key, minute: nil, hour: nil, minute_interval: nil, hour_interval: nil, &block) # rubocop:disable Metrics/PerceivedComplexity, Metrics/MethodLength, Metrics/ParameterLists, Metrics/CyclomaticComplexity, Metrics/AbcSize
20+
if minute_interval && !minute_interval.positive?
21+
raise ArgumentError, 'minute_interval must be a positive integer'
22+
end
23+
24+
if hour_interval && !hour_interval.positive?
25+
raise ArgumentError, 'hour_interval must be a positive integer'
26+
end
27+
28+
matcher = { bus_event_type: :heartbeat_minutes }
29+
30+
if minute
31+
raise ArgumentError, 'minute must be a positive integer' unless minute.positive?
32+
33+
matcher['minute'] = minute
34+
end
35+
36+
if hour
37+
raise ArgumentError, 'hour must be a positive integer' unless hour.positive?
38+
39+
matcher['hour'] = hour
40+
end
41+
42+
subscribe(key, matcher) do |event|
43+
if (minute_interval.nil? || (event['minute'] % minute_interval).zero?) &&
44+
(hour_interval.nil? || (event['hour'] % hour_interval).zero?)
45+
46+
# Yield the block passed in.
47+
block.call
48+
end
49+
end
50+
end
51+
1952
def subscribe(key, matcher_hash = nil, &block)
2053
dispatch_event('default', key, matcher_hash, block)
2154
end

spec/dispatch_spec.rb

+131
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,137 @@ module QueueBus
2828
end.not_to raise_error
2929
end
3030

31+
describe '#on_heartbeat' do
32+
let(:dispatch) { Dispatch.new('heartbeat') }
33+
let(:event) { { bus_event_type: :heartbeat_minutes } }
34+
let(:event_name) { 'my-event' }
35+
36+
context 'when not declaring anything' do
37+
before do
38+
dispatch.on_heartbeat event_name do |_event|
39+
Runner2.run({})
40+
end
41+
end
42+
43+
it 'runs on every heart beat' do
44+
(0..24).each do |hour|
45+
(0..60).each do |minute|
46+
expect do
47+
dispatch.execute(
48+
event_name, event.merge('hour' => hour, 'minute' => minute)
49+
)
50+
end.to change(Runner2, :value).by(1)
51+
end
52+
end
53+
end
54+
end
55+
56+
context 'when running on hour 8' do
57+
before do
58+
dispatch.on_heartbeat event_name, hour: 8 do |_event|
59+
Runner2.run({})
60+
end
61+
end
62+
63+
it 'subscribes to hour 8' do
64+
expect(dispatch.subscriptions.all.first.matcher.filters).to eq('bus_event_type' => 'heartbeat_minutes', 'hour' => '8')
65+
end
66+
end
67+
68+
context 'when running on minute 4' do
69+
before do
70+
dispatch.on_heartbeat event_name, minute: 4 do |_event|
71+
Runner2.run({})
72+
end
73+
end
74+
75+
it 'subscribes to minute 4' do
76+
expect(dispatch.subscriptions.all.first.matcher.filters).to eq('bus_event_type' => 'heartbeat_minutes', 'minute' => '4')
77+
end
78+
end
79+
80+
context 'when running on minute 4 and hour 8' do
81+
before do
82+
dispatch.on_heartbeat event_name, hour: 8, minute: 4 do |_event|
83+
Runner2.run({})
84+
end
85+
end
86+
87+
it 'subscribes to minute 4 and hour 8' do
88+
expect(dispatch.subscriptions.all.first.matcher.filters)
89+
.to eq('bus_event_type' => 'heartbeat_minutes', 'minute' => '4', 'hour' => '8')
90+
end
91+
end
92+
93+
context 'when declaring minute intervals' do
94+
before do
95+
dispatch.on_heartbeat event_name, minute_interval: 5 do |_event|
96+
Runner2.run({})
97+
end
98+
end
99+
100+
it 'runs the runner when the minute buzzes (modulos to 5)' do
101+
(0..60).each do |minute|
102+
if minute % 5 == 0
103+
expect { dispatch.execute(event_name, event.merge('minute' => minute)) }
104+
.to change(Runner2, :value).by(1)
105+
else
106+
expect { dispatch.execute(event_name, event.merge('minute' => minute)) }
107+
.not_to change(Runner2, :value)
108+
end
109+
end
110+
end
111+
end
112+
113+
context 'when declaring hour intervals' do
114+
before do
115+
dispatch.on_heartbeat event_name, hour_interval: 3 do |_event|
116+
Runner2.run({})
117+
end
118+
end
119+
120+
it 'runs the runner when the hour fizzes (modulos to 3)' do
121+
(0..60).each do |hour|
122+
if hour % 3 == 0
123+
expect { dispatch.execute(event_name, event.merge('hour' => hour)) }
124+
.to change(Runner2, :value).by(1)
125+
else
126+
expect { dispatch.execute(event_name, event.merge('hour' => hour)) }
127+
.not_to change(Runner2, :value)
128+
end
129+
end
130+
end
131+
end
132+
133+
context 'when declaring hour and minute intervals' do
134+
before do
135+
dispatch.on_heartbeat event_name, minute_interval: 5, hour_interval: 3 do |_event|
136+
Runner2.run({})
137+
end
138+
end
139+
140+
it 'runs the runner when the time fizzbuzzes (modulos to 3 and 5)' do
141+
(0..24).each do |hour|
142+
(0..60).each do |minute|
143+
if hour % 3 == 0 && minute % 5 == 0
144+
expect do
145+
dispatch.execute(
146+
event_name, event.merge('hour' => hour, 'minute' => minute)
147+
)
148+
end.to change(Runner2, :value).by(1)
149+
else
150+
expect do
151+
dispatch.execute(
152+
event_name, event.merge('hour' => hour, 'minute' => minute)
153+
)
154+
end.not_to change(Runner2, :value)
155+
end
156+
end
157+
end
158+
end
159+
end
160+
end
161+
31162
describe 'Top Level' do
32163
before(:each) do
33164
QueueBus.dispatch('testit') do

0 commit comments

Comments
 (0)