Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions config/datadog_example.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[main]
upstream = amqp://localhost:5672
log_level = INFO
idle_connection_timeout = 5

[listen]
address = localhost
port = 5673

[datadog]
enabled = true
service_name = amqproxy
env = production
agent_host = localhost
agent_port = 8126
38 changes: 36 additions & 2 deletions src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
require "./version"
require "./server"
require "./http_server"
require "./tracer"
require "./nil_tracer"
require "./datadog_tracer"
require "option_parser"
require "uri"
require "ini"
Expand All @@ -17,6 +20,11 @@ class AMQProxy::CLI
@term_timeout = -1
@term_client_close_timeout = 0
@server : AMQProxy::Server? = nil
@datadog_enabled = false
@datadog_service_name = "amqproxy"
@datadog_env = "production"
@datadog_agent_host = "localhost"
@datadog_agent_port = 8126

def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity
INI.parse(File.read(path)).each do |name, section|
Expand All @@ -32,6 +40,17 @@ class AMQProxy::CLI
else raise "Unsupported config #{name}/#{key}"
end
end
when "datadog"
section.each do |key, value|
case key
when "enabled" then @datadog_enabled = value.downcase == "true"
when "service_name" then @datadog_service_name = value
when "env" then @datadog_env = value
when "agent_host" then @datadog_agent_host = value
when "agent_port" then @datadog_agent_port = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
end
when "listen"
section.each do |key, value|
case key
Expand All @@ -48,7 +67,7 @@ class AMQProxy::CLI
abort ex.message
end

def apply_env_variables
def apply_env_variables # ameba:disable Metrics/CyclomaticComplexity
@listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address
@listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port
@http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port
Expand All @@ -57,6 +76,11 @@ class AMQProxy::CLI
@term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout
@term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout
@upstream = ENV["AMQP_URL"]? || @upstream
@datadog_enabled = ENV["DD_TRACE_ENABLED"]?.try { |v| v.downcase == "true" } || @datadog_enabled
@datadog_service_name = ENV["DD_SERVICE"]? || @datadog_service_name
@datadog_env = ENV["DD_ENV"]? || @datadog_env
@datadog_agent_host = ENV["DD_AGENT_HOST"]? || @datadog_agent_host
@datadog_agent_port = ENV["DD_TRACE_AGENT_PORT"]?.try &.to_i || @datadog_agent_port
end

def run(argv)
Expand Down Expand Up @@ -88,6 +112,11 @@ class AMQProxy::CLI
@term_client_close_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug }
parser.on("--datadog-enabled", "Enable Datadog APM tracing") { @datadog_enabled = true }
parser.on("--datadog-service=SERVICE", "Datadog service name (default: amqproxy)") { |v| @datadog_service_name = v }
parser.on("--datadog-env=ENV", "Datadog environment (default: production)") { |v| @datadog_env = v }
parser.on("--datadog-agent-host=HOST", "Datadog agent host (default: localhost)") { |v| @datadog_agent_host = v }
parser.on("--datadog-agent-port=PORT", "Datadog agent port (default: 8126)") { |v| @datadog_agent_port = v.to_i }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
parser.invalid_option { |arg| abort "Invalid argument: #{arg}" }
Expand Down Expand Up @@ -117,7 +146,12 @@ class AMQProxy::CLI
Signal::INT.trap &->self.initiate_shutdown(Signal)
Signal::TERM.trap &->self.initiate_shutdown(Signal)

server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)
tracer : Tracer = if @datadog_enabled
DatadogTracer.new(@datadog_service_name, @datadog_env, AMQProxy::VERSION, @datadog_agent_host, @datadog_agent_port)
else
NilTracer.new
end
server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, tracer)

HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)
Expand Down
21 changes: 15 additions & 6 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ require "amq-protocol"
require "./version"
require "./upstream"
require "./records"
require "./tracer"
require "./nil_tracer"

module AMQProxy
class Client
Expand All @@ -14,8 +16,9 @@ module AMQProxy
@channel_max : UInt16
@heartbeat : UInt16
@last_heartbeat = Time.monotonic
@tracer : Tracer

def initialize(@socket : TCPSocket)
def initialize(@socket : TCPSocket, @tracer = NilTracer)
set_socket_options(@socket)
tune_ok, @credentials = negotiate(@socket)
@frame_max = tune_ok.frame_max
Expand All @@ -42,11 +45,17 @@ module AMQProxy

private def finish_publish(channel)
buffer = @publish_buffers[channel]
if upstream_channel = @channel_map[channel]
upstream_channel.write(buffer.publish)
upstream_channel.write(buffer.header)
buffer.bodies.each do |body|
upstream_channel.write(body)
@tracer.trace("amqp.publish", buffer.publish.exchange, {
amqp_exchange: buffer.publish.exchange,
amqp_routing_key: buffer.publish.routing_key,
amqp_body_size: buffer.header.body_size.to_s,
}) do
if upstream_channel = @channel_map[channel]
upstream_channel.write(buffer.publish)
upstream_channel.write(buffer.header)
buffer.bodies.each do |body|
upstream_channel.write(body)
end
end
end
ensure
Expand Down
95 changes: 95 additions & 0 deletions src/amqproxy/datadog_tracer.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
require "http/client"
require "json"
require "log"
require "./tracer"

