|
| 1 | +%% This Source Code Form is subject to the terms of the Mozilla Public |
| 2 | +%% License, v. 2.0. If a copy of the MPL was not distributed with this |
| 3 | +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. |
| 4 | +%% |
| 5 | +%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” |
| 6 | +%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. |
| 7 | +%% |
| 8 | + |
| 9 | +-module(rabbit_chaos). |
| 10 | + |
| 11 | +-behaviour(gen_server). |
| 12 | + |
| 13 | +-export([start_link/0]). |
| 14 | +-export([ |
| 15 | + begin_default/0, |
| 16 | + begin_default/1, |
| 17 | + begin_chaos/1 |
| 18 | + ]). |
| 19 | + |
| 20 | + |
| 21 | +-export([init/1, |
| 22 | + handle_call/3, |
| 23 | + handle_cast/2, |
| 24 | + handle_info/2, |
| 25 | + terminate/2, |
| 26 | + code_change/3]). |
| 27 | + |
%% The action carried out when an event is selected:
%%   {kill_named_proc, Process}      - exit the locally registered process
%%                                     Process with reason `chaos'
%%   kill_ra_member                  - exit, with reason `kill', the process
%%                                     registered under the name of a random
%%                                     ra_leaderboard entry
%%   restart_ra_member               - stop and restart the Ra server of a
%%                                     random local quorum queue
%%   {multi, Num, Interval, Action}  - run Action Num times, sleeping
%%                                     Interval milliseconds after each run
-type chaos_action() :: {kill_named_proc, Process :: atom()} |
                        kill_ra_member |
                        restart_ra_member |
                        {multi,
                         Num :: non_neg_integer(),
                         Interval :: non_neg_integer(),
                         chaos_action()}.

%% A named, weighted action. Name is only used in log output; Weight is the
%% relative likelihood of the event being picked on a tick.
-type chaos_event() :: {Name :: atom(),
                        Weight :: pos_integer(),
                        chaos_action()}.

%% interval is the delay between ticks in milliseconds.
-type chaos_cfg() :: #{interval := non_neg_integer(),
                       events := [chaos_event()]}.
-define(SERVER, ?MODULE).

-record(?MODULE, {cfg :: chaos_cfg()}).

-export_type([chaos_cfg/0,
              chaos_event/0,
              chaos_action/0]).
| 40 | + |
| 41 | +%%---------------------------------------------------------------------------- |
| 42 | +%% A chaos server that can be enabled to create periodic configurable chaos |
| 43 | +%% inside the broker. |
| 44 | +%%---------------------------------------------------------------------------- |
| 45 | + |
%% Begins the default chaos schedule using a 20000 ms tick interval.
-spec begin_default() -> ok.
begin_default() ->
    DefaultIntervalMs = 20000,
    begin_default(DefaultIntervalMs).
| 48 | + |
%% Begins the default chaos schedule, ticking every Interval milliseconds.
%%
%% Each entry is {Name, Weight, Action}. With the weights below the WAL kill
%% is selected on 5 out of 9 ticks on average and every other event on 1 out
%% of 9. Names only appear in log output, so each event gets a distinct one
%% to keep the log unambiguous.
-spec begin_default(non_neg_integer()) -> ok.
begin_default(Interval) ->
    Events = [
              {kill_qq_wal, 5, {kill_named_proc, ra_log_wal}},
              {kill_qq_seg_writer, 1, {kill_named_proc, ra_log_segment_writer}},
              {kill_qq_member, 1, kill_ra_member},
              {restart_qq_member, 1, restart_ra_member},
              {multi_kill_qq_wal, 1,
               {multi, 3, 100, {kill_named_proc, ra_log_wal}}}
             ],
    begin_chaos(#{interval => Interval,
                  events => Events}).
| 60 | + |
%% Hands Cfg to the chaos server with a synchronous call and returns the
%% server's reply.
-spec begin_chaos(chaos_cfg()) -> ok.
begin_chaos(Cfg) ->
    Request = {begin_chaos, Cfg},
    gen_server:call(?SERVER, Request).
| 63 | + |
%% Starts the chaos server linked to the caller and registers it locally
%% under ?SERVER.
-spec start_link() -> rabbit_types:ok_pid_or_error().
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
| 67 | + |
%% gen_server callback. Enables exit trapping and starts with an empty event
%% list; no timer is armed until a begin_chaos request arrives.
init([]) ->
    process_flag(trap_exit, true),
    InitialState = #?MODULE{cfg = #{interval => 20000,
                                    events => []}},
    {ok, InitialState}.
