Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ version: 2.0
shards:
ameba:
git: https://github.com/crystal-ameba/ameba.git
version: 1.6.1
version: 1.6.4

amq-protocol:
git: https://github.com/cloudamqp/amq-protocol.cr.git
version: 1.1.14

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.2.5
version: 1.3.0

2 changes: 2 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ end
def with_http_server(idle_connection_timeout = 5, &)
with_server do |server, amqp_url|
http_server = AMQProxy::HTTPServer.new(server, "127.0.0.1", 15673)
spawn { http_server.listen }
begin
yield http_server, server, amqp_url
ensure
Expand All @@ -45,6 +46,7 @@ def verify_running_amqp!
port = UPSTREAM_URL.port || 5762
port = 5671 if tls && UPSTREAM_URL.port.nil?
TCPSocket.new(host, port, connect_timeout: 3.seconds).close
puts "AMQP running"
rescue Socket::ConnectError
STDERR.puts "[ERROR] Specs require a running rabbitmq server on #{host}:#{port}"
exit 1
Expand Down
13 changes: 13 additions & 0 deletions src/amqproxy.cr
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
require "./amqproxy/cli"
{% begin %}
{%
flags = [] of String
flags << "-Dpreview_mt" if flag?(:preview_mt)
flags << "-Dmt" if flag?(:mt)
flags << "-Dexecution_context" if flag?(:execution_context)
flags << "-Dtracing" if flag?(:tracing)
flags << "--release" if flag?(:release)
flags << "--static" if flag?(:static)
flags << "--debug" if flag?(:debug)
%}
puts "Built with #{Crystal::VERSION} #{Crystal::BUILD_COMMIT} {{flags.join(" ").id}}"
{% end %}
AMQProxy::CLI.new.run(ARGV)
7 changes: 4 additions & 3 deletions src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ class AMQProxy::CLI

server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)

HTTPServer.new(server, @listen_address, @http_port.to_i)
http_server = HTTPServer.new(server, @listen_address, @http_port.to_i)
spawn http_server.listen, name: "HTTP Server"
server.listen(@listen_address, @listen_port.to_i)

shutdown

# wait until all client connections are closed
Expand All @@ -133,11 +133,12 @@ class AMQProxy::CLI

@first_shutdown = true

def initiate_shutdown(_s : Signal)
def initiate_shutdown(signal : Signal)
unless server = @server
exit 0
end
if @first_shutdown
Log.info { "Shutting down due to signal #{signal}" }
@first_shutdown = false
server.stop_accepting_clients
else
Expand Down
41 changes: 26 additions & 15 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ module AMQProxy
end
end

private def with_channel_map(&)
yield @channel_map
end

