Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compress otlp payload with zlib #87

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
FROM openresty/openresty:1.21.4.1-0-centos

RUN yum install -y gcc
RUN yum install -y gcc git zlib-devel
RUN yum -y --enablerepo=powertools install libyaml-devel libffi-devel
RUN luarocks install lua-resty-http 0.16.1-0
RUN luarocks install lua-protobuf 0.3.3
RUN luarocks install lua-zlib 1.2
RUN luarocks install busted 2.0.0-1
RUN luarocks --server=http://rocks.moonscript.org install lyaml

Expand Down
7 changes: 5 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@ services:
image: jaegertracing/all-in-one
ports:
- 26686:16686
- 4319-4320:4317-4318
networks:
- opentelemetry-lua
otel-collector:
image: otel/opentelemetry-collector-contrib-dev:latest
command: [ "--config=/etc/otel-collector-config.yaml" ]
ports:
- 4317:4317
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
depends_on:
- jaeger
networks:
- opentelemetry-lua
test-server:
build:
context: ./examples/server
ports:
- 10080:80
depends_on:
- otel-collector
networks:
Expand Down
6 changes: 5 additions & 1 deletion examples/openresty/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ init_worker_by_lua_block {
local attr = require("opentelemetry.attribute")

-- create exporter
local exporter = otlp_exporter_new(exporter_client_new("otel-collector:4317", 3))
local use_gzip = true
local headers = {}
headers["Content-Encoding"] = "gzip"
local exporter_timeout = 10000
local exporter = otlp_exporter_new(exporter_client_new("otel-collector:4317", 3, headers), exporter_timeout, use_gzip)
-- create span processor
local batch_span_processor = batch_span_processor_new(exporter)
-- create tracer provider
Expand Down
20 changes: 14 additions & 6 deletions lib/opentelemetry/trace/exporter/http_client.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
local http = require("resty.http")
local zlib = require("zlib")
local otel_global = require("opentelemetry.global")
local exporter_request_compressed_payload_size = "otel.otlp_exporter.request_compressed_payload_size"
local exporter_request_uncompressed_payload_size = "otel.otlp_exporter.request_uncompressed_payload_size"

local _M = {
}
Expand All @@ -13,9 +17,11 @@ local mt = {
-- @address opentelemetry collector: host:port
-- @timeout export request timeout second
-- @headers export request headers
-- @httpc openresty http client instance
-- @use_gzip flag to enable gzip compression on request body
-- @return http client
------------------------------------------------------------------
function _M.new(address, timeout, headers)
function _M.new(address, timeout, headers, httpc)
headers = headers or {}
headers["Content-Type"] = "application/x-protobuf"

Expand All @@ -28,29 +34,31 @@ function _M.new(address, timeout, headers)
uri = uri,
timeout = timeout,
headers = headers,
httpc = httpc,
wperron marked this conversation as resolved.
Show resolved Hide resolved
}

return setmetatable(self, mt)
end

function _M.do_request(self, body)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider change do_request(self, body) to do_request(self, body, additional_headers).
Then call do_request(body, ["Content-Encoding": "gzip"]) if gzip is enabled.

local httpc = http.new()
httpc:set_timeout(self.timeout * 1000)
self.httpc = self.httpc or http.new()
self.httpc:set_timeout(self.timeout * 1000)

local res, err = httpc:request_uri(self.uri, {
local res, err = self.httpc:request_uri(self.uri, {
method = "POST",
headers = self.headers,
body = body,
})

if not res then
ngx.log(ngx.ERR, "request failed: ", err)
httpc:close()
self.httpc:close()
return nil, err
end

if res.status ~= 200 then
ngx.log(ngx.ERR, "request failed: ", res.body)
httpc:close()
self.httpc:close()
return nil, "request failed: " .. res.status
end

Expand Down
28 changes: 24 additions & 4 deletions lib/opentelemetry/trace/exporter/otlp.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ local mt = {
__index = _M
}

function _M.new(http_client, timeout_ms, circuit_reset_timeout_ms, circuit_open_threshold)
function _M.new(http_client, timeout_ms, use_gzip, circuit_reset_timeout_ms, circuit_open_threshold)
local self = {
client = http_client,
timeout_ms = timeout_ms or DEFAULT_TIMEOUT_MS,
use_gzip = use_gzip or false,
circuit = circuit.new({
reset_timeout_ms = circuit_reset_timeout_ms,
failure_threshold = circuit_open_threshold
})
}),
}
return setmetatable(self, mt)
end
Expand All @@ -35,7 +36,7 @@ end
-- @return true if call succeeded; false if call failed
-- @return nil if call succeeded; error message string if call failed
--------------------------------------------------------------------------------
local function call_collector(exporter, pb_encoded_body)
local function call_collector(exporter, pb_encoded_body, use_gzip)
local start_time_ms = util.gettimeofday_ms()
local failures = 0
local res
Expand All @@ -54,6 +55,25 @@ local function call_collector(exporter, pb_encoded_body)
return false, "Circuit breaker is open"
end

if use_gzip then
-- Compress (deflate) request body
-- the compression should be set to Best Compression and window size
-- should be set to 15+16, see reference below:
-- https://github.com/brimworks/lua-zlib/issues/4#issuecomment-26383801
local deflate_stream = zlib.deflate(zlib.BEST_COMPRESSION, 15+16)
local compressed_body, _, _, bytes_out = deflate_stream(pb_encoded_body, "finish")

otel_global.metrics_reporter:record_value(
exporter_request_uncompressed_payload_size, string.len(pb_encoded_body))
otel_global.metrics_reporter:record_value(
exporter_request_compressed_payload_size, bytes_out)

pb_encoded_body = compressed_body
else
otel_global.metrics_reporter:record_value(
exporter_request_uncompressed_payload_size, string.len(pb_encoded_body))
end
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gzip logic should move out while loop.


-- Make request
res, res_error = exporter.client:do_request(pb_encoded_body)
local after_time = util.gettimeofday_ms()
Expand Down Expand Up @@ -101,7 +121,7 @@ function _M.export_spans(self, spans)
body.resource_spans[1].instrumentation_library_spans[1].spans,
encoder.for_otlp(span))
end
return call_collector(self, pb.encode(body))
return call_collector(self, pb.encode(body), self.use_gzip)
end

