Skip to content

Commit

Permalink
Merge branch 'feat/waterdrop-2-7' into 'master'
Browse files Browse the repository at this point in the history
[DEX-2238] feat: add support for waterdrop 2.7

See merge request nstmrt/rubygems/sbmt-kafka_producer!35
  • Loading branch information
bibendi committed Aug 29, 2024
2 parents 7c0cef2 + 41eb1c0 commit 693123f
Show file tree
Hide file tree
Showing 18 changed files with 111 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby: [ '2.7', '3.0', '3.1', '3.2', '3.3' ]
ruby: [ '3.0', '3.1', '3.2', '3.3' ]
env:
RUBY_VERSION: ${{ matrix.ruby }}
name: Ruby ${{ matrix.ruby }}
Expand Down
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tests:
image: ${BUILD_CONF_HARBOR_REGISTRY}/dhub/library/ruby:$RUBY_VERSION
parallel:
matrix:
- RUBY_VERSION: ['2.7', '3.0', '3.1', '3.2', '3.3']
- RUBY_VERSION: ['3.0', '3.1', '3.2', '3.3']
before_script:
- bin/setup
script:
Expand Down
4 changes: 2 additions & 2 deletions Appraisals
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
# See compatibility table at https://www.fastruby.io/blog/ruby/rails/versions/compatibility-table.html

versions_map = {
"6.0" => %w[2.7],
"6.1" => %w[2.7 3.0],
"7.0" => %w[3.1],
"7.1" => %w[3.2, 3.3]
"7.1" => %w[3.2],
"7.2" => %w[3.3]
}

current_ruby_version = RUBY_VERSION.split(".").first(2).join(".")
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Fixed

## [3.0.0] - 2024-08-27

## BREAKING

- Drop support for Ruby 2.7
- Drop support for Rails 6.0
- Add support for Waterdrop 2.7
- `wait_timeout` configuration no longer deeded
- All time-related values are now configured in milliseconds: `connect_timeout`, `ack_timeout`, `retry_backoff`, `max_wait_timeout`, `wait_on_queue_full_timeout`

## Added

- Add `message_timeout` configuration

## [2.2.3] - 2024-06-20

### Fixed
Expand Down
3 changes: 0 additions & 3 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,3 @@
source "https://rubygems.org"

gemspec

