Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2049/strim-logs-with-offset' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2049] feat: logging offset

Closes DEX-2049

See merge request nstmrt/rubygems/sbmt-kafka_producer!30
  • Loading branch information
Arlantir committed Apr 15, 2024
2 parents 1cacbb5 + 51d2205 commit 228ade2
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.2.0] - 2024-04-12

### Changed

- Add logs with `offset`.

## [2.1.0] - 2024-03-14

### Changed
Expand Down
7 changes: 6 additions & 1 deletion lib/sbmt/kafka_producer/base_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ class BaseProducer
option :topic

def sync_publish!(payload, options = {})
around_publish do
report = around_publish do
client.produce_sync(payload: payload, **options.merge(topic: topic))
end
log_success(report)
true
end

Expand Down Expand Up @@ -81,6 +82,10 @@ def log_error(error)
ErrorTracker.error(error)
end

def log_success(report)
logger.info "Message has been successfully sent to Kafka - topic: #{report.topic_name}, partition: #{report.partition}, offset: #{report.offset}"
end

def format_exception_error(error)
text = "#{format_exception_error(error.cause)}. " if with_cause?(error)

Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_producer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaProducer
VERSION = "2.1.0"
VERSION = "2.2.0"
end
end
25 changes: 19 additions & 6 deletions spec/sbmt/kafka_producer/base_producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ def initialize(message, cause)
let(:topic) { "test_topic" }
let(:payload) { {message: "payload"} }
let(:error) { WaterDrop::Errors::ProduceError }
let(:delivery_report) do
instance_double(Rdkafka::Producer::DeliveryReport,
error: nil,
label: nil,
offset: 0,
partition: 0,
topic_name: "my_topic")
end
let(:delivery_handle) do
instance_double(Rdkafka::Producer::DeliveryHandle,
label: nil,
wait: delivery_report)
end

before do
allow(Sbmt::KafkaProducer::KafkaClientFactory).to receive(:default_client).and_return(client)
Expand All @@ -29,7 +42,7 @@ def initialize(message, cause)
payload: payload,
topic: "test_topic",
seed_brokers: "kafka://kafka:9092"
).and_return(true)
).and_return(delivery_report)
end

it "produces the payload via the client and returns true" do
Expand Down Expand Up @@ -70,15 +83,15 @@ def initialize(message, cause)

context "when payload is successfully delivered" do
before do
allow(client).to receive(:produce_async).with(
allow(client).to receive(:produce_sync).with(
payload: payload,
topic: "test_topic",
seed_brokers: "kafka://kafka:9092"
).and_return(true)
).and_return(delivery_report)
end

it "produces the payload via the client and returns true" do
expect(producer.async_publish!(payload, options)).to be(true)
expect(producer.sync_publish!(payload, options)).to be(true)
end
end

Expand All @@ -102,7 +115,7 @@ def initialize(message, cause)
payload: payload,
topic: "test_topic",
seed_brokers: "kafka://kafka:9092"
).and_return(true)
).and_return(delivery_handle)
end

it "produces the payload via the client and returns true" do
Expand Down Expand Up @@ -147,7 +160,7 @@ def initialize(message, cause)
payload: payload,
topic: "test_topic",
seed_brokers: "kafka://kafka:9092"
).and_return(true)
).and_return(delivery_handle)
end

it "produces the payload via the client and returns true" do
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "ostruct"

describe Sbmt::KafkaProducer::Instrumentation::YabedaMetricsListener do
describe ".on_statistics_emitted" do
let(:base_rdkafka_stats) {
Expand Down
11 changes: 10 additions & 1 deletion spec/sbmt/kafka_producer/outbox_producer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,17 @@
end

describe "#sync_publish" do
let(:delivery_report) do
instance_double(Rdkafka::Producer::DeliveryReport,
error: nil,
label: nil,
offset: 0,
partition: 0,
topic_name: "my_topic")
end

it "calls client.produce_sync with payload and merged options" do
expect(client).to receive(:produce_sync).with(payload: payload, topic: topic, partition: 0)
expect(client).to receive(:produce_sync).with(payload: payload, topic: topic, partition: 0).and_return(delivery_report)
outbox_producer.sync_publish(payload, partition: 0)
end
end
Expand Down

0 comments on commit 228ade2

Please sign in to comment.