Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b7030e3
add middleware stack for sake of allowing forking
jasonayre May 13, 2016
2e46a33
fetch middleware
jasonayre May 13, 2016
03ddfc7
instantiate own middleware stack
jasonayre May 13, 2016
946b350
routes can have their own middleware which can be enhanced with addit…
jasonayre May 13, 2016
69b18f2
allow for block when defining route so use can be used to add middlew…
jasonayre May 13, 2016
0631d28
middleware gets passed with env
jasonayre May 13, 2016
96c9443
call middleware pulled from route not ActionSubscriber.config
jasonayre May 13, 2016
c085e4c
fetch middleware for route
jasonayre May 14, 2016
80ce5a8
just pass middleware klass for now to use as splatting not passing ar…
jasonayre May 14, 2016
c09babb
delegate middleware methods
jasonayre May 14, 2016
4018f25
routes can define a stack of middleware and use that, like a pheonix …
jasonayre May 15, 2016
29fd602
fetch middleware or fork from config as default
jasonayre May 15, 2016
4d1e03c
add spec for middleware stack
jasonayre May 15, 2016
a71fb51
set stacks like routes for consistency
jasonayre May 15, 2016
ff56991
fix forked stack to preserve args, just deep dup array and set ivar d…
jasonayre May 15, 2016
00d5653
add router stack spec
jasonayre May 15, 2016
f2a9032
add example of adding/applying middleware stack in routes
jasonayre May 15, 2016
9a05360
call route specific middleware for march hare subscriber
jasonayre May 15, 2016
5512438
add hash accessors to env to as it mimics rack/makes middleware more …
jasonayre May 15, 2016
cfe42ab
add action_subscriber_routes load hook so plugin authors can define o…
jasonayre May 16, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/action_subscriber/bunny/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def auto_pop!
:message_id => nil,
:routing_key => delivery_info.routing_key,
:queue => queue.name,
:middleware => route.middleware
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
enqueue_env(route.threadpool, env)
Expand All @@ -57,6 +58,7 @@ def auto_subscribe!
:message_id => properties.message_id,
:routing_key => delivery_info.routing_key,
:queue => queue.name,
:middleware => route.middleware
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
enqueue_env(route.threadpool, env)
Expand All @@ -72,7 +74,7 @@ def enqueue_env(threadpool, env)
logger.info "RECEIVED #{env.message_id} from #{env.queue}"
threadpool.async(env) do |env|
::ActiveSupport::Notifications.instrument "process_event.action_subscriber", :subscriber => env.subscriber.to_s, :routing_key => env.routing_key, :queue => env.queue do
::ActionSubscriber.config.middleware.call(env)
env.middleware.call(env)
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/action_subscriber/march_hare/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def auto_pop!
:message_id => metadata.message_id,
:routing_key => metadata.routing_key,
:queue => queue.name,
:middleware => route.middleware
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
enqueue_env(route.threadpool, env)
Expand All @@ -54,6 +55,7 @@ def auto_subscribe!
:message_id => metadata.message_id,
:routing_key => metadata.routing_key,
:queue => queue.name,
:middleware => route.middleware
}
env = ::ActionSubscriber::Middleware::Env.new(route.subscriber, encoded_payload, properties)
enqueue_env(route.threadpool, env)
Expand All @@ -73,7 +75,7 @@ def enqueue_env(threadpool, env)
logger.info "RECEIVED #{env.message_id} from #{env.queue}"
threadpool.async(env) do |env|
::ActiveSupport::Notifications.instrument "process_event.action_subscriber", :subscriber => env.subscriber.to_s, :routing_key => env.routing_key, :queue => env.queue do
::ActionSubscriber.config.middleware.call(env)
env.middleware.call(env)
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion lib/action_subscriber/middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
require "action_subscriber/middleware/error_handler"
require "action_subscriber/middleware/router"
require "action_subscriber/middleware/runner"
require "action_subscriber/middleware/stack"