private def finish_publish(channel)
buffer = @publish_buffers[channel]
if upstream_channel = @channel_map[channel]
if upstream_channel = with_channel_map &.[channel]
upstream_channel.write(buffer.publish)
upstream_channel.write(buffer.header)
buffer.bodies.each do |body|
Expand All @@ -61,6 +65,7 @@ module AMQProxy
socket.read_timeout = (@heartbeat / 2).ceil.seconds if @heartbeat > 0
loop do
frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian)
Log.trace { "Received frame: #{frame}" }
@last_heartbeat = Time.monotonic
case frame
when AMQ::Protocol::Frame::Heartbeat # noop
Expand All @@ -70,13 +75,15 @@ module AMQProxy
write AMQ::Protocol::Frame::Connection::CloseOk.new
return
when AMQ::Protocol::Frame::Channel::Open
raise "Channel already opened" if @channel_map.has_key? frame.channel
upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel))
@channel_map[frame.channel] = upstream_channel
with_channel_map do |channel_map|
raise "Channel already opened" if channel_map.has_key? frame.channel
upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel))
channel_map[frame.channel] = upstream_channel
end
write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::CloseOk
# Server closed channel, CloseOk reply to server is already sent
@channel_map.delete(frame.channel)
with_channel_map &.delete(frame.channel)
when AMQ::Protocol::Frame::Basic::Publish
@publish_buffers[frame.channel] = PublishBuffer.new(frame)
when AMQ::Protocol::Frame::Header
Expand All @@ -92,7 +99,7 @@ module AMQProxy
else
src_channel = frame.channel
begin
if upstream_channel = @channel_map[frame.channel]
if upstream_channel = with_channel_map &.[frame.channel]?
upstream_channel.write(frame)
else
# Channel::Close is sent, waiting for CloseOk
Expand Down Expand Up @@ -121,7 +128,7 @@ module AMQProxy
end
end
rescue ex : IO::Error
Log.debug { "Disconnected #{ex.inspect}" }
Log.debug(exception: ex) { "Disconnected #{ex.inspect}" }
else
Log.debug { "Disconnected" }
ensure
Expand All @@ -132,6 +139,7 @@ module AMQProxy
# Send frame to client, channel id should already be remapped by the caller
def write(frame : AMQ::Protocol::Frame)
@lock.synchronize do
Log.trace { "Sending frame: #{frame}" }
case frame
when AMQ::Protocol::Frame::BytesBody
# Upstream might send large frames, split them to support lower client frame_max
Expand All @@ -149,9 +157,9 @@ module AMQProxy
end
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
with_channel_map &.[frame.channel] = nil
when AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
with_channel_map &.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
@socket.close rescue nil
end
Expand All @@ -174,13 +182,15 @@ module AMQProxy
end

private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED")
@channel_map.each_value do |upstream_channel|
upstream_channel.try &.close(code, reason)
rescue Upstream::WriteError
Log.debug { "Upstream write error while closing client's channels" }
next # Nothing to do
with_channel_map do |channel_map|
channel_map.each_value do |upstream_channel|
upstream_channel.try &.close(code, reason)
rescue Upstream::WriteError
Log.debug { "Upstream write error while closing client's channels" }
next # Nothing to do
end
channel_map.clear
end
@channel_map.clear
end

private def expect_more_frames?(frame) : Bool
Expand Down Expand Up @@ -221,6 +231,7 @@ module AMQProxy
socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice
socket.flush
socket.close
Log.debug { "Invalid protocol start: #{proto}" }
raise IO::EOFError.new("Invalid protocol start")
end

Expand Down
5 changes: 4 additions & 1 deletion src/amqproxy/http_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module AMQProxy
end
end
bind_tcp
spawn @http.listen, name: "HTTP Server"
Log.info { "HTTP server listening on #{@address}:#{@port}" }
end

Expand All @@ -31,6 +30,10 @@ module AMQProxy
Log.info { "Bound to #{addr}" }
end

def listen
@http.listen
end

