Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow blocking the queue and retrying messages from the failed message's offset #103

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions lib/broadway_kafka/acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,20 @@ defmodule BroadwayKafka.Acknowledger do
@doc """
The ack callback. It simply sends messages to the annotated producer.

A single `{:ack, key, offsets, failed_offsets}` message is sent, where `offsets`
contains the offsets of every message in the batch (successful and failed) and
`failed_offsets` contains only the offsets of the failed ones. Both lists are
sorted in ascending order.

Returns the message that was sent.
"""
@spec ack({pid, key}, [Broadway.Message.t()], [Broadway.Message.t()]) ::
        {:ack, key, [non_neg_integer], [non_neg_integer]}
def ack({producer_pid, key}, successful, failed) do
  successful_offsets = fetch_offsets(successful)
  failed_offsets = fetch_offsets(failed)

  all_offsets = Enum.sort(successful_offsets ++ failed_offsets)

  # Exactly one message per batch, so the producer handles each batch once and
  # always has the failed offsets available when it does.
  send(producer_pid, {:ack, key, all_offsets, Enum.sort(failed_offsets)})
end

# Extracts the offset stored in the acknowledger data of each message,
# preserving the order of the given list.
@spec fetch_offsets([Broadway.Message.t()]) :: [non_neg_integer]
defp fetch_offsets(messages) do
  Enum.map(messages, fn %{acknowledger: {_module, _ack_ref, %{offset: offset}}} ->
    offset
  end)
end
end
8 changes: 8 additions & 0 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ defmodule BroadwayKafka.BrodClient do

@default_offset_reset_policy :latest

@default_retry_on_failure false

@impl true
def init(opts) do
with {:ok, hosts} <- validate(opts, :hosts, required: true),
Expand All @@ -54,6 +56,8 @@ defmodule BroadwayKafka.BrodClient do
validate(opts, :offset_commit_on_ack, default: @default_offset_commit_on_ack),
{:ok, offset_reset_policy} <-
validate(opts, :offset_reset_policy, default: @default_offset_reset_policy),
{:ok, retry_on_failure} <-
validate(opts, :retry_on_failure, default: @default_retry_on_failure),
{:ok, group_config} <- validate_group_config(opts),
{:ok, fetch_config} <- validate_fetch_config(opts),
{:ok, client_config} <- validate_client_config(opts) do
Expand All @@ -66,6 +70,7 @@ defmodule BroadwayKafka.BrodClient do
reconnect_timeout: reconnect_timeout,
offset_commit_on_ack: offset_commit_on_ack,
offset_reset_policy: offset_reset_policy,
retry_on_failure: retry_on_failure,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config
Expand Down Expand Up @@ -229,6 +234,9 @@ defmodule BroadwayKafka.BrodClient do
# `:offset_commit_on_ack` and `:retry_on_failure` are both on/off switches, so a
# single clause rejects any non-boolean value for either of them.
defp validate_option(option, value)
     when option in [:offset_commit_on_ack, :retry_on_failure] and not is_boolean(value) do
  validation_error(option, "a boolean", value)
end

defp validate_option(:offset_reset_policy, value)
when value not in @offset_reset_policy_values do
validation_error(
Expand Down
1 change: 1 addition & 0 deletions lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule BroadwayKafka.KafkaClient do
group_id: :brod.group_id(),
reconnect_timeout: non_neg_integer,
offset_commit_on_ack: boolean,
retry_on_failure: boolean,
topics: [:brod.topic()],
group_config: keyword,
client_config: keyword
Expand Down
20 changes: 20 additions & 0 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ defmodule BroadwayKafka.Producer do
offset in Kafka or if the current offset has expired. Possible values are `:earliest` or
`:latest`. Default is `:latest`.

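* `:retry_on_failure` - Optional. A boolean that defines whether a batch containing
failed messages should be retried starting from the earliest failed offset instead of
being acknowledged. Default is `false`.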

* `:group_config` - Optional. A list of options used to configure the group
coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available
options.
Expand Down Expand Up @@ -406,6 +408,24 @@ defmodule BroadwayKafka.Producer do
{:noreply, [], new_state}
end

# Handles the four-element ack message, which carries the offsets of the failed
# messages in addition to all offsets of the batch.
#
# With no failed offsets, or with `:retry_on_failure` disabled, the message is
# handled exactly like the three-element `{:ack, key, offsets}` message.
@impl GenStage
def handle_info({:ack, key, offsets, []}, state), do: handle_info({:ack, key, offsets}, state)

def handle_info({:ack, key, offsets, failed_offsets}, state) do
  %{acks: acks, config: config} = state

  if config.retry_on_failure do
    # Asserts that `key` is tracked; a missing key is a bug and should crash.
    %{^key => {_pending, last_offset, seen}} = acks

    # Move the offset for this key back to the earliest failed message,
    # never forward past the offset already recorded.
    new_offset = min(last_offset, Enum.min(failed_offsets))

    # Update only the entry for `key`; the ack state of every other key
    # must be left untouched.
    {:noreply, [], %{state | acks: %{acks | key => {[], new_offset, seen}}}}
  else
    handle_info({:ack, key, offsets}, state)
  end
end

def handle_info({:DOWN, _ref, _, {client_id, _}, _reason}, %{client_id: client_id} = state) do
state.client.stop_group_coordinator(state.group_coordinator)
state = reset_buffer(state)
Expand Down
28 changes: 27 additions & 1 deletion test/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ defmodule BroadwayKafka.ProducerTest do
defmodule Forwarder do
use Broadway

# Any message whose data is exactly "failed" is marked as failed, which lets
# tests exercise the failure path.
def handle_message(_processor, message = %Broadway.Message{data: "failed"}, %{test_pid: _}) do
  reason = "this message failed"
  Broadway.Message.failed(message, reason)
end

def handle_message(_, message, %{test_pid: test_pid}) do
meta = message.metadata

Expand Down Expand Up @@ -606,11 +610,32 @@ defmodule BroadwayKafka.ProducerTest do
stop_broadway(pid)
end

test "with :retry_on_failure, retries from the failed offset" do
  {:ok, server} = MessageServer.start_link()
  {:ok, broadway} = start_broadway(server, retry_on_failure: true)

  assignment = [topic: "topic", partition: 0]

  producer = get_producer(broadway)
  put_assignments(producer, [assignment])

  assert_receive {:messages_fetched, 0}

  # The Forwarder marks the message whose data is "failed" as failed.
  MessageServer.push_messages(server, ["success", "failed"], assignment)

  # First both messages are fetched; the following single-message fetch is
  # presumably the retry of the failed one.
  assert_receive {:messages_fetched, 2}
  assert_receive {:messages_fetched, 1}

  stop_broadway(broadway)
end

defp start_broadway(message_server, opts \\ []) do
producers_concurrency = Keyword.get(opts, :producers_concurrency, 1)
processors_concurrency = Keyword.get(opts, :processors_concurrency, 1)
batchers_concurrency = Keyword.get(opts, :batchers_concurrency)
ack_raises_on_offset = Keyword.get(opts, :ack_raises_on_offset, nil)
retry_on_failure = Keyword.get(opts, :retry_on_failure, false)

batchers =
if batchers_concurrency do
Expand All @@ -634,7 +659,8 @@ defmodule BroadwayKafka.ProducerTest do
receive_interval: 0,
reconnect_timeout: 10,
max_bytes: 10,
ack_raises_on_offset: ack_raises_on_offset
ack_raises_on_offset: ack_raises_on_offset,
retry_on_failure: retry_on_failure
]},
concurrency: producers_concurrency
],
Expand Down