module ActionSubscriber
module Middleware
def self.initialize_stack
builder = ::Middleware::Builder.new(:runner_class => ::ActionSubscriber::Middleware::Runner)
builder = ::ActionSubscriber::Middleware::Stack.new(:runner_class => ::ActionSubscriber::Middleware::Runner)

builder.use ErrorHandler
builder.use Decoder
Expand Down
13 changes: 12 additions & 1 deletion lib/action_subscriber/middleware/env.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class Env
:message_id,
:routing_key,
:queue,
:subscriber
:subscriber,
:middleware

##
# @param subscriber [Class] the class that will handle this message
Expand All @@ -38,6 +39,16 @@ def initialize(subscriber, encoded_payload, properties)
@queue = properties.fetch(:queue)
@routing_key = properties.fetch(:routing_key)
@subscriber = subscriber
@middleware = properties.fetch(:middleware) { ::ActionSubscriber.config.middleware.forked }
end

#allow env to be get/set from outside, like rack middleware allows
def [](key)
instance_variable_get(:"@#{key}")
end

def []=(key, value)
instance_variable_set(:"@#{key}", value)
end

def acknowledge
Expand Down
13 changes: 13 additions & 0 deletions lib/action_subscriber/middleware/stack.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require 'middleware/builder'

module ActionSubscriber
module Middleware
class Stack < ::Middleware::Builder
def forked
forked_stack = self.class.new(:runner_class => @runner_class)
forked_stack.instance_variable_set(:@stack, @stack.dup)
forked_stack
end
end
end
end
11 changes: 9 additions & 2 deletions lib/action_subscriber/route.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ module ActionSubscriber
class Route
attr_reader :acknowledgements,
:action,
:durable,
:durable,
:exchange,
:prefetch,
:queue,
:routing_key,
:subscriber,
:threadpool
:threadpool,
:middleware

def initialize(attributes)
@acknowledgements = attributes.fetch(:acknowledgements)
Expand All @@ -20,6 +21,7 @@ def initialize(attributes)
@routing_key = attributes.fetch(:routing_key)
@subscriber = attributes.fetch(:subscriber)
@threadpool = attributes.fetch(:threadpool) { ::ActionSubscriber::Threadpool.pool(:default) }
@middleware = attributes.fetch(:middleware) { ::ActionSubscriber.config.middleware.forked }
end

def acknowledgements?
Expand All @@ -29,5 +31,10 @@ def acknowledgements?
def queue_subscription_options
{ :manual_ack => acknowledgements? }
end

