Skip to content

Commit

Permalink
CA-224730 back off exponentially after failed read from plugin
Browse files Browse the repository at this point in the history
This commit introduces a skip count for plugins. After a read error, the
skip count is incremented and the plugin is not read for that number of
read cycles. When reading succeeds, the skip is reset to 0. Currently,
at most 256 cycles are skipped and the skip count increases
exponentially from 1 with every read error.

Signed-off-by: Christian Lindig <[email protected]>
  • Loading branch information
lindig authored and robhoes committed Oct 11, 2016
1 parent 4542e92 commit 0e289ef
Showing 1 changed file with 64 additions and 37 deletions.
101 changes: 64 additions & 37 deletions rrdd/rrdd_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,28 @@ module Plugin = struct

module Make = functor(P: PLUGIN) -> struct
(* A type to represent a registered plugin. *)

(* 11 October 2016
* This module needs a re-write when the next major addition comes
* along :
* - it would be convenient, not to pass the uid in addition to the
* plugin around to facilitate error reporting
* - the back-off mechanism needs to be better encapsulated. In the
* ideal case, we can use a wrap() function that turns a reader
* that can fail into one that backs off in the presence of errors
* and retries.
* - The error reporting could be moved out of get_payload to the
* caller.
* - The lock-protected hash table could be made more abstract such
* that locking is not spread over the module.
* - Can the code for backwards compatibility be expunged?
*)

type plugin = {
info: P.info;
reader: Rrd_reader.reader;
mutable muted: bool; (** don't log errors if muted *)
mutable skip_init: int; (** initial value for skip after read err *)
mutable skip: int; (** number of cycles to skip b/f next read *)
}

(* A map storing currently registered plugins, and any data required to
Expand All @@ -415,36 +433,43 @@ module Plugin = struct
* conditions and data corruption. *)
let registered_m = Mutex.create ()

(* Helper function to find plugin info from a uid. *)
let find ~(uid: P.uid) : plugin =
try
Mutex.execute registered_m (fun () -> Hashtbl.find registered uid)
with Not_found ->
let msg = Printf.sprintf "failed to find plugin %s at %s"
(P.string_of_uid uid) __LOC__ in
error "internal error: %s" msg;
failwith msg

let mute uid plugin =
Mutex.execute registered_m (fun () -> plugin.muted <- true);
warn "muting plugin %s" (P.string_of_uid uid)

let unmute uid plugin =
if plugin.muted then begin
Mutex.execute registered_m (fun () -> plugin.muted <- false);
warn "unmuting plugin %s" (P.string_of_uid uid)
end else
()

let get_payload ~(uid: P.uid) : Rrd_protocol.payload =
let plugin = find uid in
(* we hit an error - increase skip count exponentially until max *)
let incr_skip_count uid plugin =
let skip_max = 256 in (* about 21.3 min on 5sec cycle *)
Mutex.execute registered_m (fun () ->
let skips = min (plugin.skip_init * 2) skip_max in
warn "setting skip-cycles-after-error for plugin %s to %d"
(P.string_of_uid uid) skips;
plugin.skip_init <- skips;
plugin.skip <- skips)

(* success - set skip to 0, reset initial value *)
let reset_skip_count uid plugin =
if plugin.skip_init > 1 then begin
warn "re-setting skip-cycles-after-error for plugin %s to 1"
(P.string_of_uid uid);
Mutex.execute registered_m (fun () ->
plugin.skip_init <- 1;
plugin.skip <- 0)
end

(* true, iff the plugin skips the next reading *)
let skip (uid, plugin) = plugin.skip > 0

(* we are skipping a reading *)
let decr_skip_count (uid, plugin as p) =
if skip p then
Mutex.execute registered_m (fun () ->
plugin.skip <- plugin.skip - 1)

let get_payload ~(uid: P.uid) plugin : Rrd_protocol.payload =
try
let payload = plugin.reader.Rrd_reader.read_payload () in
unmute uid plugin;
reset_skip_count uid plugin; (* reset skip counts *)
payload
with
| e when not plugin.muted ->
mute uid plugin;
| e ->
incr_skip_count uid plugin; (* increase skip count *)
let log e =
warn "Failed to process plugin: %s (%s)"
(P.string_of_uid uid)
Expand All @@ -461,8 +486,6 @@ module Plugin = struct
log e;
raise Read_error
end
| _ -> raise Rrd_protocol.Read_error (* don't log, we are muted *)


(* Returns the number of seconds until the next reading phase for the
* sampling frequency given at registration by the plugin with the specified
Expand All @@ -488,9 +511,10 @@ module Plugin = struct
if not (Hashtbl.mem registered uid) then
let reader = P.make_reader uid info (choose_protocol protocol) in
Hashtbl.add registered uid
{ info = info
; reader = reader
; muted = false
{ info = info
; reader = reader
; skip_init = 1
; skip = 0
}
);
next_reading ~uid ()
Expand All @@ -508,15 +532,18 @@ module Plugin = struct

(* Read, parse, and combine metrics from all registered plugins. *)
let read_stats () : (Rrd.ds_owner * Ds.ds) list =
let uids =
Mutex.execute registered_m (fun _ -> Hashtblext.fold_keys registered) in
let process_plugin acc uid =
let plugins = Mutex.execute registered_m
(fun _ -> Hashtblext.to_list registered) in
let process_plugin acc (uid, plugin) =
try
let payload = get_payload uid in
let payload = get_payload uid plugin in
List.rev_append payload.Rrd_protocol.datasources acc
with _ -> acc
in
List.fold_left process_plugin [] uids
List.iter decr_skip_count plugins;
plugins
|> List.filter (fun p -> not (skip p))
|> List.fold_left process_plugin []
end

module Local = Make(struct
Expand Down

0 comments on commit 0e289ef

Please sign in to comment.