Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
name: CI
on:
pull_request:
branches:
- main
push:
paths:
- 'run-specs-in-docker.sh'
Expand Down
1 change: 1 addition & 0 deletions amqproxy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this?

Submodule amqproxy added at d09258
3 changes: 3 additions & 0 deletions spec/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ RUN shards install
COPY src/ src/
COPY spec/ spec/

COPY spec/config.ini /tmp/config.ini
COPY spec/config_empty.ini /tmp/config_empty.ini
Comment on lines +14 to +15
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All files in spec/ is already copied on the line above?


COPY spec/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
189 changes: 189 additions & 0 deletions spec/amqproxy/config_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
require "spec"
require "../../src/amqproxy/config"

describe AMQProxy::Config do
it "loads defaults when no ini file, env vars or options are available" do
previous_argv = ARGV.clone
ARGV.clear

ARGV.concat([
"--config=/tmp/non_existing_file.ini",
])

config = AMQProxy::Config.load_with_cli(ARGV)
Comment on lines +6 to +13
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to do all the ARGV handling in each spec, you can just pass an array of strings (or an empty array where needed) to load_with_cli instead.

Suggested change
previous_argv = ARGV.clone
ARGV.clear
ARGV.concat([
"--config=/tmp/non_existing_file.ini",
])
config = AMQProxy::Config.load_with_cli(ARGV)
config = AMQProxy::Config.load_with_cli(["--config=/tmp/non_existing_file.ini"])


config.listen_address.should eq "localhost"
config.listen_port.should eq 5673
config.http_port.should eq 15673
config.log_level.should eq ::Log::Severity::Info
config.idle_connection_timeout.should eq 5
config.term_timeout.should eq -1
config.term_client_close_timeout.should eq 0
config.upstream.should eq nil

# Restore ARGV
ARGV.clear
ARGV.concat(previous_argv)
end

it "reads from empty config file returning default configuration" do
previous_argv = ARGV.clone
ARGV.clear

ARGV.concat(["--config=/tmp/config_empty.ini"])

config = AMQProxy::Config.load_with_cli(ARGV)

config.listen_address.should eq "localhost"
config.listen_port.should eq 5673
config.http_port.should eq 15673
config.log_level.should eq ::Log::Severity::Info
config.idle_connection_timeout.should eq 5
config.term_timeout.should eq -1
config.term_client_close_timeout.should eq 0
config.upstream.should eq nil

# Restore ARGV
ARGV.clear
ARGV.concat(previous_argv)
end

it "reads from environment variables and overrules ini file values" do
previous_argv = ARGV.clone
ARGV.clear

ENV["LISTEN_ADDRESS"] = "example.com"
ENV["LISTEN_PORT"] = "5674"
ENV["HTTP_PORT"] = "15674"
ENV["LOG_LEVEL"] = "Error"
ENV["IDLE_CONNECTION_TIMEOUT"] = "12"
ENV["TERM_TIMEOUT"] = "13"
ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14"
ENV["UPSTREAM"] = "amqp://localhost:5674"

config = AMQProxy::Config.load_with_cli(ARGV)

config.listen_address.should eq "example.com"
config.listen_port.should eq 5674
config.http_port.should eq 15674
config.log_level.should eq ::Log::Severity::Error
config.idle_connection_timeout.should eq 12
config.term_timeout.should eq 13
config.term_client_close_timeout.should eq 14
config.upstream.should eq "amqp://localhost:5674"

# Clean up
ENV.delete("LISTEN_ADDRESS")
ENV.delete("LISTEN_PORT")
ENV.delete("HTTP_PORT")
ENV.delete("LOG_LEVEL")
ENV.delete("IDLE_CONNECTION_TIMEOUT")
ENV.delete("TERM_TIMEOUT")
ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT")
ENV.delete("UPSTREAM")

# Restore ARGV
ARGV.clear
ARGV.concat(previous_argv)
end

it "reads from command line arguments and overrules env vars" do
previous_argv = ARGV.clone
ARGV.clear

ENV["LISTEN_ADDRESS"] = "example.com"
ENV["LISTEN_PORT"] = "5674"
ENV["HTTP_PORT"] = "15674"
ENV["LOG_LEVEL"] = "Error"
ENV["IDLE_CONNECTION_TIMEOUT"] = "12"
ENV["TERM_TIMEOUT"] = "13"
ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14"
ENV["UPSTREAM"] = "amqp://localhost:5674"