delegate :use, :to => :middleware
delegate :insert, :to => :middleware
delegate :insert_after, :to => :middleware
delegate :insert_before, :to => :middleware
end
end
16 changes: 14 additions & 2 deletions lib/action_subscriber/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module ActionSubscriber
class Router
def self.draw_routes(&block)
router = self.new
::ActiveSupport.run_load_hooks(:action_subscriber_routes, router)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't fork the middleware stack and instead use a tiered middleware where we first spin through the global middleware and then run through a route's middleware stack at the end of the global middleware, we wouldn't need to run any load hooks here. We also wouldn't need to make our own stack forking mechanism (because we wouldn't need it).

I understand that would mean you couldn't change a middleware in the global stack for a specific route, but in my mind, that's exactly how I'd expect a middleware stack to work. It's a singleton-esque pattern. The main reason I point this out is because I recently went through load hook hell while work on ActionSubscriber to get a few bugs fixed, and seeing a second load hook make me nervous.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this was added more for the ability of extensibility, i.e. in my gem which uses a middleware stack which adds a certain behavior, I can go

::ActionSubscriber.on_load(: action_subscriber_routes) do
  stack :whatever do
    use A
    use B
  end
end

etc. But the implementation could be changed if the router is removed from the stack and stacks were composable, i.e.

  class MyPluginsMiddlewareStack < ActionSubscriber::MiddlewareStack
    use A
    use B
  end

  ActionSubscriber.config.middleware_stacks << MyPluginsMiddlewareStack

For example

router.instance_eval(&block)
router.routes
end
Expand Down Expand Up @@ -39,11 +40,22 @@ def resource_name(route_settings)
route_settings[:subscriber].name.underscore.gsub(/_subscriber/, "").to_s
end

def route(subscriber, action, options = {})
def stack(name, &block)
stacks[name] ||= ::ActionSubscriber.config.middleware.forked.instance_eval(&block)
end

def stacks
@stacks ||= {}
end

def route(subscriber, action, options = {}, &block)
route_settings = DEFAULT_SETTINGS.merge(options).merge(:subscriber => subscriber, :action => action)
route_settings[:routing_key] ||= default_routing_key_for(route_settings)
route_settings[:queue] ||= default_queue_for(route_settings)
routes << Route.new(route_settings)
route_settings[:middleware] = stacks[options[:stack]] if options.key?(:stack)
_route = Route.new(route_settings)
_route.instance_eval(&block) if block_given?
routes << _route
end

def routes
Expand Down
19 changes: 18 additions & 1 deletion routing.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,26 @@ The `route` method supports the following options:
* `exchange` specify which exchange you expect messages to be published to (default `"events"`)
* This is the equivalent of calling `exchange :actions` in your subscriber
* `publisher` this will prefix your queue and routing key with the publishers name
* This is the equivalent of puting `publisher :foo` in your subscriber
* This is the equivalent of putting `publisher :foo` in your subscriber
* `queue` specifies which queue you will subscribe to rather than letting ActionSubscriber infer it from the name of the subscriber and action
* `routing_key` specifies the routing key that will be bound to your queue
* `stack` Lets you use a custom middleware stack you already defined using the stack method

## Middleware Stacks

This give you the ability to build and apply a middleware stack on a per route basis.

``` ruby
::ActionSubscriber.draw_routes do
stack :resourceful do
use ParseResourcePayload
use LoadResource
end

default_routes_for ::UserSubscriber
route ::NotificationSubscriber, :created, :publisher => :newman, :exchange => :events, :stack => :resourceful
end
```

<h3 id="footnotes">Footnotes</h3>

Expand Down
13 changes: 13 additions & 0 deletions spec/lib/action_subscriber/middleware/env_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,19 @@
specify { expect(subject.routing_key).to eq(properties[:routing_key]) }
specify { expect(subject.queue).to eq(properties[:queue]) }

describe "#[]" do
it "gets instance variable" do
expect(subject[:action]).to eq :created
end
end

describe "#[]=" do
it "sets instance variable" do
subject[:whatever] = :something
expect(subject[:whatever]).to eq :something
end
end

describe "#acknowledge" do
it "sends an acknowledgement to rabbitmq" do
expect(channel).to receive(:ack).with(properties[:delivery_tag], false)
Expand Down
15 changes: 15 additions & 0 deletions spec/lib/action_subscriber/middleware/stack_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
describe ActionSubscriber::Middleware::Stack do
subject { ::ActionSubscriber::Middleware.initialize_stack }

context "#forked" do
let(:forked_stack) { subject.forked }

it "duplicates the stack without modifying original" do
class A; end;
forked_stack.use(A)
expect(forked_stack.instance_variable_get(:@stack).object_id).to_not eq subject.instance_variable_get(:@stack).object_id
expect(forked_stack.instance_variable_get(:@stack).map(&:first)).to include(A)
expect(subject.instance_variable_get(:@stack).map(&:first)).to_not include(A)
end
end
end
24 changes: 24 additions & 0 deletions spec/lib/action_subscriber/router_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,28 @@ def dim; end
expect(routes.last.subscriber).to eq(SparkleSubscriber)
expect(routes.last.queue).to eq("alice.tommy.sparkle.dim")
end

it "can define a stack of middleware and use on a per route basis" do
class FakeMiddleware
def initialize(app)
@app = app
end

def call(env)
env
end
end

routes = described_class.draw_routes do
stack :fake_stack do
use FakeMiddleware
end

route FakeSubscriber, :foo, :stack => :fake_stack
route FakeSubscriber, :bar
end

expect(routes.first.middleware.instance_variable_get(:@stack).last.first).to eq(FakeMiddleware)
expect(routes.last.middleware.instance_variable_get(:@stack).last.first).to_not eq(FakeMiddleware)
end
end