diff --git a/CHANGELOG.md b/CHANGELOG.md index d903508..cf9c271 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [2.2.0] - 2024-04-12 + +### Changed + +- Add logs with `offset`. + ## [2.1.0] - 2024-03-14 ### Changed diff --git a/lib/sbmt/kafka_producer/base_producer.rb b/lib/sbmt/kafka_producer/base_producer.rb index f901563..10eccd1 100644 --- a/lib/sbmt/kafka_producer/base_producer.rb +++ b/lib/sbmt/kafka_producer/base_producer.rb @@ -9,9 +9,10 @@ class BaseProducer option :topic def sync_publish!(payload, options = {}) - around_publish do + report = around_publish do client.produce_sync(payload: payload, **options.merge(topic: topic)) end + log_success(report) true end @@ -81,6 +82,10 @@ def log_error(error) ErrorTracker.error(error) end + def log_success(report) + logger.info "Message has been successfully sent to Kafka - topic: #{report.topic_name}, partition: #{report.partition}, offset: #{report.offset}" + end + def format_exception_error(error) text = "#{format_exception_error(error.cause)}. " if with_cause?(error) diff --git a/lib/sbmt/kafka_producer/version.rb b/lib/sbmt/kafka_producer/version.rb index ab92064..fa77b8e 100644 --- a/lib/sbmt/kafka_producer/version.rb +++ b/lib/sbmt/kafka_producer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaProducer - VERSION = "2.1.0" + VERSION = "2.2.0" end end diff --git a/spec/sbmt/kafka_producer/base_producer_spec.rb b/spec/sbmt/kafka_producer/base_producer_spec.rb index cc6bc9d..308bb36 100644 --- a/spec/sbmt/kafka_producer/base_producer_spec.rb +++ b/spec/sbmt/kafka_producer/base_producer_spec.rb @@ -15,6 +15,19 @@ def initialize(message, cause) let(:topic) { "test_topic" } let(:payload) { {message: "payload"} } let(:error) { WaterDrop::Errors::ProduceError } + let(:delivery_report) do + instance_double(Rdkafka::Producer::DeliveryReport, + error: nil, + label: nil, + offset: 0, + partition: 0, + topic_name: "my_topic") + end + let(:delivery_handle) do + instance_double(Rdkafka::Producer::DeliveryHandle, + label: nil, + wait: delivery_report) + end before do allow(Sbmt::KafkaProducer::KafkaClientFactory).to receive(:default_client).and_return(client) @@ -29,7 +42,7 @@ def initialize(message, cause) payload: payload, topic: "test_topic", seed_brokers: "kafka://kafka:9092" - ).and_return(true) + ).and_return(delivery_report) end it "produces the payload via the client and returns true" do @@ -70,15 +83,15 @@ def initialize(message, cause) context "when payload is successfully delivered" do before do - allow(client).to receive(:produce_async).with( + allow(client).to receive(:produce_sync).with( payload: payload, topic: "test_topic", seed_brokers: "kafka://kafka:9092" - ).and_return(true) + ).and_return(delivery_report) end it "produces the payload via the client and returns true" do - expect(producer.async_publish!(payload, options)).to be(true) + expect(producer.sync_publish!(payload, options)).to be(true) end end @@ -102,7 +115,7 @@ def initialize(message, cause) payload: payload, topic: "test_topic", seed_brokers: "kafka://kafka:9092" - ).and_return(true) + ).and_return(delivery_handle) end it "produces the payload via the client and returns true" do @@ -147,7 +160,7 @@ def initialize(message, cause) payload: payload, topic: "test_topic", seed_brokers: "kafka://kafka:9092" - ).and_return(true) + ).and_return(delivery_handle) end it "produces the payload via the client and returns true" do diff --git a/spec/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener_spec.rb b/spec/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener_spec.rb index e006a88..ce16e70 100644 --- a/spec/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener_spec.rb +++ b/spec/sbmt/kafka_producer/instrumentation/yabeda_metrics_listener_spec.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "ostruct" + describe Sbmt::KafkaProducer::Instrumentation::YabedaMetricsListener do describe ".on_statistics_emitted" do let(:base_rdkafka_stats) { diff --git a/spec/sbmt/kafka_producer/outbox_producer_spec.rb b/spec/sbmt/kafka_producer/outbox_producer_spec.rb index db33809..bd78d64 100644 --- a/spec/sbmt/kafka_producer/outbox_producer_spec.rb +++ b/spec/sbmt/kafka_producer/outbox_producer_spec.rb @@ -20,8 +20,17 @@ end describe "#sync_publish" do + let(:delivery_report) do + instance_double(Rdkafka::Producer::DeliveryReport, + error: nil, + label: nil, + offset: 0, + partition: 0, + topic_name: "my_topic") + end + it "calls client.produce_sync with payload and merged options" do - expect(client).to receive(:produce_sync).with(payload: payload, topic: topic, partition: 0) + expect(client).to receive(:produce_sync).with(payload: payload, topic: topic, partition: 0).and_return(delivery_report) outbox_producer.sync_publish(payload, partition: 0) end end