ARGV.concat([
"--listen=example_arg.com",
"--port=5675",
"--http-port=15675",
"--log-level=Warn",
"--idle-connection-timeout=15",
"--term-timeout=16",
"--term-client-close-timeout=17",
"amqp://localhost:5679",
])

config = AMQProxy::Config.load_with_cli(ARGV)

config.listen_address.should eq "example_arg.com"
config.log_level.should eq ::Log::Severity::Warn
config.listen_port.should eq 5675
config.http_port.should eq 15675
config.idle_connection_timeout.should eq 15
config.term_timeout.should eq 16
config.term_client_close_timeout.should eq 17
config.upstream.should eq "amqp://localhost:5679"

# Clean Up
ENV.delete("LISTEN_ADDRESS")
ENV.delete("LISTEN_PORT")
ENV.delete("HTTP_PORT")
ENV.delete("LOG_LEVEL")
ENV.delete("IDLE_CONNECTION_TIMEOUT")
ENV.delete("TERM_TIMEOUT")
ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT")
ENV.delete("UPSTREAM")

# Restore ARGV
ARGV.clear
ARGV.concat(previous_argv)
end

it "sets log level to debug when debug flag is present" do
previous_argv = ARGV.clone
ARGV.clear

ARGV.concat([
"--listen=example_arg.com",
"--port=5675",
"--http-port=15675",
"--log-level=Warn",
"--idle-connection-timeout=15",
"--term-timeout=16",
"--term-client-close-timeout=17",
"--debug",
"amqp://localhost:5679",
])

config = AMQProxy::Config.load_with_cli(ARGV)

config.listen_address.should eq "example_arg.com"
config.log_level.should eq ::Log::Severity::Debug
config.listen_port.should eq 5675
config.http_port.should eq 15675
config.idle_connection_timeout.should eq 15
config.term_timeout.should eq 16
config.term_client_close_timeout.should eq 17
config.upstream.should eq "amqp://localhost:5679"

# Restore ARGV
ARGV.clear
ARGV.concat(previous_argv)
end

it "keeps the log level to trace when debug flag is present" do
previous_argv = ARGV.clone
ARGV.clear

ARGV.concat([
"--log-level=Trace",
"--debug",
])

config = AMQProxy::Config.load_with_cli(ARGV)

config.log_level.should eq ::Log::Severity::Trace

# Restore ARGV
ARGV.clear
ARGV.concat(previous_argv)
end
end
13 changes: 13 additions & 0 deletions spec/config.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[main]
log_level = debug
idle_connection_timeout = 55
term_timeout = 56
term_client_close_timeout = 57
upstream = amqp://localhost:5678

[listen]
bind = 127.0.0.1
address = 127.0.0.2
port = 5678
http_port = 15678
log_level = debug
Empty file added spec/config_empty.ini
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to avoid having empty files in the repository, maybe this file can be created when needed in config_spec.cr and then cleaned up at the end of the spec.

Copy link
Member

