From 41eb1c0eb62d189022009facedf53bcfb91d9574 Mon Sep 17 00:00:00 2001 From: Misha Merkushin Date: Mon, 26 Aug 2024 17:46:08 +0300 Subject: [PATCH] [DEX-2238] feat: add support for waterdrop 2.7 --- .github/workflows/tests.yml | 2 +- .gitlab-ci.yml | 2 +- Appraisals | 4 +- CHANGELOG.md | 14 +++++++ Gemfile | 3 -- LICENSE | 2 +- README.md | 20 +++++----- dip.yml | 6 +-- .../install/templates/kafka_producer.yml | 10 ++--- lib/sbmt/kafka_producer/config/kafka.rb | 31 +++++++++----- lib/sbmt/kafka_producer/config/producer.rb | 4 +- .../kafka_producer/kafka_client_factory.rb | 26 +++++------- lib/sbmt/kafka_producer/version.rb | 2 +- sbmt-kafka_producer.gemspec | 16 ++++---- spec/internal/config/kafka_producer.yml | 12 +++--- .../config/kafka_config_spec.rb | 3 +- .../kafka_producer/config/producer_spec.rb | 9 +++-- .../kafka_client_factory_spec.rb | 40 +++++++++---------- 18 files changed, 111 insertions(+), 95 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index ef37493..6012930 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -31,7 +31,7 @@ jobs: strategy: fail-fast: false matrix: - ruby: [ '2.7', '3.0', '3.1', '3.2', '3.3' ] + ruby: [ '3.0', '3.1', '3.2', '3.3' ] env: RUBY_VERSION: ${{ matrix.ruby }} name: Ruby ${{ matrix.ruby }} diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 41d64ea..a03d513 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -15,7 +15,7 @@ tests: image: ${BUILD_CONF_HARBOR_REGISTRY}/dhub/library/ruby:$RUBY_VERSION parallel: matrix: - - RUBY_VERSION: ['2.7', '3.0', '3.1', '3.2', '3.3'] + - RUBY_VERSION: ['3.0', '3.1', '3.2', '3.3'] before_script: - bin/setup script: diff --git a/Appraisals b/Appraisals index d3b3a6b..093b1f9 100644 --- a/Appraisals +++ b/Appraisals @@ -3,10 +3,10 @@ # See compatibility table at https://www.fastruby.io/blog/ruby/rails/versions/compatibility-table.html versions_map = { - "6.0" => %w[2.7], "6.1" => %w[2.7 3.0], "7.0" => %w[3.1], - "7.1" => %w[3.2, 
3.3] + "7.1" => %w[3.2], + "7.2" => %w[3.3] } current_ruby_version = RUBY_VERSION.split(".").first(2).join(".") diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ea058d..9d74825 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed +## [3.0.0] - 2024-08-27 + +### BREAKING + +- Drop support for Ruby 2.7 +- Drop support for Rails 6.0 +- Add support for Waterdrop 2.7 +- `wait_timeout` configuration no longer needed +- All time-related values are now configured in milliseconds: `connect_timeout`, `ack_timeout`, `retry_backoff`, `max_wait_timeout`, `wait_on_queue_full_timeout` + +### Added + +- Add `message_timeout` configuration + ## [2.2.3] - 2024-06-20 ### Fixed diff --git a/Gemfile b/Gemfile index 49aee2f..be173b2 100644 --- a/Gemfile +++ b/Gemfile @@ -3,6 +3,3 @@ source "https://rubygems.org" gemspec - -# FIXME: remove this after drop support for Ruby 2.7 -gem "ffi", "< 1.17" diff --git a/LICENSE b/LICENSE index ff7215f..b23893d 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2024 SberMarket Tech +Copyright (c) 2024 Kuper Tech Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index 39d8344..1ab773c 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ [![Gem Version](https://badge.fury.io/rb/sbmt-kafka_producer.svg)](https://badge.fury.io/rb/sbmt-kafka_producer) -[![Build Status](https://github.com/SberMarket-Tech/sbmt-kafka_producer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/SberMarket-Tech/sbmt-kafka_producer/actions?query=branch%3Amaster) +[![Build Status](https://github.com/Kuper-Tech/sbmt-kafka_producer/actions/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Kuper-Tech/sbmt-kafka_producer/actions?query=branch%3Amaster) # Sbmt-KafkaProducer -This gem is used to
produce Kafka messages. It is a wrapper over the [waterdrop](https://github.com/karafka/waterdrop) gem, and it is recommended for use as a transport with the [sbmt-outbox](https://github.com/SberMarket-Tech/sbmt-outbox) gem. +This gem is used to produce Kafka messages. It is a wrapper over the [waterdrop](https://github.com/karafka/waterdrop) gem, and it is recommended for use as a transport with the [sbmt-outbox](https://github.com/Kuper-Tech/sbmt-outbox) gem. ## Installation @@ -21,7 +21,7 @@ bundle install ## Demo -Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/SberMarket-Tech/outbox-example-apps +Learn how to use this gem and how it works with Ruby on Rails at here https://github.com/Kuper-Tech/outbox-example-apps ## Auto configuration @@ -49,7 +49,7 @@ As the result, a sync producer will be created. ### Outbox producer -To generate an Outbox producer for use with Gem [sbmt-Outbox](https://github.com/SberMarket-Tech/sbmt-outbox), run the following command: +To generate an Outbox producer for use with Gem [sbmt-Outbox](https://github.com/Kuper-Tech/sbmt-outbox), run the following command: ```shell rails g kafka_producer:outbox_producer SomeOutboxItem @@ -66,20 +66,20 @@ default: &default # see more options at https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb wait_on_queue_full: true max_payload_size: 1000012 - max_wait_timeout: 5 - wait_timeout: 0.005 + max_wait_timeout: 60000 auth: kind: plaintext kafka: servers: "kafka:9092" # required max_retries: 2 # optional, default: 2 required_acks: -1 # optional, default: -1 - ack_timeout: 1 # in seconds, optional, default: 1 - retry_backoff: 1 # in seconds, optional, default: 1 - connect_timeout: 1 # in seconds, optional, default: 1 + ack_timeout: 1000 # in milliseconds, optional, default: 1000 + retry_backoff: 1000 # in milliseconds, optional, default: 1000 + connect_timeout: 2000 # in milliseconds, optional, default: 2000 + message_timeout: 55000 # in
milliseconds, optional, default: 55000 kafka_config: # low-level custom Kafka options queue.buffering.max.messages: 1 - queue.buffering.max.ms: 10_000 + queue.buffering.max.ms: 10000 development: <<: *default diff --git a/dip.yml b/dip.yml index 9111233..2c08de2 100644 --- a/dip.yml +++ b/dip.yml @@ -1,7 +1,7 @@ version: '7' environment: - RUBY_VERSION: '3.2' + RUBY_VERSION: '3.3' compose: files: @@ -35,14 +35,14 @@ interaction: subcommands: all: command: bundle exec appraisal rspec - rails-6.0: - command: bundle exec appraisal rails-6.0 rspec rails-6.1: command: bundle exec appraisal rails-6.1 rspec rails-7.0: command: bundle exec appraisal rails-7.0 rspec rails-7.1: command: bundle exec appraisal rails-7.1 rspec + rails-7.2: + command: bundle exec appraisal rails-7.2 rspec rubocop: description: Run Ruby linter diff --git a/lib/generators/kafka_producer/install/templates/kafka_producer.yml b/lib/generators/kafka_producer/install/templates/kafka_producer.yml index f14db41..e9cfd02 100644 --- a/lib/generators/kafka_producer/install/templates/kafka_producer.yml +++ b/lib/generators/kafka_producer/install/templates/kafka_producer.yml @@ -2,8 +2,7 @@ default: &default deliver: true wait_on_queue_full: true max_payload_size: 1000012 - max_wait_timeout: 5 - wait_timeout: 0.005 + max_wait_timeout: 60000 ignore_kafka_error: true auth: @@ -14,11 +13,12 @@ default: &default kafka: servers: "kafka:9092" + connect_timeout: 2000 + message_timeout: 55000 + ack_timeout: 10000 + retry_backoff: 10000 max_retries: 2 required_acks: -1 - ack_timeout: 1 - retry_backoff: 1 - connect_timeout: 1 development: <<: *default diff --git a/lib/sbmt/kafka_producer/config/kafka.rb b/lib/sbmt/kafka_producer/config/kafka.rb index e6549d0..910f17f 100644 --- a/lib/sbmt/kafka_producer/config/kafka.rb +++ b/lib/sbmt/kafka_producer/config/kafka.rb @@ -9,27 +9,36 @@ class Kafka < Dry::Struct # srv1:port1,srv2:port2,... 
SERVERS_REGEXP = /^[a-z\d.\-:]+(,[a-z\d.\-:]+)*$/.freeze + # https://github.com/karafka/waterdrop/blob/master/lib/waterdrop/config.rb + KAFKA_CONFIG_KEYS_REMAP = { + servers: :"bootstrap.servers", + connect_timeout: :"socket.connection.setup.timeout.ms", + message_timeout: :"message.timeout.ms", + ack_timeout: :"request.timeout.ms", + retry_backoff: :"retry.backoff.ms", + max_retries: :"message.send.max.retries", + required_acks: :"request.required.acks" + } + attribute :servers, Sbmt::KafkaProducer::Types::String.constrained(format: SERVERS_REGEXP) # defaults are rdkafka's # see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md - attribute :connect_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1) - attribute :ack_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1) + attribute :connect_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(2000) + attribute :ack_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1000) + attribute :retry_backoff, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1000) + attribute :message_timeout, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(55000) attribute :required_acks, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(-1) attribute :max_retries, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(2) - attribute :retry_backoff, Sbmt::KafkaProducer::Types::Coercible::Integer.optional.default(1) attribute :kafka_config, Sbmt::KafkaProducer::Types::ConfigAttrs.optional.default({}.freeze) def to_kafka_options - kafka_config.merge( - "bootstrap.servers": servers, - "socket.connection.setup.timeout.ms": connect_timeout.to_f * 1000, - "request.timeout.ms": ack_timeout.to_f * 1000, - "request.required.acks": required_acks, - "message.send.max.retries": max_retries, - "retry.backoff.ms": retry_backoff.to_f * 1000 - ).symbolize_keys + cfg = 
KAFKA_CONFIG_KEYS_REMAP.each_with_object({}) do |(key, kafka_key), hash| + hash[kafka_key] = self[key] + end + + kafka_config.symbolize_keys.merge(cfg) end end end diff --git a/lib/sbmt/kafka_producer/config/producer.rb b/lib/sbmt/kafka_producer/config/producer.rb index 69a8ff8..54e35c1 100644 --- a/lib/sbmt/kafka_producer/config/producer.rb +++ b/lib/sbmt/kafka_producer/config/producer.rb @@ -28,7 +28,7 @@ def instance config_name :kafka_producer attr_config :ignore_kafka_error, :deliver, :wait_on_queue_full, - :max_payload_size, :max_wait_timeout, :wait_timeout, + :max_payload_size, :max_wait_timeout, :wait_on_queue_full_timeout, auth: {}, kafka: {}, logger_class: "::Sbmt::KafkaProducer::Logger", @@ -37,7 +37,7 @@ def instance coerce_types ignore_kafka_error: :boolean, deliver: :boolean, wait_on_queue_full: :boolean, max_payload_size: :integer, max_wait_timeout: :integer, - wait_timeout: :float, wait_on_queue_full_timeout: :float + wait_on_queue_full_timeout: :integer coerce_types kafka: coerce_to(Kafka) coerce_types auth: coerce_to(Auth) diff --git a/lib/sbmt/kafka_producer/kafka_client_factory.rb b/lib/sbmt/kafka_producer/kafka_client_factory.rb index 7b5fada..e926300 100644 --- a/lib/sbmt/kafka_producer/kafka_client_factory.rb +++ b/lib/sbmt/kafka_producer/kafka_client_factory.rb @@ -15,13 +15,13 @@ def default_client end end - def build(kafka = {}) - return default_client if kafka.empty? + def build(kafka_options = {}) + return default_client if kafka_options.empty? 
- fetch_client(kafka) do + fetch_client(kafka_options) do ConnectionPool::Wrapper.new do WaterDrop::Producer.new do |config| - configure_client(config, kafka) + configure_client(config, kafka_options) end end end @@ -29,8 +29,8 @@ def build(kafka = {}) private - def fetch_client(kafka) - key = Digest::SHA1.hexdigest(Marshal.dump(kafka)) + def fetch_client(kafka_options) + key = Digest::SHA1.hexdigest(Marshal.dump(kafka_options)) return CLIENTS_REGISTRY[key] if CLIENTS_REGISTRY.key?(key) CLIENTS_REGISTRY_MUTEX.synchronize do @@ -49,22 +49,16 @@ def configure_client(kafka_config, kafka_options = {}) kafka_config.wait_on_queue_full = config.wait_on_queue_full if config.wait_on_queue_full.present? kafka_config.max_payload_size = config.max_payload_size if config.max_payload_size.present? kafka_config.max_wait_timeout = config.max_wait_timeout if config.max_wait_timeout.present? - kafka_config.wait_timeout = config.wait_timeout if config.wait_timeout.present? kafka_config.wait_on_queue_full_timeout = config.wait_on_queue_full_timeout if config.wait_on_queue_full_timeout.present? 
kafka_config.monitor.subscribe(config.metrics_listener_class.classify.constantize.new) end def custom_kafka_config(kafka_options) - result = {} - - result["socket.connection.setup.timeout.ms"] = kafka_options["connect_timeout"].to_f * 1000 if kafka_options.key?("connect_timeout") - result["request.timeout.ms"] = kafka_options["ack_timeout"].to_f * 1000 if kafka_options.key?("ack_timeout") - result["request.required.acks"] = kafka_options["required_acks"] if kafka_options.key?("required_acks") - result["message.send.max.retries"] = kafka_options["max_retries"] if kafka_options.key?("max_retries") - result["retry.backoff.ms"] = kafka_options["retry_backoff"].to_f * 1000 if kafka_options.key?("retry_backoff") - - result + kafka_options = kafka_options.symbolize_keys + short_options = kafka_options.extract!(*Config::Kafka::KAFKA_CONFIG_KEYS_REMAP.keys) + cfg = short_options.transform_keys(Config::Kafka::KAFKA_CONFIG_KEYS_REMAP) + kafka_options.merge!(cfg) end def config diff --git a/lib/sbmt/kafka_producer/version.rb b/lib/sbmt/kafka_producer/version.rb index ad5b252..dd7c233 100644 --- a/lib/sbmt/kafka_producer/version.rb +++ b/lib/sbmt/kafka_producer/version.rb @@ -2,6 +2,6 @@ module Sbmt module KafkaProducer - VERSION = "2.2.3" + VERSION = "3.0.0" end end diff --git a/sbmt-kafka_producer.gemspec b/sbmt-kafka_producer.gemspec index 83ca7c4..2619371 100644 --- a/sbmt-kafka_producer.gemspec +++ b/sbmt-kafka_producer.gemspec @@ -6,12 +6,12 @@ Gem::Specification.new do |spec| spec.name = "sbmt-kafka_producer" spec.license = "MIT" spec.version = Sbmt::KafkaProducer::VERSION - spec.authors = ["Sbermarket Ruby-Platform Team"] + spec.authors = ["Kuper Ruby-Platform Team"] spec.summary = "Ruby gem for producing Kafka messages" spec.description = "This gem is used for producing Kafka messages. 
It represents a wrapper over Waterdrop gem and is recommended for using as a transport with sbmt-outbox" - spec.homepage = "https://github.com/SberMarket-Tech/sbmt-kafka_producer" - spec.required_ruby_version = ">= 2.7.0" + spec.homepage = "https://github.com/Kuper-Tech/sbmt-kafka_producer" + spec.required_ruby_version = ">= 3.0.0" spec.metadata["allowed_push_host"] = "https://rubygems.org" @@ -32,12 +32,12 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency "anyway_config", "~> 2.4" - spec.add_dependency "connection_pool" + spec.add_dependency "connection_pool", "~> 2.0" spec.add_dependency "dry-initializer", "~> 3.0" - spec.add_dependency "dry-struct" - spec.add_dependency "waterdrop", "~> 2.5", "< 2.7" # BREAKING: undefined method `wait_timeout='' for # 1.5" + spec.add_dependency "waterdrop", "~> 2.7", "< 2.8" spec.add_dependency "zeitwerk", "~> 2.6" - spec.add_dependency "yabeda", ">= 0.11" + spec.add_dependency "yabeda", "~> 0.11" spec.add_development_dependency "appraisal", ">= 2.4" spec.add_development_dependency "bundler", ">= 2.1" @@ -47,7 +47,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "opentelemetry-common", ">= 0.17.0" spec.add_development_dependency "opentelemetry-instrumentation-base", ">= 0.17.0" spec.add_development_dependency "rake", ">= 13.0" - spec.add_development_dependency "rails", ">= 6.0" + spec.add_development_dependency "rails", ">= 6.1" spec.add_development_dependency "rspec", ">= 3.0" spec.add_development_dependency "rspec_junit_formatter", ">= 0.6" spec.add_development_dependency "rspec-rails", ">= 4.0" diff --git a/spec/internal/config/kafka_producer.yml b/spec/internal/config/kafka_producer.yml index 4060c05..8944071 100644 --- a/spec/internal/config/kafka_producer.yml +++ b/spec/internal/config/kafka_producer.yml @@ -2,8 +2,7 @@ default: &default deliver: true wait_on_queue_full: true max_payload_size: 1000012 - max_wait_timeout: 5 - wait_timeout: 0.005 + max_wait_timeout: 
60000 ignore_kafka_error: true auth: kind: plaintext @@ -11,12 +10,13 @@ default: &default servers: "kafka:9092" max_retries: 2 required_acks: -1 - ack_timeout: 1 - retry_backoff: 1 - connect_timeout: 1 + ack_timeout: 1000 + retry_backoff: 1000 + connect_timeout: 2000 + message_timeout: 55000 kafka_config: queue.buffering.max.messages: 1 - queue.buffering.max.ms: 10_000 + queue.buffering.max.ms: 10000 development: <<: *default diff --git a/spec/sbmt/kafka_producer/config/kafka_config_spec.rb b/spec/sbmt/kafka_producer/config/kafka_config_spec.rb index 7430aa5..8d00fc3 100644 --- a/spec/sbmt/kafka_producer/config/kafka_config_spec.rb +++ b/spec/sbmt/kafka_producer/config/kafka_config_spec.rb @@ -3,7 +3,8 @@ describe Sbmt::KafkaProducer::Config::Kafka, type: :config do let(:kafka_config_defaults) do { - "socket.connection.setup.timeout.ms": 1000, + "socket.connection.setup.timeout.ms": 2000, + "message.timeout.ms": 55000, "request.timeout.ms": 1000, "request.required.acks": -1, "message.send.max.retries": 2, diff --git a/spec/sbmt/kafka_producer/config/producer_spec.rb b/spec/sbmt/kafka_producer/config/producer_spec.rb index d0c19aa..23b38f7 100644 --- a/spec/sbmt/kafka_producer/config/producer_spec.rb +++ b/spec/sbmt/kafka_producer/config/producer_spec.rb @@ -19,7 +19,8 @@ "request.timeout.ms": 1000, "request.required.acks": -1, "message.send.max.retries": 2, - "retry.backoff.ms": 1000 + "retry.backoff.ms": 1000, + "message.timeout.ms": 55000 } end @@ -35,9 +36,9 @@ # loaded from kafka_producer.yml "message.send.max.retries": 2, "request.required.acks": -1, - "request.timeout.ms": 1000.0, - "retry.backoff.ms": 1000.0, - "socket.connection.setup.timeout.ms": 1000.0, + "request.timeout.ms": 1000, + "retry.backoff.ms": 1000, + "socket.connection.setup.timeout.ms": 2000, # arbitrary parameters for section kafka_config file kafka_producer.yml "queue.buffering.max.messages": 1, "queue.buffering.max.ms": 10000 diff --git 
a/spec/sbmt/kafka_producer/kafka_client_factory_spec.rb b/spec/sbmt/kafka_producer/kafka_client_factory_spec.rb index 93c2b3b..0c888d0 100644 --- a/spec/sbmt/kafka_producer/kafka_client_factory_spec.rb +++ b/spec/sbmt/kafka_producer/kafka_client_factory_spec.rb @@ -29,27 +29,27 @@ end describe ".configure_client" do - let(:logger) { instance_double(Logger) } - - before do - allow(Sbmt::KafkaProducer).to receive(:logger).and_return(logger) - end - it "configures the client with the correct options" do - seed_brokers = "kafka://localhost:9092" - connect_timeout = "10s" - - ConnectionPool::Wrapper.new do |wrapper| - wrapper.with do |producer| - configure_client(producer) - expect(producer.config.deliver).to be(true) - expect(producer.config.logger).to eq(logger) - expect(producer.config.wait_on_queue_full).to be(false) - expect(producer.config.kafka).to include( - "bootstrap.servers": seed_brokers.sub("kafka://", ""), - "producer.connect.timeout.ms": connect_timeout.delete_suffix("s").to_i * 1000 - ) - end + # rubocop:disable Style/HashSyntax + kafka_opts = { + message_timeout: 54000, + "queue.buffering.max.messages": 14, + "ack_timeout" => 1555, + "queue.buffering.max.ms" => 1345 + } + # rubocop:enable Style/HashSyntax + + described_class.build(kafka_opts).with do |producer| + expect(producer.config.deliver).to be(true) + expect(producer.config.logger).to be_instance_of(Sbmt::KafkaProducer::Logger) + expect(producer.config.wait_on_queue_full).to be(true) + expect(producer.config.max_wait_timeout).to eq(60000) + expect(producer.config.kafka).to include( + "bootstrap.servers": "kafka:9092", + "message.timeout.ms": 54000, + "request.timeout.ms": 1555, + "queue.buffering.max.ms": 1345 + ) end end end