# FIXME: remove this after drop support for Ruby 2.7
gem "ffi", "< 1.17"
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2024 SberMarket Tech
Copyright (c) 2024 Kuper Tech

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
[![Gem Version](https://badge.fury.io/rb/sbmt-kafka_producer.svg)](https://badge.fury.io/rb/sbmt-kafka_producer)
[![Build Status](https://github.com/SberMarket-Tech/sbmt-kafka_producer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/SberMarket-Tech/sbmt-kafka_producer/actions?query=branch%3Amaster)
[![Build Status](https://github.com/Kuper-Tech/sbmt-kafka_producer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Kuper-Tech/sbmt-kafka_producer/actions?query=branch%3Amaster)

# Sbmt-KafkaProducer

This gem is used to produce Kafka messages. It is a wrapper over the [waterdrop](https://github.com/karafka/waterdrop) gem, and it is recommended for use as a transport with the [sbmt-outbox](https://github.com/SberMarket-Tech/sbmt-outbox) gem.
This gem is used to produce Kafka messages. It is a wrapper over the [waterdrop](https://github.com/karafka/waterdrop) gem, and it is recommended for use as a transport with the [sbmt-outbox](https://github.com/Kuper-Tech/sbmt-outbox) gem.

## Installation

Expand All @@ -21,7 +21,7 @@ bundle install

## Demo

Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/SberMarket-Tech/outbox-example-apps
Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/Kuper-Tech/outbox-example-apps

## Auto configuration

Expand Down Expand Up @@ -49,7 +49,7 @@ As the result, a sync producer will be created.

### Outbox producer

To generate an Outbox producer for use with Gem [sbmt-Outbox](https://github.com/SberMarket-Tech/sbmt-outbox), run the following command:
To generate an Outbox producer for use with Gem [sbmt-Outbox](https://github.com/Kuper-Tech/sbmt-outbox), run the following command:

```shell
rails g kafka_producer:outbox_producer SomeOutboxItem
Expand All @@ -66,20 +66,20 @@ default: &default
# see more options at https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb
wait_on_queue_full: true
max_payload_size: 1000012
max_wait_timeout: 5
wait_timeout: 0.005
max_wait_timeout_ms: 60000
auth:
kind: plaintext
kafka:
servers: "kafka:9092" # required
max_retries: 2 # optional, default: 2
required_acks: -1 # optional, default: -1
ack_timeout: 1 # in seconds, optional, default: 1
retry_backoff: 1 # in seconds, optional, default: 1
connect_timeout: 1 # in seconds, optional, default: 1
ack_timeout: 1000 # in milliseconds, optional, default: 1000
retry_backoff: 1000 # in milliseconds, optional, default: 1000
connect_timeout: 2000 # in milliseconds, optional, default: 2000
message_timeout: 55000 # in milliseconds, optional, default: 55000
kafka_config: # low-level custom Kafka options
queue.buffering.max.messages: 1
queue.buffering.max.ms: 10_000
queue.buffering.max.ms: 10000

development:
<<: *default
Expand Down
6 changes: 3 additions & 3 deletions dip.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '7'

environment:
RUBY_VERSION: '3.2'
RUBY_VERSION: '3.3'

compose:
files:
Expand Down Expand Up @@ -35,14 +35,14 @@ interaction:
subcommands:
all:
command: bundle exec appraisal rspec
rails-6.0:
command: bundle exec appraisal rails-6.0 rspec
rails-6.1:
command: bundle exec appraisal rails-6.1 rspec
rails-7.0:
command: bundle exec appraisal rails-7.0 rspec
rails-7.1:
command: bundle exec appraisal rails-7.1 rspec
rails-7.2:
command: bundle exec appraisal rails-7.2 rspec

rubocop:
description: Run Ruby linter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ default: &default
deliver: true
wait_on_queue_full: true
max_payload_size: 1000012
max_wait_timeout: 5
wait_timeout: 0.005
max_wait_timeout: 60000
ignore_kafka_error: true

auth:
Expand All @@ -14,11 +13,12 @@ default: &default

kafka:
servers: "kafka:9092"
connect_timeout: 2000
message_timeout: 55000
ack_timeout: 10000
retry_backoff: 10000
max_retries: 2
required_acks: -1
ack_timeout: 1
retry_backoff: 1
connect_timeout: 1

development:
<<: *default
Expand Down
31 changes: 20 additions & 11 deletions lib/sbmt/kafka_producer/config/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,36 @@ class Kafka < Dry::Struct
# srv1:port1,srv2:port2,...
SERVERS_REGEXP = /^[a-z\d.\-:]+(,[a-z\d.\-:]+)*$/.freeze

# https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb
KAFKA_CONFIG_KEYS_REMAP = {
servers: :"bootstrap.servers",
connect_timeout: :"socket.connection.setup.timeout.ms",
message_timeout: :"message.timeout.ms",
ack_timeout: :"request.timeout.ms",
retry_backoff: :"retry.backoff.ms",
max_retries: :"message.send.max.retries",
required_acks: :"request.required.acks"
}

attribute :servers, Sbmt::KafkaProducer::Types::String.constrained(format: SERVERS_REGEXP)

# defaults are rdkafka's
# see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
attribute :connect_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1)
attribute :ack_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1)
attribute :connect_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(2000)
attribute :ack_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1000)
attribute :retry_backoff, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1000)
attribute :message_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(55000)
attribute :required_acks, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(-1)
attribute :max_retries, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(2)
attribute :retry_backoff, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1)

attribute :kafka_config, Sbmt::KafkaProducer::Types::ConfigAttrs.optional.default({}.freeze)

def to_kafka_options
kafka_config.merge(
"bootstrap.servers": servers,
"socket.connection.setup.timeout.ms": connect_timeout.to_f * 1000,
"request.timeout.ms": ack_timeout.to_f * 1000,
"request.required.acks": required_acks,
"message.send.max.retries": max_retries,
"retry.backoff.ms": retry_backoff.to_f * 1000
).symbolize_keys
cfg = KAFKA_CONFIG_KEYS_REMAP.each_with_object({}) do |(key, kafka_key), hash|
hash[kafka_key] = self[key]
end

kafka_config.symbolize_keys.merge(cfg)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/sbmt/kafka_producer/config/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def instance
config_name :kafka_producer

attr_config :ignore_kafka_error, :deliver, :wait_on_queue_full,
:max_payload_size, :max_wait_timeout, :wait_timeout,
:max_payload_size, :max_wait_timeout,
:wait_on_queue_full_timeout,
auth: {}, kafka: {},
logger_class: "::Sbmt::KafkaProducer::Logger",
Expand All @@ -37,7 +37,7 @@ def instance
coerce_types ignore_kafka_error: :boolean,
deliver: :boolean, wait_on_queue_full: :boolean,
max_payload_size: :integer, max_wait_timeout: :integer,
wait_timeout: :float, wait_on_queue_full_timeout: :float
wait_on_queue_full_timeout: :integer
coerce_types kafka: coerce_to(Kafka)
coerce_types auth: coerce_to(Auth)

Expand Down
26 changes: 10 additions & 16 deletions lib/sbmt/kafka_producer/kafka_client_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@ def default_client
end
end

def build(kafka = {})
return default_client if kafka.empty?
def build(kafka_options = {})
return default_client if kafka_options.empty?

fetch_client(kafka) do
fetch_client(kafka_options) do
ConnectionPool::Wrapper.new do
WaterDrop::Producer.new do |config|
configure_client(config, kafka)
configure_client(config, kafka_options)
end
end
end
end

private

def fetch_client(kafka)
key = Digest::SHA1.hexdigest(Marshal.dump(kafka))
def fetch_client(kafka_options)
key = Digest::SHA1.hexdigest(Marshal.dump(kafka_options))
return CLIENTS_REGISTRY[key] if CLIENTS_REGISTRY.key?(key)

CLIENTS_REGISTRY_MUTEX.synchronize do
Expand All @@ -49,22 +49,16 @@ def configure_client(kafka_config, kafka_options = {})
kafka_config.wait_on_queue_full = config.wait_on_queue_full if config.wait_on_queue_full.present?
kafka_config.max_payload_size = config.max_payload_size if config.max_payload_size.present?
kafka_config.max_wait_timeout = config.max_wait_timeout if config.max_wait_timeout.present?
kafka_config.wait_timeout = config.wait_timeout if config.wait_timeout.present?
kafka_config.wait_on_queue_full_timeout = config.wait_on_queue_full_timeout if config.wait_on_queue_full_timeout.present?

kafka_config.monitor.subscribe(config.metrics_listener_class.classify.constantize.new)
end

def custom_kafka_config(kafka_options)
result = {}

result["socket.connection.setup.timeout.ms"] = kafka_options["connect_timeout"].to_f * 1000 if kafka_options.key?("connect_timeout")
result["request.timeout.ms"] = kafka_options["ack_timeout"].to_f * 1000 if kafka_options.key?("ack_timeout")
result["request.required.acks"] = kafka_options["required_acks"] if kafka_options.key?("required_acks")
result["message.send.max.retries"] = kafka_options["max_retries"] if kafka_options.key?("max_retries")
result["retry.backoff.ms"] = kafka_options["retry_backoff"].to_f * 1000 if kafka_options.key?("retry_backoff")

result
kafka_options = kafka_options.symbolize_keys
short_options = kafka_options.extract!(*Config::Kafka::KAFKA_CONFIG_KEYS_REMAP.keys)
cfg = short_options.transform_keys(Config::Kafka::KAFKA_CONFIG_KEYS_REMAP)
kafka_options.merge!(cfg)
end

def config
Expand Down
2 changes: 1 addition & 1 deletion lib/sbmt/kafka_producer/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Sbmt
module KafkaProducer
VERSION = "2.2.3"
VERSION = "3.0.0"
end
end
16 changes: 8 additions & 8 deletions sbmt-kafka_producer.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ Gem::Specification.new do |spec|
spec.name = "sbmt-kafka_producer"
spec.license = "MIT"
spec.version = Sbmt::KafkaProducer::VERSION
spec.authors = ["Sbermarket Ruby-Platform Team"]
spec.authors = ["Kuper Ruby-Platform Team"]

spec.summary = "Ruby gem for producing Kafka messages"
spec.description = "This gem is used for producing Kafka messages. It represents a wrapper over Waterdrop gem and is recommended for using as a transport with sbmt-outbox"
spec.homepage = "https://github.com/SberMarket-Tech/sbmt-kafka_producer"
spec.required_ruby_version = ">= 2.7.0"
spec.homepage = "https://github.com/Kuper-Tech/sbmt-kafka_producer"
spec.required_ruby_version = ">= 3.0.0"

spec.metadata["allowed_push_host"] = "https://rubygems.org"

Expand All @@ -32,12 +32,12 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency "anyway_config", "~> 2.4"
spec.add_dependency "connection_pool"
spec.add_dependency "connection_pool", "~> 2.0"
spec.add_dependency "dry-initializer", "~> 3.0"
spec.add_dependency "dry-struct"
spec.add_dependency "waterdrop", "~> 2.5", "< 2.7" # BREAKING: undefined method `wait_timeout='' for #<Karafka::Core::Configurable::Node`
spec.add_dependency "dry-struct", "~> 1.5"
spec.add_dependency "waterdrop", "~> 2.7", "< 2.8"
spec.add_dependency "zeitwerk", "~> 2.6"
spec.add_dependency "yabeda", ">= 0.11"
spec.add_dependency "yabeda", "~> 0.11"

spec.add_development_dependency "appraisal", ">= 2.4"
spec.add_development_dependency "bundler", ">= 2.1"
Expand All @@ -47,7 +47,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "opentelemetry-common", ">= 0.17.0"
spec.add_development_dependency "opentelemetry-instrumentation-base", ">= 0.17.0"
spec.add_development_dependency "rake", ">= 13.0"
spec.add_development_dependency "rails", ">= 6.0"
spec.add_development_dependency "rails", ">= 6.1"
spec.add_development_dependency "rspec", ">= 3.0"
spec.add_development_dependency "rspec_junit_formatter", ">= 0.6"
spec.add_development_dependency "rspec-rails", ">= 4.0"
Expand Down
12 changes: 6 additions & 6 deletions spec/internal/config/kafka_producer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ default: &default
deliver: true
wait_on_queue_full: true
max_payload_size: 1000012
max_wait_timeout: 5
wait_timeout: 0.005
max_wait_timeout: 60000
ignore_kafka_error: true
auth:
kind: plaintext
kafka:
servers: "kafka:9092"
max_retries: 2
required_acks: -1
ack_timeout: 1
retry_backoff: 1
connect_timeout: 1
ack_timeout: 1000
retry_backoff: 1000
connect_timeout: 2000
message_timeout: 55000
kafka_config:
queue.buffering.max.messages: 1
queue.buffering.max.ms: 10_000
queue.buffering.max.ms: 10000

development:
<<: *default
Expand Down
3 changes: 2 additions & 1 deletion spec/sbmt/kafka_producer/config/kafka_config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
describe Sbmt::KafkaProducer::Config::Kafka, type: :config do
let(:kafka_config_defaults) do
{
"socket.connection.setup.timeout.ms": 1000,
"socket.connection.setup.timeout.ms": 2000,
"message.timeout.ms": 55000,
"request.timeout.ms": 1000,
"request.required.acks": -1,
"message.send.max.retries": 2,
Expand Down
Loading

0 comments on commit 693123f

Please sign in to comment.