From aad66b77187c5930b6cec28729552d48cd62d6a6 Mon Sep 17 00:00:00 2001 From: William Perron Date: Mon, 5 Feb 2024 08:54:51 -0500 Subject: [PATCH] add metrics for compressed/uncompressed payload size --- examples/openresty/nginx.conf | 6 +- .../trace/exporter/http_client.lua | 21 +++---- lib/opentelemetry/trace/exporter/otlp.lua | 30 ++++++++-- spec/trace/exporter/http_client_spec.lua | 57 ------------------- spec/trace/exporter/otlp_spec.lua | 40 ++++++++++++- 5 files changed, 76 insertions(+), 78 deletions(-) delete mode 100644 spec/trace/exporter/http_client_spec.lua diff --git a/examples/openresty/nginx.conf b/examples/openresty/nginx.conf index 79fa449..b4699f2 100644 --- a/examples/openresty/nginx.conf +++ b/examples/openresty/nginx.conf @@ -7,7 +7,11 @@ init_worker_by_lua_block { local attr = require("opentelemetry.attribute") -- create exporter - local exporter = otlp_exporter_new(exporter_client_new("otel-collector:4317", 3)) + local use_gzip = true + local headers = {} + headers["Content-Encoding"] = "gzip" + local exporter_timeout = 10000 + local exporter = otlp_exporter_new(exporter_client_new("otel-collector:4317", 3, headers), exporter_timeout, use_gzip) -- create span processor local batch_span_processor = batch_span_processor_new(exporter) -- create tracer provider diff --git a/lib/opentelemetry/trace/exporter/http_client.lua b/lib/opentelemetry/trace/exporter/http_client.lua index 7d291ea..d92c94e 100644 --- a/lib/opentelemetry/trace/exporter/http_client.lua +++ b/lib/opentelemetry/trace/exporter/http_client.lua @@ -1,5 +1,9 @@ local http = require("resty.http") local zlib = require("zlib") +local otel_global = require("opentelemetry.global") +local exporter_request_compressed_payload_size = "otel.otlp_exporter.request_compressed_payload_size" +local exporter_request_uncompressed_payload_size = "otel.otlp_exporter.request_uncompressed_payload_size" + local _M = { } @@ -14,6 +18,7 @@ local mt = { -- @timeout export request timeout second -- @headers export request headers -- @httpc openresty http client instance +-- @use_gzip flag to enable gzip compression on request body -- @return http client ------------------------------------------------------------------ function _M.new(address, timeout, headers, httpc) @@ -31,26 +36,14 @@ function _M.new(address, timeout, headers, httpc) headers = headers, httpc = httpc, } + return setmetatable(self, mt) end -function _M.do_request(self, body, encode_gzip) +function _M.do_request(self, body) self.httpc = self.httpc or http.new() - - encode_gzip = encode_gzip or false self.httpc:set_timeout(self.timeout * 1000) - if encode_gzip then - -- Compress (deflate) request body - -- the compression should be set to Best Compression and window size - -- should be set to 15+16, see reference below: - -- https://github.com/brimworks/lua-zlib/issues/4#issuecomment-26383801 - self.headers["Content-Encoding"] = "gzip" - local deflate_stream = zlib.deflate(zlib.BEST_COMPRESSION, 15+16) - local compressed_body = deflate_stream(body, "finish") - body = compressed_body - end - local res, err = self.httpc:request_uri(self.uri, { method = "POST", headers = self.headers, diff --git a/lib/opentelemetry/trace/exporter/otlp.lua b/lib/opentelemetry/trace/exporter/otlp.lua index d35909c..0576b1a 100644 --- a/lib/opentelemetry/trace/exporter/otlp.lua +++ b/lib/opentelemetry/trace/exporter/otlp.lua @@ -14,14 +14,15 @@ local mt = { __index = _M } -function _M.new(http_client, timeout_ms, circuit_reset_timeout_ms, circuit_open_threshold) +function _M.new(http_client, timeout_ms, use_gzip, circuit_reset_timeout_ms, circuit_open_threshold) local self = { client = http_client, timeout_ms = timeout_ms or DEFAULT_TIMEOUT_MS, + use_gzip = use_gzip or false, circuit = circuit.new({ reset_timeout_ms = circuit_reset_timeout_ms, failure_threshold = circuit_open_threshold - }) + }), } return setmetatable(self, mt) end @@ -35,7 +36,7 @@ end -- @return true if call succeeded; false if call failed -- @return nil if call succeeded; error message string if call failed -------------------------------------------------------------------------------- -local function call_collector(exporter, pb_encoded_body) +local function call_collector(exporter, pb_encoded_body, use_gzip) local start_time_ms = util.gettimeofday_ms() local failures = 0 local res @@ -54,8 +55,27 @@ local function call_collector(exporter, pb_encoded_body) return false, "Circuit breaker is open" end + if use_gzip then + -- Compress (deflate) request body + -- the compression should be set to Best Compression and window size + -- should be set to 15+16, see reference below: + -- https://github.com/brimworks/lua-zlib/issues/4#issuecomment-26383801 + local deflate_stream = zlib.deflate(zlib.BEST_COMPRESSION, 15+16) + local compressed_body, _, _, bytes_out = deflate_stream(pb_encoded_body, "finish") + + otel_global.metrics_reporter:record_value( + exporter_request_uncompressed_payload_size, string.len(pb_encoded_body)) + otel_global.metrics_reporter:record_value( + exporter_request_compressed_payload_size, bytes_out) + + pb_encoded_body = compressed_body + else + otel_global.metrics_reporter:record_value( + exporter_request_uncompressed_payload_size, string.len(pb_encoded_body)) + end + -- Make request - res, res_error = exporter.client:do_request(pb_encoded_body, true) + res, res_error = exporter.client:do_request(pb_encoded_body) local after_time = util.gettimeofday_ms() otel_global.metrics_reporter:record_value( exporter_request_duration_metric, after_time - current_time) @@ -101,7 +121,7 @@ function _M.export_spans(self, spans) body.resource_spans[1].instrumentation_library_spans[1].spans, encoder.for_otlp(span)) end - return call_collector(self, pb.encode(body)) + return call_collector(self, pb.encode(body), self.use_gzip) end function _M.shutdown(self) diff --git a/spec/trace/exporter/http_client_spec.lua b/spec/trace/exporter/http_client_spec.lua deleted file mode 100644 index f5dd641..0000000 --- a/spec/trace/exporter/http_client_spec.lua +++ /dev/null @@ -1,57 +0,0 @@ -local http = require("resty.http") -local client = require "opentelemetry.trace.exporter.http_client" -local zlib = require("zlib") - -describe("export_spans", function() - it("invokes an HTTP request", function () - local httpc = http.new() - spy.on(httpc, "request_uri") - local c = client.new("http://localhost:8080", 10, nil, httpc) - local expected_headers = {} - expected_headers["Content-Type"] = "application/x-protobuf" - - c:do_request("Hello, World!") - assert.spy(httpc.request_uri).was_called_with( - match.is_truthy(), -- TODO(wperron) this *should* be the same ref, why is it not? - match.is_equal("http://localhost:8080/v1/traces"), - match.is_all_of( - match.is_table(), - match.is_same({ - method = "POST", - headers = expected_headers, - body = "Hello, World!" - }) - ) - ) - end) - - it("compresses the body when asked to", function () - local httpc = http.new() - spy.on(httpc, "request_uri") - local c = client.new("http://localhost:8080", 10, nil, httpc) - local expected_headers = {} - expected_headers["Content-Type"] = "application/x-protobuf" - expected_headers["Content-Encoding"] = "gzip" - local expected_body = string.fromhex("1F8B0800000000000203F348CDC9C9D75108CF2FCA49510400D0C34AEC0D000000") - - c:do_request("Hello, World!", true) - assert.spy(httpc.request_uri).was_called_with( - match.is_truthy(), -- TODO(wperron) this *should* be the same ref, why is it not? - match.is_equal("http://localhost:8080/v1/traces"), - match.is_all_of( - match.is_table(), - match.is_same({ - method = "POST", - headers = expected_headers, - body = expected_body - }) - ) - ) - end) -end) - -function string.fromhex(str) - return (str:gsub('..', function (cc) - return string.char(tonumber(cc, 16)) - end)) -end diff --git a/spec/trace/exporter/otlp_spec.lua b/spec/trace/exporter/otlp_spec.lua index dfece3d..a529418 100644 --- a/spec/trace/exporter/otlp_spec.lua +++ b/spec/trace/exporter/otlp_spec.lua @@ -4,6 +4,23 @@ local context = require "opentelemetry.context" local tp = Global.get_tracer_provider() local tracer = tp:tracer("test") +local function is_gzip(_, _) + return function(value) + -- check that the value starts with the two magic bytes 0x1f, 0x8b and + -- the compression method byte set to 0x08 + -- reference: https://www.ietf.org/rfc/rfc1952.txt + return string.sub(value, 1, 3) == string.from_hex("1F8B08") + end +end + +assert:register("matcher", "gzip", is_gzip) + +function string.from_hex(str) + return (str:gsub('..', function (cc) + return string.char(tonumber(cc, 16)) + end)) +end + describe("export_spans", function() it("invokes do_request when there are no failures", function() local span @@ -17,7 +34,28 @@ describe("export_spans", function() stub(ngx, "log") cb:export_spans({ span }) ngx.log:revert() - assert.spy(c.do_request).was_called_with(c, match.is_string(), match.is_all_of(match.is_boolean(), match.is_equal(true))) + assert.spy(c.do_request).was_called_with(c, match.is_all_of(match.is_string(), match.is_not_gzip())) + end) + + it("invokes do_request with gzip compression when configured", function() + local span + local ctx = context.new() + ctx, span = tracer:start(ctx, "test span") + span:finish() + + local headers = {} + headers["Content-Encoding"] = "gzip" + local c = client.new("http://localhost:8080", 10, headers) + spy.on(c, "do_request") + + local cb = exporter.new(c, 10000, true) + + -- Supress log message, since we expect it + stub(ngx, "log") + + cb:export_spans({ span }) + ngx.log:revert() + assert.spy(c.do_request).was_called_with(c, match.is_all_of(match.is_string(), match.is_gzip())) end) it("doesn't invoke protected_call when failures is equal to retry limit", function()