@dentarg dentarg Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it good to avoid empty files in the repo? If the file serves a purpose (used in specs? I haven't actually checked this 🙈), we should not be afraid to have it around. Creating it with code sounds both more complex and slower to me.

Copy link
Member

@kickster97 kickster97 Jun 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning is mostly about cleanliness so the repository remains free of files that are solely for transient test purposes.
Sure complexion grows slightly, thats a good point. I don't know the performance impact of creating an empty file, but is it really notably expensive to create once?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have files like this in the repo I think it should be clear in some way what they are used for so its less confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree it should be more made more clear. Could be made by having a "fixtures" or "config fixtures" directory.

Empty file.
118 changes: 25 additions & 93 deletions src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
require "./config"
require "./version"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require "./version" should probably be moved to config.cr since that's where AMQProxy::VERSION is used?

require "./server"
require "./http_server"
Expand All @@ -9,94 +10,27 @@ require "log"
class AMQProxy::CLI
Log = ::Log.for(self)

@listen_address = "localhost"
@listen_port = 5673
@http_port = 15673
@log_level : ::Log::Severity = ::Log::Severity::Info
@idle_connection_timeout : Int32 = 5
@term_timeout = -1
@term_client_close_timeout = 0
@config : AMQProxy::Config? = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible to make this not optional?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can I do that? creating the instance happens in the run method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a run() method, you can use an initializer and let cli.new() set @config when initializing, that way I think you can avoid letting it be Nilable.

@server : AMQProxy::Server? = nil

def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity
INI.parse(File.read(path)).each do |name, section|
case name
when "main", ""
section.each do |key, value|
case key
when "upstream" then @upstream = value
when "log_level" then @log_level = ::Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "term_timeout" then @term_timeout = value.to_i
when "term_client_close_timeout" then @term_client_close_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
end
end
when "listen"
section.each do |key, value|
case key
when "port" then @listen_port = value.to_i
when "bind", "address" then @listen_address = value
when "log_level" then @log_level = ::Log::Severity.parse(value)
else raise "Unsupported config #{name}/#{key}"
end
end
else raise "Unsupported config section #{name}"
end
end
rescue ex
abort ex.message
end

def apply_env_variables
@listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address
@listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port
@http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port
@log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level
@idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout
@term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout
@term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout
@upstream = ENV["AMQP_URL"]? || @upstream
end

def run(argv)
raise "run cant be called multiple times" unless @server.nil?

# Parse config file first
OptionParser.parse(argv) do |parser|
parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) }
parser.invalid_option { } # Invalid arguments are handled by the next OptionParser
end

apply_env_variables
# load cascading configuration. load sequence: defaults -> file -> env -> cli
config = @config = AMQProxy::Config.load_with_cli(argv)

# Parse CLI arguments
p = OptionParser.parse(argv) do |parser|
parser.banner = "Usage: amqproxy [options] [amqp upstream url]"
parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v|
@listen_address = v
end
parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i }
parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i }
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v|
@term_timeout = v.to_i
end
parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close before sending Close to clients (default: 0s)") do |v|
@term_client_close_timeout = v.to_i
end
parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug }
parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 }
parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 }
parser.invalid_option { |arg| abort "Invalid argument: #{arg}" }
end
log_backend = if ENV.has_key?("JOURNAL_STREAM")
::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher)
else
::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher)
end
::Log.setup_from_env(default_level: config.log_level, backend: log_backend)

@upstream ||= argv.shift?
upstream_url = @upstream || abort p.to_s
Log.debug { config.inspect }

upstream_url = config.upstream || abort "Upstream AMQP url is required. Add -h switch for help."
u = URI.parse upstream_url

abort "Invalid upstream URL" unless u.host
default_port =
case u.scheme
Expand All @@ -107,20 +41,13 @@ class AMQProxy::CLI
port = u.port || default_port
tls = u.scheme == "amqps"

log_backend = if ENV.has_key?("JOURNAL_STREAM")
::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher)
else
::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher)
end
::Log.setup_from_env(default_level: @log_level, backend: log_backend)

Signal::INT.trap &->self.initiate_shutdown(Signal)
Signal::TERM.trap &->self.initiate_shutdown(Signal)

server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)
server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, config.idle_connection_timeout)

HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)
HTTPServer.new(server, config.listen_address, config.http_port)
server.listen(config.listen_address, config.listen_port)

shutdown

Expand Down Expand Up @@ -149,17 +76,22 @@ class AMQProxy::CLI
unless server = @server
raise "Can't call shutdown before run"
end

unless config = @config
raise "Configuration has not been loaded"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that for some reason @config isn't set we dont disconnect clients, we just stop the process. Isn't @config always set here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes It is always set, but here my Crystal knowledge falls short. I just don't know how I can convince the compiler it is. When I use not_nil I get the suggestion not to use this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to make sure that it is set by being strict that it has a value, i.e. not letting it be nilable :)

end

if server.client_connections > 0
if @term_client_close_timeout > 0
wait_for_clients_to_close @term_client_close_timeout.seconds
if config.term_client_close_timeout > 0
wait_for_clients_to_close config.term_client_close_timeout.seconds
end
server.disconnect_clients
end

if server.client_connections > 0
if @term_timeout >= 0
if config.term_timeout >= 0
spawn do
sleep @term_timeout.seconds
sleep config.term_timeout.seconds
abort "Exiting with #{server.client_connections} client connections still open"
end
end
Expand Down
Loading
Loading