Skip to content

Commit

Permalink
Merge branch 'feat/DEX-2119/clients' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2119] feat: memoize kafka clients

Closes DEX-2119

See merge request nstmrt/rubygems/sbmt-kafka_producer!29
  • Loading branch information
bibendi committed Mar 18, 2024
2 parents 45be3d2 + 915504f commit 1cacbb5
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 6 deletions.
6 changes: 4 additions & 2 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
stages:
- test
include:
- project: "nstmrt/rubygems/templates"
ref: master
file: "build-rubygems.yml"

lint:
stage: test
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [2.1.0] - 2024-03-14

### Changed

- Memoize kafka clients. Add a registry with them to KafkaClientFactory.

## [2.0.0] - 2024-01-29

### Changed
Expand Down
21 changes: 18 additions & 3 deletions lib/sbmt/kafka_producer/kafka_client_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
module Sbmt
module KafkaProducer
class KafkaClientFactory
CLIENTS_REGISTRY_MUTEX = Mutex.new
CLIENTS_REGISTRY = {}

class << self
def default_client
@default_client ||= ConnectionPool::Wrapper.new do
Expand All @@ -15,15 +18,27 @@ def default_client
def build(kafka = {})
return default_client if kafka.empty?

ConnectionPool::Wrapper.new do
WaterDrop::Producer.new do |config|
configure_client(config, kafka)
fetch_client(kafka) do
ConnectionPool::Wrapper.new do
WaterDrop::Producer.new do |config|
configure_client(config, kafka)
end
end
end
end

private

def fetch_client(kafka)
key = Digest::SHA1.hexdigest(Marshal.dump(kafka))
return CLIENTS_REGISTRY[key] if CLIENTS_REGISTRY.key?(key)

CLIENTS_REGISTRY_MUTEX.synchronize do
return CLIENTS_REGISTRY[key] if CLIENTS_REGISTRY.key?(key)
CLIENTS_REGISTRY[key] = yield
end
end

def configure_client(kafka_config, kafka_options = {})
kafka_config.logger = config.logger_class.classify.constantize.new
kafka_config.kafka = config.to_kafka_options.merge(custom_kafka_config(kafka_options)).symbolize_keys
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_producer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaProducer
VERSION = "2.0.0"
VERSION = "2.1.0"
end
end
5 changes: 5 additions & 0 deletions spec/sbmt/kafka_producer/kafka_client_factory_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
it "returns a ConnectionPool::Wrapper with a WaterDrop::Producer inside" do
expect(described_class.build(kafka_config).with { |producer| producer }).to be_instance_of(WaterDrop::Producer)
end

it "always returns the same client" do
client = described_class.build(kafka_config)
expect(described_class.build(kafka_config).object_id).to eq client.object_id
end
end
end

Expand Down

0 comments on commit 1cacbb5

Please sign in to comment.