| 73 | + |
%% gen_server callback. Stores the supplied configuration and arms the first
%% do_chaos tick, Interval milliseconds from now.
%% NOTE(review): a previously armed timer is not cancelled here, so a second
%% begin_chaos request appears to start an additional concurrent tick cycle.
handle_call({begin_chaos, Cfg = #{interval := Interval}}, _From, State0) ->
    _TimerRef = erlang:send_after(Interval, self(), do_chaos),
    State = State0#?MODULE{cfg = Cfg},
    {reply, ok, State}.
| 77 | + |
%% gen_server callback. Casts are not part of this server's protocol; any
%% cast is dropped and the state is left untouched.
handle_cast(_Msg, State) ->
    {noreply, State}.
| 80 | + |
%% gen_server callback.
%%
%% do_chaos: picks one configured event at random (by weight), executes it
%% and re-arms the timer for the next tick. When no event can be picked
%% (empty event list, or pick_event/1 returning `undefined') the tick is a
%% no-op rather than a crash, and the timer is still re-armed.
%%
%% Any other message is ignored.
handle_info(do_chaos, #?MODULE{cfg = #{interval := Interval} = Cfg} = State) ->
    case maps:get(events, Cfg) of
        [] ->
            %% Nothing configured; pick_event/1 must not be called with an
            %% empty list because the total weight would be zero.
            ok;
        [_ | _] = Events ->
            case pick_event(Events) of
                {Name, _Weight, Event} ->
                    do_event(Name, Event);
                undefined ->
                    ok
            end
    end,
    _ = erlang:send_after(Interval, self(), do_chaos),
    {noreply, State};
handle_info(_, #?MODULE{} = State) ->
    {noreply, State}.
| 89 | + |
%% gen_server callback. There is nothing to clean up on shutdown.
terminate(_Why, #?MODULE{}) ->
    ok.
| 92 | + |
%% gen_server callback. The state needs no conversion between code versions,
%% so it is returned unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
| 95 | + |
| 96 | +%% internal |
| 97 | + |
%% Executes one chaos action. Name is only used for logging. Always returns
%% `ok'.
%%
%%   {kill_named_proc, ProcName}    - sends exit reason `chaos' to the
%%                                    locally registered process, if any
%%   kill_ra_member                 - sends exit reason `kill' to the process
%%                                    registered under the first element of a
%%                                    random ra_leaderboard entry
%%   restart_ra_member              - stops and restarts the Ra server of a
%%                                    random local quorum queue
%%   {multi, Num, Interval, Event}  - runs Event Num times, sleeping Interval
%%                                    milliseconds after each run
%%
%% Missing targets (unregistered process, empty leaderboard, no local quorum
%% queues) make the action a no-op instead of raising.
do_event(Name, {kill_named_proc, ProcName}) ->
    rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
    exit_registered(ProcName, chaos);
do_event(Name, kill_ra_member) ->
    rabbit_log:info("~s: doing event ~s...", [?MODULE, Name]),
    case random_element(ets:tab2list(ra_leaderboard)) of
        {ok, Entry} ->
            exit_registered(element(1, Entry), kill);
        empty ->
            ok
    end;
do_event(Name, restart_ra_member = Type) ->
    rabbit_log:info("~s: doing event ~s of type ~s", [?MODULE, Name, Type]),
    case random_element(rabbit_amqqueue:list_local_quorum_queues()) of
        {ok, Queue} ->
            ServerId = amqqueue:get_pid(Queue),
            ra:stop_server(quorum_queues, ServerId),
            ra:restart_server(quorum_queues, ServerId),
            ok;
        empty ->
            ok
    end;
do_event(Name, {multi, Num, Interval, Event}) ->
    rabbit_log:info("~s: doing multi event ~s...",
                    [?MODULE, Name]),
    %% A failure in any repetition abandons the remaining ones but must not
    %% take the chaos server down; it is logged instead of silently dropped.
    try
        lists:foreach(fun(_) ->
                              do_event(Name, Event),
                              timer:sleep(Interval)
                      end, lists:seq(1, Num))
    catch
        Class:Reason ->
            rabbit_log:warning("~s: multi event ~s aborted: ~p:~p",
                               [?MODULE, Name, Class, Reason])
    end,
    ok.

%% Sends an exit signal with Reason to the process registered locally as
%% RegName. Does nothing when no such process is registered.
exit_registered(RegName, Reason) ->
    case whereis(RegName) of
        undefined ->
            ok;
        Target ->
            exit(Target, Reason),
            ok
    end.

%% Returns {ok, Element} for a uniformly chosen element of List, or `empty'
%% when List has no elements (rand:uniform/1 does not accept 0).
random_element([]) ->
    empty;
random_element([_ | _] = List) ->
    {ok, lists:nth(rand:uniform(length(List)), List)}.
| 126 | + |
%% Picks one {Name, Weight, Event} tuple from Events at random, where the
%% chance of an entry being chosen is proportional to its Weight.
%%
%% Returns `undefined' when the total weight is zero (empty list or only
%% zero-weight entries), since rand:uniform/1 requires an argument >= 1.
pick_event(Events) ->
    case lists:sum([element(2, E) || E <- Events]) of
        0 ->
            undefined;
        TotalWeight ->
            Pick = rand:uniform(TotalWeight),
            event_at_weight_point(Pick, 0, Events)
    end.

%% Walks Events accumulating weights and returns the first entry whose
%% cumulative weight reaches Pick, or `undefined' if the list is exhausted
%% first.
event_at_weight_point(_Pick, _Cur, []) ->
    undefined;
event_at_weight_point(Pick, Cur0, [{_, W, _} = E | Events]) ->
    Cur = Cur0 + W,
    case Pick =< Cur of
        true ->
            E;
        false ->
            event_at_weight_point(Pick, Cur, Events)
    end.
0 commit comments