Skip to content

Commit

Permalink
Merge pull request #505 from rabbitmq/log-effect-2-fix
Browse files Browse the repository at this point in the history
Fix log/2 effect regression.
  • Loading branch information
kjnilsson authored Jan 31, 2025
2 parents f85db78 + 51523d6 commit 130c01a
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 46 deletions.
7 changes: 3 additions & 4 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
ok = prim_file:delete(filename:join(Dir, WalFile)),
T2 = erlang:monotonic_time(),
Diff = erlang:convert_time_unit(T2 - T1, native, millisecond),
?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file ~s in ~bms",
?DEBUG("segment_writer in '~w': completed flush of ~b writers from wal file "
"~s in ~bms",
[System, length(RangesList), WalFile, Diff]),
{noreply, State};
handle_cast({truncate_segments, Who, {_Range, Name} = SegRef},
Expand Down Expand Up @@ -315,8 +316,6 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}},
segment_conf = SegConf} = State) ->
Dir = filename:join(DataDir, binary_to_list(ServerUId)),
StartIdx = start_index(ServerUId, StartIdx0),
?DEBUG("~s ~s ~b:~b to ~b",
[?FUNCTION_NAME, ServerUId, StartIdx0, StartIdx, EndIdx]),
case open_file(Dir, SegConf) of
enoent ->
?DEBUG("segment_writer: skipping segment as directory ~ts does "
Expand All @@ -325,7 +324,7 @@ flush_mem_table_range(ServerUId, {Tid, {StartIdx0, EndIdx}},
%% clean up the tables for this process
[];
Segment0 ->
case append_to_segment(ServerUId, Tid, StartIdx0, EndIdx,
case append_to_segment(ServerUId, Tid, StartIdx, EndIdx,
Segment0, State) of
undefined ->
?WARN("segment_writer: skipping segments for ~w as
Expand Down
71 changes: 36 additions & 35 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1383,16 +1383,6 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
ok
end,
{State, Actions};
handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType,
State, Actions)
when LogOrLogExt == log orelse LogOrLogExt == log_ext ->
case can_execute_locally(RaftState, Node, State) of
true ->
handle_effect(RaftState, {LogOrLogExt, Idxs, Fun}, EvtType,
State, Actions);
false ->
{State, Actions}
end;
handle_effect(leader, {append, Cmd}, _EvtType, State, Actions) ->
Evt = {command, normal, {'$usr', Cmd, noreply}},
{State, [{next_event, cast, Evt} | Actions]};
Expand All @@ -1405,33 +1395,29 @@ handle_effect(_RaftState, {try_append, Cmd, ReplyMode}, _EvtType, State, Actions
%% limited to the leader
Evt = {command, normal, {'$usr', Cmd, ReplyMode}},
{State, [{next_event, cast, Evt} | Actions]};
handle_effect(RaftState, {log, Idxs, Fun}, EvtType,
State = #state{server_state = SS0}, Actions)
when is_list(Idxs) ->
%% Useful to implement a batch send of data obtained from the log.
%% 1) Retrieve all data from the list of indexes
{ok, Cmds, SS} = ra_server:log_read(Idxs, SS0),
%% 2) Apply the fun to the list of commands as a whole and deal with any effects
case Fun(Cmds) of
[] ->
{State#state{server_state = SS}, Actions};
Effects ->
%% recurse with the new effects
handle_effects(RaftState, Effects, EvtType,
State#state{server_state = SS}, Actions)
end;
handle_effect(RaftState, {log_ext, Idxs, Fun}, EvtType,
State = #state{server_state = SS0}, Actions)
when is_list(Idxs) ->
ReadState = ra_server:log_partial_read(Idxs, SS0),
case Fun(ReadState) of
[] ->
{State, Actions};
Effects ->
%% recurse with the new effects
handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType,
State0, Actions)
when LogOrLogExt == log orelse
LogOrLogExt == log_ext ->
case can_execute_locally(RaftState, Node, State0) of
true ->
{Effects, State} = handle_log_effect(LogOrLogExt, Idxs, Fun,
State0),
handle_effects(RaftState, Effects, EvtType,
State, Actions)
State, Actions);
false ->
{State0, Actions}
end;
handle_effect(leader, {LogOrLogExt, Idxs, Fun}, EvtType, State0, Actions)
when is_list(Idxs) andalso
(LogOrLogExt == log orelse
LogOrLogExt == log_ext) ->
%% Useful to implement a batch send of data obtained from the log.
%% 1) Retrieve all data from the list of indexes
{Effects, State} = handle_log_effect(LogOrLogExt, Idxs, Fun,
State0),
handle_effects(leader, Effects, EvtType,
State, Actions);
handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) ->
{_, ServerState, Effects} = ra_server:handle_aux(RaftState, cast, Cmd,
State0#state.server_state),
Expand Down Expand Up @@ -2122,3 +2108,18 @@ schedule_command_flush(Delayed) ->
_ ->
ok = gen_statem:cast(self(), flush_commands)
end.


handle_log_effect(log, Idxs, Fun,
#state{server_state = SS0} = State)
when is_list(Idxs) ->
%% Useful to implement a batch send of data obtained from the log.
%% 1) Retrieve all data from the list of indexes
{ok, Cmds, SS} = ra_server:log_read(Idxs, SS0),
%% 2) Apply the fun to the list of commands as a whole and deal with any effects
{Fun(Cmds), State#state{server_state = SS}};
handle_log_effect(log_ext, Idxs, Fun,
#state{server_state = SS0} = State)
when is_list(Idxs) ->
ReadState = ra_server:log_partial_read(Idxs, SS0),
{Fun(ReadState), State}.
26 changes: 19 additions & 7 deletions test/ra_machine_int_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -610,19 +610,31 @@ log_effect(Config) ->
{log, lists:reverse(Idxs),
fun (Cmds) ->
Datas = [D || {_, D} <- Cmds],
[{send_msg, Self,
{datas, Datas}}]
%% using a plain send here to
%% ensure this effect is only
%% evaluated on leader
Self ! {datas, Datas},
[]
end}}
end),
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(server_id, Config),
ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]),
{ok, _, ServerId} = ra:process_command(ServerId, {cmd, <<"hi1">>}),
ServerId1 = ?config(server_id, Config),
ServerId2 = ?config(server_id2, Config),
ServerId3 = ?config(server_id3, Config),
ok = start_cluster(ClusterName, {module, Mod, #{}},
[ServerId1, ServerId2, ServerId3]),
{ok, _, ServerId} = ra:process_command(ServerId1, {cmd, <<"hi1">>}),
{ok, _, ServerId} = ra:process_command(ServerId, {cmd, <<"hi2">>}),
{ok, _, ServerId} = ra:process_command(ServerId, get_data),
{ok, ok, ServerId} = ra:process_command(ServerId, get_data),
receive
{datas, [<<"hi1">>, <<"hi2">>]} ->
ok
receive
{datas, [<<"hi1">>, <<"hi2">>]} ->
ct:fail("unexpected second log effect execution"),
ok
after 100 ->
ok
end
after 5000 ->
flush(),
exit(data_timeout)
Expand Down

0 comments on commit 130c01a

Please sign in to comment.