function _M.shutdown(self)
Expand Down
17 changes: 6 additions & 11 deletions otel-collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,10 @@ exporters:
endpoint: "0.0.0.0:8889"
const_labels:
label1: value1
logging:
loglevel: debug

#zipkin:
# endpoint: "http://zipkin-all-in-one:9411/api/v2/spans"
# format: proto

jaeger:
endpoint: jaeger:14250
debug:
verbosity: detailed
otlp/jaeger:
endpoint: jaeger:4317
tls:
insecure: true

Expand All @@ -37,8 +32,8 @@ service:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, jaeger]
exporters: [debug, otlp/jaeger]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging, prometheus]
exporters: [debug, prometheus]
1 change: 1 addition & 0 deletions rockspec/opentelemetry-lua-master-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ description = {
dependencies = {
"lua-protobuf = 0.3.3",
"lua-resty-http = 0.16.1-0",
"lua-zlib = 1.2",
}

build = {
Expand Down
40 changes: 39 additions & 1 deletion spec/trace/exporter/otlp_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ local context = require "opentelemetry.context"
local tp = Global.get_tracer_provider()
local tracer = tp:tracer("test")

local function is_gzip(_, _)
return function(value)
-- check that the value starts with the two magic bytes 0x1f, 0x8b and
-- the compression method byte set to 0x08
-- reference: https://www.ietf.org/rfc/rfc1952.txt
return string.sub(value, 1, 3) == string.from_hex("1F8B08")
end
end

assert:register("matcher", "gzip", is_gzip)

function string.from_hex(str)
return (str:gsub('..', function (cc)
return string.char(tonumber(cc, 16))
end))
end

describe("export_spans", function()
it("invokes do_request when there are no failures", function()
local span
Expand All @@ -17,7 +34,28 @@ describe("export_spans", function()
stub(ngx, "log")
cb:export_spans({ span })
ngx.log:revert()
assert.spy(c.do_request).was_called_with(c, match.is_string())
assert.spy(c.do_request).was_called_with(c, match.is_all_of(match.is_string(), match.is_not_gzip()))
end)

it("invokes do_request with gzip compression when configured", function()
local span
local ctx = context.new()
ctx, span = tracer:start(ctx, "test span")
span:finish()

local headers = {}
headers["Content-Encoding"] = "gzip"
local c = client.new("http://localhost:8080", 10, headers)
spy.on(c, "do_request")

local cb = exporter.new(c, 10000, true)

-- Supress log message, since we expect it
stub(ngx, "log")

cb:export_spans({ span })
ngx.log:revert()
assert.spy(c.do_request).was_called_with(c, match.is_all_of(match.is_string(), match.is_gzip()))
end)

it("doesn't invoke protected_call when failures is equal to retry limit", function()
Expand Down
Loading