From 0e289efcd5a59398bcadc29750c1b84bdbe7339b Mon Sep 17 00:00:00 2001 From: Christian Lindig Date: Tue, 11 Oct 2016 11:13:07 +0100 Subject: [PATCH] CA-224730 back off exponentially after failed read from plugin This commit introduces a skip count for plugins. After a read error, the skip count is doubled (up to a cap) and the plugin is not read for that number of read cycles. When reading succeeds, the skip is reset to 0. Currently, at most 256 cycles are skipped and the skip count increases exponentially from 1 with every read error. Signed-off-by: Christian Lindig --- rrdd/rrdd_server.ml | 101 ++++++++++++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 37 deletions(-) diff --git a/rrdd/rrdd_server.ml b/rrdd/rrdd_server.ml index 20e4b382e..63f11a853 100644 --- a/rrdd/rrdd_server.ml +++ b/rrdd/rrdd_server.ml @@ -401,10 +401,28 @@ module Plugin = struct module Make = functor(P: PLUGIN) -> struct (* A type to represent a registered plugin. *) + + (* 11 October 2016 + * This module needs a re-write when the next major addition comes + * along: + * - it would be convenient not to have to pass the uid around in + * addition to the plugin just to facilitate error reporting + * - the back-off mechanism needs to be better encapsulated. In the + * ideal case, we can use a wrap() function that turns a reader + * that can fail into one that backs off in the presence of errors + * and retries. + * - The error reporting could be moved out of get_payload to the + * caller. + * - The lock-protected hash table could be made more abstract such + * that locking is not spread over the module. + * - Can the code for backwards compatibility be expunged? 
+ *) + type plugin = { info: P.info; reader: Rrd_reader.reader; - mutable muted: bool; (** don't log errors if muted *) + mutable skip_init: int; (** back-off base: each read error doubles it (capped) and copies it into skip; a successful read resets it to 1 *) + mutable skip: int; (** back-off countdown, in read cycles; 0 = not backing off *) } (* A map storing currently registered plugins, and any data required to @@ -415,36 +433,43 @@ module Plugin = struct * conditions and data corruption. *) let registered_m = Mutex.create () - (* Helper function to find plugin info from a uid. *) - let find ~(uid: P.uid) : plugin = - try - Mutex.execute registered_m (fun () -> Hashtbl.find registered uid) - with Not_found -> - let msg = Printf.sprintf "failed to find plugin %s at %s" - (P.string_of_uid uid) __LOC__ in - error "internal error: %s" msg; - failwith msg - - let mute uid plugin = - Mutex.execute registered_m (fun () -> plugin.muted <- true); - warn "muting plugin %s" (P.string_of_uid uid) - - let unmute uid plugin = - if plugin.muted then begin - Mutex.execute registered_m (fun () -> plugin.muted <- false); - warn "unmuting plugin %s" (P.string_of_uid uid) - end else - () - - let get_payload ~(uid: P.uid) : Rrd_protocol.payload = - let plugin = find uid in + (* read error: double skip_init (capped at skip_max) and restart the skip countdown from the new value; the update and the log call both run while holding registered_m *) + let incr_skip_count uid plugin = + let skip_max = 256 in (* about 21.3 min on 5sec cycle *) + Mutex.execute registered_m (fun () -> + let skips = min (plugin.skip_init * 2) skip_max in + warn "setting skip-cycles-after-error for plugin %s to %d" + (P.string_of_uid uid) skips; + plugin.skip_init <- skips; + plugin.skip <- skips) + + (* read succeeded: if the plugin had backed off (skip_init > 1), log it, then reset skip_init to 1 and skip to 0; otherwise do nothing. NOTE(review): skip_init is tested outside registered_m - confirm this cannot race with incr_skip_count *) + let reset_skip_count uid plugin = + if plugin.skip_init > 1 then begin + warn "re-setting skip-cycles-after-error for plugin %s to 1" + (P.string_of_uid uid); + Mutex.execute registered_m (fun () -> + plugin.skip_init <- 1; + plugin.skip <- 0) + end + + (* true iff the plugin is still backing off, i.e. its skip countdown is > 0; the uid component of the pair is ignored *) + let skip (uid, plugin) = plugin.skip > 0 + + 
(* called once per plugin on every read_stats pass: while the plugin is backing off (skip > 0), count its skip countdown down by one *) + let decr_skip_count (uid, plugin as p) = + if skip p then + Mutex.execute registered_m (fun () -> + plugin.skip <- plugin.skip - 1) + + let get_payload ~(uid: P.uid) plugin : Rrd_protocol.payload = try let payload = plugin.reader.Rrd_reader.read_payload () in - unmute uid plugin; + reset_skip_count uid plugin; (* read worked: clear any back-off *) payload with - | e when not plugin.muted -> - mute uid plugin; + | e -> + incr_skip_count uid plugin; (* read failed: start a new, longer (capped) back-off *) let log e = warn "Failed to process plugin: %s (%s)" (P.string_of_uid uid) @@ -461,8 +486,6 @@ module Plugin = struct log e; raise Read_error end - | _ -> raise Rrd_protocol.Read_error (* don't log, we are muted *) - (* Returns the number of seconds until the next reading phase for the * sampling frequency given at registration by the plugin with the specified @@ -488,9 +511,10 @@ module Plugin = struct if not (Hashtbl.mem registered uid) then let reader = P.make_reader uid info (choose_protocol protocol) in Hashtbl.add registered uid - { info = info - ; reader = reader - ; muted = false + { info = info + ; reader = reader + ; skip_init = 1 + ; skip = 0 } ); next_reading ~uid () @@ -508,15 +532,18 @@ module Plugin = struct (* Read, parse, and combine metrics from all registered plugins. *) let read_stats () : (Rrd.ds_owner * Ds.ds) list = - let uids = - Mutex.execute registered_m (fun _ -> Hashtblext.fold_keys registered) in - let process_plugin acc uid = + let plugins = Mutex.execute registered_m + (fun _ -> Hashtblext.to_list registered) in + let process_plugin acc (uid, plugin) = try - let payload = get_payload uid in + let payload = get_payload uid plugin in List.rev_append payload.Rrd_protocol.datasources acc with _ -> acc in - List.fold_left process_plugin [] uids + List.iter decr_skip_count plugins; (* count down first, then filter: a plugin whose countdown reaches 0 in this pass is read in this same pass *) + plugins + |> List.filter (fun p -> not (skip p)) + |> List.fold_left process_plugin [] end module Local = Make(struct