Skip to content

Commit 1f269dd

Browse files
authored
Refactor Xapi_event (#6306)
A code clarification effort, consisting of: - Accumulate lists of event-related objects within a record, instead of a tuple. Using a record permits functional record update, which means we avoid citing lists we have not changed. - Remove `last_generation`: this mutable variable is updated in a few places and complicates the code. Instead of relying it on being in scope (and mutated in other places), we explicitly pass it in and then thread the update to its value through the retry loop. - Factor out a routine to convert `(table, obj, time)` entries into event records, defining the accumulation of `add`, `mod`, and `del` events in terms of it: message events remain special cased because their contents are not in the database. This avoids duplicated code.
2 parents f146978 + 5ea74d9 commit 1f269dd

File tree

1 file changed

+100
-104
lines changed

1 file changed

+100
-104
lines changed

ocaml/xapi/xapi_event.ml

Lines changed: 100 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,59 @@ let rec next ~__context =
525525
else
526526
rpc_of_events relevant
527527

528+
type time = Xapi_database.Db_cache_types.Time.t
529+
530+
type entry = {table: string; obj: string; time: time}
531+
532+
type acc = {
533+
creates: entry list
534+
; mods: entry list
535+
; deletes: entry list
536+
; last: time
537+
}
538+
539+
let collect_events (subs, tables, last_generation) acc table =
540+
let open Xapi_database in
541+
let open Db_cache_types in
542+
let table_value = TableSet.find table tables in
543+
let prepend_recent obj stat _ ({creates; mods; last; _} as entries) =
544+
let Stat.{created; modified; deleted} = stat in
545+
if Subscription.object_matches subs table obj then
546+
let last = max last (max modified deleted) in
547+
let creates =
548+
if created > last_generation then
549+
{table; obj; time= created} :: creates
550+
else
551+
creates
552+
in
553+
let mods =
554+
if modified > last_generation && not (created > last_generation) then
555+
{table; obj; time= modified} :: mods
556+
else
557+
mods
558+
in
559+
{entries with creates; mods; last}
560+
else
561+
entries
562+
in
563+
let prepend_deleted obj stat ({deletes; last; _} as entries) =
564+
let Stat.{created; modified; deleted} = stat in
565+
if Subscription.object_matches subs table obj then
566+
let last = max last (max modified deleted) in
567+
let deletes =
568+
if created <= last_generation then
569+
{table; obj; time= deleted} :: deletes
570+
else
571+
deletes
572+
in
573+
{entries with deletes; last}
574+
else
575+
entries
576+
in
577+
acc
578+
|> Table.fold_over_recent last_generation prepend_recent table_value
579+
|> Table.fold_over_deleted last_generation prepend_deleted table_value
580+
528581
let from_inner __context session subs from from_t timer batching =
529582
let open Xapi_database in
530583
let open From in
@@ -541,159 +594,102 @@ let from_inner __context session subs from from_t timer batching =
541594
in
542595
List.filter (fun table -> Subscription.table_matches subs table) all
543596
in
544-
let last_generation = ref from in
545597
let last_msg_gen = ref from_t in
546-
let grab_range t =
598+
let grab_range ~since t =
547599
let tableset = Db_cache_types.Database.tableset (Db_ref.get_database t) in
548600
let msg_gen, messages =
549601
if Subscription.table_matches subs "message" then
550602
!Message.get_since_for_events ~__context !last_msg_gen
551603
else
552604
(0L, [])
553605
in
554-
( msg_gen
555-
, messages
556-
, tableset
557-
, List.fold_left
558-
(fun acc table ->
559-
(* Fold over the live objects *)
560-
let acc =
561-
Db_cache_types.Table.fold_over_recent !last_generation
562-
(fun objref {Db_cache_types.Stat.created; modified; deleted} _
563-
(creates, mods, deletes, last) ->
564-
if Subscription.object_matches subs table objref then
565-
let last = max last (max modified deleted) in
566-
(* mtime guaranteed to always be larger than ctime *)
567-
( ( if created > !last_generation then
568-
(table, objref, created) :: creates
569-
else
570-
creates
571-
)
572-
, ( if
573-
modified > !last_generation
574-
&& not (created > !last_generation)
575-
then
576-
(table, objref, modified) :: mods
577-
else
578-
mods
579-
)
580-
, (* Only have a mod event if we don't have a created event *)
581-
deletes
582-
, last
583-
)
584-
else
585-
(creates, mods, deletes, last)
586-
)
587-
(Db_cache_types.TableSet.find table tableset)
588-
acc
589-
in
590-
(* Fold over the deleted objects *)
591-
Db_cache_types.Table.fold_over_deleted !last_generation
592-
(fun objref {Db_cache_types.Stat.created; modified; deleted}
593-
(creates, mods, deletes, last) ->
594-
if Subscription.object_matches subs table objref then
595-
let last = max last (max modified deleted) in
596-
(* mtime guaranteed to always be larger than ctime *)
597-
if created > !last_generation then
598-
(creates, mods, deletes, last)
599-
(* It was created and destroyed since the last update *)
600-
else
601-
(creates, mods, (table, objref, deleted) :: deletes, last)
602-
(* It might have been modified, but we can't tell now *)
603-
else
604-
(creates, mods, deletes, last)
605-
)
606-
(Db_cache_types.TableSet.find table tableset)
607-
acc
608-
)
609-
([], [], [], !last_generation)
610-
tables
611-
)
606+
let events =
607+
let initial = {creates= []; mods= []; deletes= []; last= since} in
608+
let folder = collect_events (subs, tableset, since) in
609+
List.fold_left folder initial tables
610+
in
611+
(msg_gen, messages, tableset, events)
612612
in
613613
(* Each event.from should have an independent subscription record *)
614-
let msg_gen, messages, tableset, (creates, mods, deletes, last) =
614+
let msg_gen, messages, tableset, events =
615615
with_call session subs (fun sub ->
616616
let grab_nonempty_range =
617-
Throttle.Batching.with_recursive_loop batching @@ fun self arg ->
618-
let ( (msg_gen, messages, _tableset, (creates, mods, deletes, last))
619-
as result
620-
) =
621-
Db_lock.with_lock (fun () -> grab_range (Db_backend.make ()))
617+
Throttle.Batching.with_recursive_loop batching @@ fun self since ->
618+
let result =
619+
Db_lock.with_lock (fun () -> grab_range ~since (Db_backend.make ()))
622620
in
621+
let msg_gen, messages, _tables, events = result in
622+
let {creates; mods; deletes; last} = events in
623623
if
624624
creates = []
625625
&& mods = []
626626
&& deletes = []
627627
&& messages = []
628628
&& not (Clock.Timer.has_expired timer)
629629
then (
630-
last_generation := last ;
631-
(* Cur_id was bumped, but nothing relevent fell out of the db. Therefore the *)
630+
(* cur_id was bumped, but nothing relevent fell out of the database.
631+
Therefore the last ID the client got is equivalent to the current one. *)
632632
sub.cur_id <- last ;
633-
(* last id the client got is equivalent to the current one *)
634633
last_msg_gen := msg_gen ;
635634
wait2 sub last timer ;
636-
(self [@tailcall]) arg
635+
(* The next iteration will fold over events starting after
636+
the last database event that matched a subscription. *)
637+
let next = last in
638+
(self [@tailcall]) next
637639
) else
638640
result
639641
in
640-
grab_nonempty_range ()
642+
grab_nonempty_range from
641643
)
642644
in
643-
last_generation := last ;
644-
let event_of op ?snapshot (table, objref, time) =
645+
let {creates; mods; deletes; last} = events in
646+
let event_of op ?snapshot {table; obj; time} =
645647
{
646648
id= Int64.to_string time
647649
; ts= "0.0"
648650
; ty= String.lowercase_ascii table
649651
; op
650-
; reference= objref
652+
; reference= obj
651653
; snapshot
652654
}
653655
in
654-
let events =
655-
List.fold_left
656-
(fun acc x ->
657-
let ev = event_of `del x in
658-
if Subscription.event_matches subs ev then ev :: acc else acc
659-
)
660-
[] deletes
661-
in
662-
let events =
663-
List.fold_left
664-
(fun acc (table, objref, mtime) ->
656+
let events_of ~kind ?(with_snapshot = true) entries acc =
657+
let rec go events ({table; obj; time= _} as entry) =
658+
let snapshot =
665659
let serialiser = Eventgen.find_get_record table in
666-
try
667-
let xml = serialiser ~__context ~self:objref () in
668-
let ev = event_of `_mod ?snapshot:xml (table, objref, mtime) in
669-
if Subscription.event_matches subs ev then ev :: acc else acc
670-
with _ -> acc
671-
)
672-
events mods
660+
if with_snapshot then
661+
serialiser ~__context ~self:obj ()
662+
else
663+
None
664+
in
665+
let event = event_of kind ?snapshot entry in
666+
if Subscription.event_matches subs event then
667+
event :: events
668+
else
669+
events
670+
in
671+
List.fold_left go acc entries
673672
in
674673
let events =
675-
List.fold_left
676-
(fun acc (table, objref, ctime) ->
677-
let serialiser = Eventgen.find_get_record table in
678-
try
679-
let xml = serialiser ~__context ~self:objref () in
680-
let ev = event_of `add ?snapshot:xml (table, objref, ctime) in
681-
if Subscription.event_matches subs ev then ev :: acc else acc
682-
with _ -> acc
683-
)
684-
events creates
674+
[] (* Accumulate the events for objects stored in the database. *)
675+
|> events_of ~kind:`del ~with_snapshot:false deletes
676+
|> events_of ~kind:`_mod mods
677+
|> events_of ~kind:`add creates
685678
in
686679
let events =
680+
(* Messages require a special casing as their contents are not
681+
stored in the database. *)
687682
List.fold_left
688683
(fun acc mev ->
689684
let event =
685+
let table = "message" in
690686
match mev with
691687
| Message.Create (_ref, message) ->
692688
event_of `add
693689
?snapshot:(Some (API.rpc_of_message_t message))
694-
("message", Ref.string_of _ref, 0L)
690+
{table; obj= Ref.string_of _ref; time= 0L}
695691
| Message.Del _ref ->
696-
event_of `del ("message", Ref.string_of _ref, 0L)
692+
event_of `del {table; obj= Ref.string_of _ref; time= 0L}
697693
in
698694
event :: acc
699695
)

0 commit comments

Comments
 (0)