Skip to content

Commit

Permalink
Add a new testcase reproducing the problem
Browse files Browse the repository at this point in the history
Fixes #369
  • Loading branch information
k32 committed Mar 13, 2020
1 parent 943ddb6 commit eb725c0
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 7 deletions.
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
%% -*- mode:erlang -*-
{deps, [ {supervisor3, "1.1.8"}
, {kafka_protocol, "2.3.3"}
]}.
Expand Down
2 changes: 1 addition & 1 deletion rebar.config.script
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ IsRebar3 = erlang:function_exported(rebar3, main, 1),
DocoptUrl = "https://github.com/zmstone/docopt-erl.git",
DocOptTag = "0.1.3",
DocoptDep = {docopt, {git, DocoptUrl, {branch, DocOptTag}}},
Snabbkaffe = {snabbkaffe, {git, "https://github.com/klarna/snabbkaffe.git", {branch, "master"}}}, % TODO: use release tag
Snabbkaffe = {snabbkaffe, "0.3.3"},
Profiles =
{profiles, [
{brod_cli, [
Expand Down
1 change: 1 addition & 0 deletions scripts/setup-test-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ create_topic "brod-group-subscriber-1" 3 2
create_topic "brod-group-subscriber-2" 3 2
create_topic "brod-group-subscriber-3" 3 2
create_topic "brod-group-subscriber-4"
create_topic "brod-group-subscriber-5"
create_topic "brod-demo-topic-subscriber" 3 2
create_topic "brod-demo-group-subscriber-koc" 3 2
create_topic "brod-demo-group-subscriber-loc" 3 2
Expand Down
45 changes: 44 additions & 1 deletion test/brod_group_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
, t_async_commit/1
, t_consumer_crash/1
, t_assign_partitions_handles_updating_state/1
, t_subscribe_same_partition/1
]).


Expand Down Expand Up @@ -85,6 +86,7 @@ groups() ->
, t_2_members_one_partition
, t_async_commit
, t_assign_partitions_handles_updating_state
, t_subscribe_same_partition
]}
].

Expand All @@ -106,6 +108,7 @@ common_init_per_testcase(Case, Config) ->
ok = brod:start_producer(ClientId, ?TOPIC2, _ProducerConfig = []),
ok = brod:start_producer(ClientId, ?TOPIC3, _ProducerConfig = []),
ok = brod:start_producer(ClientId, ?TOPIC4, _ProducerConfig = []),
ok = brod:start_producer(ClientId, ?TOPIC5, _ProducerConfig = []),
Config.

common_end_per_testcase(Case, Config) when is_list(Config) ->
Expand All @@ -123,18 +126,20 @@ common_end_per_testcase(Case, Config) when is_list(Config) ->

%%%_* Group subscriber callbacks ===============================================

init(_GroupId, Config) ->
init(GroupId, Config) ->
IsAsyncAck = maps:get(async_ack, Config, false),
IsAsyncCommit = maps:get(async_commit, Config, false),
IsAssignPartitions = maps:get(assign_partitions, Config, false),
{ok, #state{ is_async_ack = IsAsyncAck
, is_async_commit = IsAsyncCommit
, is_assign_partitions = IsAssignPartitions
, group_id = GroupId
}}.

handle_message(Topic, Partition, Message,
#state{ is_async_ack = IsAsyncAck
, is_async_commit = IsAsyncCommit
, group_id = GroupId
} = State) ->
#kafka_message{ offset = Offset
, value = Value
Expand All @@ -145,6 +150,7 @@ handle_message(Topic, Partition, Message,
, offset => Offset
, value => Value
, worker => self()
, group_id => GroupId
}),
case {IsAsyncAck, IsAsyncCommit} of
{true, _} -> {ok, State};
Expand Down Expand Up @@ -396,6 +402,43 @@ t_2_members_one_partition(Config) when is_list(Config) ->
)
end).

%% Check that two group subscrivers can consume from the same
%% topic-partition.
t_subscribe_same_partition(Config) when is_list(Config) ->
Topic = ?TOPIC5,
Group1 = <<"subscribe_same_partition_grp1">>,
Group2 = <<"subscribe_same_partition_grp2">>,
MaxSeqNo = 100,
InitArgs = #{},
L = payloads(Config),
SendFun =
fun(Value) ->
ok = brod:produce_sync(?CLIENT_ID, Topic, 0, <<>>, Value)
end,
?check_trace(
#{ timeout => 5000 },
%% Run stage:
begin
%% Start two subscribers consuming from the same partition:
{ok, _, _} = start_subscriber( [{group_id, Group1} | Config]
, [Topic]
, InitArgs
),
{ok, _, _} = start_subscriber( [{group_id, Group2} | Config]
, [Topic]
, InitArgs
),
%% Send messages:
timer:sleep(20000),
lists:foreach(SendFun, L)
end,
fun(_, Trace) ->
TraceSub1 = [Msg || Msg = #{group_id := GID} <- Trace, GID == Group1],
TraceSub2 = [Msg || Msg = #{group_id := GID} <- Trace, GID == Group2],
check_all_messages_were_received_once(TraceSub1, L),
check_all_messages_were_received_once(TraceSub2, L)
end).

t_async_commit({init, Config}) ->
meck:new(brod_group_coordinator,
[passthrough, no_passthrough_cover, no_history]),
Expand Down
2 changes: 2 additions & 0 deletions test/brod_group_subscriber_test.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
, is_assign_partitions
, topic
, partition
, group_id
}).

-define(MSG(Ref, Pid, Topic, Partition, Offset, Value),
Expand All @@ -15,5 +16,6 @@
-define(TOPIC2, <<"brod-group-subscriber-2">>).
-define(TOPIC3, <<"brod-group-subscriber-3">>).
-define(TOPIC4, <<"brod-group-subscriber-4">>).
-define(TOPIC5, <<"brod-group-subscriber-5">>).
-define(GROUP_ID, list_to_binary(atom_to_list(?MODULE))).
-define(config(Name), proplists:get_value(Name, Config)).
16 changes: 11 additions & 5 deletions test/brod_test_group_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
]).

init(InitInfo, Config) ->
#{topic := Topic, partition := Partition} = InitInfo,
#{ topic := Topic
, partition := Partition
, group_id := GroupId
} = InitInfo,
IsAsyncAck = maps:get(async_ack, Config, false),
IsAsyncCommit = maps:get(async_commit, Config, false),
IsAssignPartitions = maps:get(assign_partitions, Config, false),
Expand All @@ -42,13 +45,15 @@ init(InitInfo, Config) ->
, is_assign_partitions = IsAssignPartitions
, topic = Topic
, partition = Partition
, group_id = GroupId
}}.

handle_message(Message,
#state{ is_async_ack = IsAsyncAck
, is_async_commit = IsAsyncCommit
, topic = Topic
, partition = Partition
#state{ is_async_ack = IsAsyncAck
, is_async_commit = IsAsyncCommit
, topic = Topic
, partition = Partition
, group_id = GroupId
} = State) ->
#kafka_message{ offset = Offset
, value = Value
Expand All @@ -59,6 +64,7 @@ handle_message(Message,
, offset => Offset
, value => Value
, worker => self()
, group_id => GroupId
}),
case {IsAsyncAck, IsAsyncCommit} of
{true, _} -> {ok, State};
Expand Down

0 comments on commit eb725c0

Please sign in to comment.