def metrics(context)
writer = PrometheusWriter.new(context.response, "amqproxy")
writer.write({name: "identity_info",
Expand Down
41 changes: 27 additions & 14 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ module AMQProxy
@clients_lock = Mutex.new
@clients = Array(Client).new

@channel_pools_lock = Mutex.new

def self.new(url : URI)
tls = url.scheme == "amqps"
host = url.host || "127.0.0.1"
Expand All @@ -29,12 +31,18 @@ module AMQProxy
Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" }
end

private def with_channel_pools(&)
@channel_pools_lock.synchronize do
yield @channel_pools
end
end

def client_connections
@clients.size
end

def upstream_connections
@channel_pools.each_value.sum &.connections
with_channel_pools &.each_value.sum(&.connections)
end

def listen(address, port)
Expand All @@ -43,10 +51,11 @@ module AMQProxy

def listen(@server : TCPServer)
Log.info { "Proxy listening on #{server.local_address}" }

while socket = server.accept?
begin
addr = socket.remote_address
spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}"
Log.debug { "Accepted new client from #{socket.remote_address} (#{socket.inspect})" }
handle_connection(socket)
rescue IO::Error
next
end
Expand All @@ -72,18 +81,22 @@ module AMQProxy
end
end

private def handle_connection(socket, remote_address)
c = Client.new(socket)
active_client(c) do
channel_pool = @channel_pools[c.credentials]
c.read_loop(channel_pool)
private def handle_connection(socket)
spawn(name: "Client #{socket.remote_address}") do
c = Client.new(socket)
channel_pool = with_channel_pools &.[c.credentials]
remote_address = socket.remote_address
Log.debug { "Client created for #{remote_address}" }
active_client(c) do
c.read_loop(channel_pool)
end
rescue IO::EOFError
# Client closed connection before/while negotiating
rescue ex # only raise from constructor, when negotating
Log.debug(exception: ex) { "Client negotiation failure (#{remote_address}) #{ex.inspect}" }
ensure
socket.close rescue nil
end
rescue IO::EOFError
# Client closed connection before/while negotiating
rescue ex # only raise from constructor, when negotating
Log.debug(exception: ex) { "Client negotiation failure (#{remote_address}) #{ex.inspect}" }
ensure
socket.close rescue nil
end

private def active_client(client, &)
Expand Down
37 changes: 19 additions & 18 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module AMQProxy
Log = ::Log.for(self)
@socket : IO
@channels = Hash(UInt16, DownstreamChannel).new
@channels_lock = Mutex.new
@channel_max : UInt16
@lock = Mutex.new
@remote_address : String
Expand Down Expand Up @@ -45,23 +44,27 @@ module AMQProxy
end

private def create_upstream_channel(downstream_channel : DownstreamChannel)
@channels_lock.synchronize do
with_channels do |channels|
1_u16.upto(@channel_max) do |i|
unless @channels.has_key?(i)
@channels[i] = downstream_channel
unless channels.has_key?(i)
channels[i] = downstream_channel
return UpstreamChannel.new(self, i)
end
end
raise ChannelMaxReached.new
end
end

private def with_channels(&)
yield @channels
end

def close_channel(id, code, reason)
send AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
end

def channels
@channels.size
with_channels &.size
end

# Frames from upstream (to client)
Expand All @@ -86,15 +89,15 @@ module AMQProxy
when AMQ::Protocol::Frame::Channel::OpenOk # assume it always succeeds
when AMQ::Protocol::Frame::Channel::Close
send AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) }
if downstream_channel = with_channels &.delete(frame.channel)
downstream_channel.write frame
end
when AMQ::Protocol::Frame::Channel::CloseOk # when client requested channel close
if downstream_channel = @channels_lock.synchronize { @channels.delete(frame.channel) }
if downstream_channel = with_channels &.delete(frame.channel)
downstream_channel.write(frame)
end
else
if downstream_channel = @channels_lock.synchronize { @channels[frame.channel]? }
if downstream_channel = with_channels &.[frame.channel]?
downstream_channel.write(frame)
else
Log.debug { "Frame for unmapped channel from upstream: #{frame}" }
Expand All @@ -114,21 +117,21 @@ module AMQProxy
end

private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR")
@channels_lock.synchronize do
return if @channels.empty?
Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" }
@channels.each_value do |downstream_channel|
with_channels do |channels|
return if channels.empty?
Log.debug { "Upstream connection closed, closing #{channels.size} client channels" }
channels.each_value do |downstream_channel|
downstream_channel.close(code, reason)
end
@channels.clear
channels.clear
end
end

private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection)
Log.debug { "Sending broadcast frame to all client connections" }
clients = Set(Client).new
@channels_lock.synchronize do
@channels.each_value do |downstream_channel|
with_channels do |channels|
channels.each_value do |downstream_channel|
clients << downstream_channel.client
end
end
Expand All @@ -144,9 +147,7 @@ module AMQProxy
raise "Connection frames should not be sent through here: #{frame}"
when AMQ::Protocol::Frame::Channel::CloseOk
# when upstream server requested a channel close and client confirmed
@channels_lock.synchronize do
@channels.delete(frame.channel)
end
with_channels &.delete(frame.channel)
end
send frame
end
Expand Down
Loading