module AMQProxy
class DatadogTracer < Tracer
Log = ::Log.for(self)

@service_name : String
@env : String
@version : String
@agent_host : String
@agent_port : Int32
@http_client : HTTP::Client

def initialize(service_name = "amqproxy", env = "production", version = VERSION, agent_host = "localhost", agent_port = 8126)
@service_name = service_name
@env = env
@version = version
@agent_host = agent_host
@agent_port = agent_port
@http_client = HTTP::Client.new(agent_host, agent_port)
@http_client.connect_timeout = 1.second
@http_client.read_timeout = 1.second
end

def trace(operation_name : String, resource : String? = nil, tags = NamedTuple.new, &)
trace_id = Random.rand(UInt64::MAX)
span_id = Random.rand(UInt64::MAX)
start_time = Time.monotonic
start_time_ns = Time.utc.to_unix_ns.to_i64

begin
result = yield
duration = (Time.monotonic - start_time).total_nanoseconds.to_i64
send_span(trace_id, span_id, operation_name, resource, start_time_ns, duration, tags, error: false)
result
rescue ex
duration = (Time.monotonic - start_time).total_nanoseconds.to_i64
error_tags = tags.merge(error_type: ex.class.name, error_message: ex.message)
send_span(trace_id, span_id, operation_name, resource, start_time_ns, duration, error_tags, error: true)
raise ex
end
end

private def send_span(trace_id : UInt64, span_id : UInt64, operation_name : String, resource : String?, start_time : Int64, duration : Int64, tags, error : Bool)
meta_tags = tags.merge({
env: @env,
version: @version,
language: "crystal",
})

span = {
trace_id: trace_id,
span_id: span_id,
name: operation_name,
resource: resource || operation_name,
service: @service_name,
type: "custom",
start: start_time,
duration: duration,
error: error ? 1 : 0,
meta: meta_tags,
}

payload = JSON.build do |json|
json.array do
json.array do
span.to_json(json)
end
end
end

spawn do
begin
response = @http_client.put("/v0.4/traces",
headers: HTTP::Headers{"Content-Type" => "application/json"},
body: payload
)

unless response.success?
Log.debug { "Failed to send trace to Datadog: #{response.status_code} #{response.body}" }
end
rescue ex
Log.debug { "Error sending trace to Datadog: #{ex.message}" }
end
end
end

def close
@http_client.close
end
end
end
12 changes: 12 additions & 0 deletions src/amqproxy/nil_tracer.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require "./tracer"

module AMQProxy
class NilTracer < Tracer
def trace(operation_name : String, resource : String? = nil, tags = NamedTuple.new, &)
yield
end

def close
end
end
end
23 changes: 13 additions & 10 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,26 @@ require "uri"
require "./channel_pool"
require "./client"
require "./upstream"
require "./tracer"
require "./nil_tracer"

module AMQProxy
class Server
Log = ::Log.for(self)
@clients_lock = Mutex.new
@clients = Array(Client).new
@tracer : Tracer

def self.new(url : URI)
def self.new(url : URI, tracer = NilTracer.new)
tls = url.scheme == "amqps"
host = url.host || "127.0.0.1"
port = url.port || 5762
port = 5671 if tls && url.port.nil?
idle_connection_timeout = url.query_params.fetch("idle_connection_timeout", 5).to_i
new(host, port, tls, idle_connection_timeout)
new(host, port, tls, idle_connection_timeout, tracer)
end

def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5)
def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, @tracer = NilTracer.new)
tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls
@channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials|
hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout)
Expand All @@ -45,12 +48,10 @@ module AMQProxy
Log.info { "Proxy listening on #{server.local_address}" }
loop do
socket = server.accept? || break
begin
addr = socket.remote_address
spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}"
rescue IO::Error
next
end
addr = socket.remote_address
spawn handle_connection(socket, addr), name: "Client#read_loop #{addr}"
rescue IO::Error
next
end
Log.info { "Proxy stopping accepting connections" }
end
Expand All @@ -74,7 +75,9 @@ module AMQProxy
end

private def handle_connection(socket, remote_address)
c = Client.new(socket)
c = @tracer.trace("client.connection", remote_address.to_s) do
Client.new(socket, @tracer)
end
active_client(c) do
channel_pool = @channel_pools[c.credentials]
c.read_loop(channel_pool)
Expand Down
6 changes: 6 additions & 0 deletions src/amqproxy/tracer.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module AMQProxy
abstract class Tracer
abstract def trace(operation_name : String, resource : String? = nil, tags = NamedTuple.new, &)
abstract def close
end
end