Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 61 additions & 36 deletions crates/database/src/issues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -886,19 +886,26 @@ pub struct IncidentStats {
}

impl Incident {
/// Bulk-fetch stats for a set of incidents in four grouped queries —
/// issues, events, incident-notes, issue-notes — run concurrently on
/// four pool connections. Missing incident_ids get `Default` (zero).
/// Bulk-fetch stats for a set of incidents. Missing incident_ids get
/// `Default` (zero).
///
/// Takes the pool (`&Db`) rather than a single connection so the four
/// futures don't fight over one mutable handle.
/// `incident_issues` is keyed on `(incident_id, issue_id, joined_at)`,
/// so an issue that leaves and rejoins the same incident produces
/// multiple rows for the same pair. Counts must dedupe on the pair
/// before joining to `events` or `issue_notes`, otherwise every event
/// or note gets multiplied by the rejoin count.
///
/// Strategy: pull the distinct `(incident_id, issue_id)` pairs first,
/// then run the per-issue event/note counts and the direct
/// incident_notes count concurrently. Takes the pool (`&Db`) so the
/// parallel futures don't fight over one mutable conn handle.
pub async fn stats_for(
pool: &crate::Db,
incident_ids: &[Uuid],
) -> Result<std::collections::HashMap<Uuid, IncidentStats>> {
use crate::schema::{events, incident_issues, incident_notes, issue_notes};
use diesel::dsl::count_star;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

let mut out: HashMap<Uuid, IncidentStats> = incident_ids
.iter()
Expand All @@ -908,28 +915,45 @@ impl Incident {
return Ok(out);
}

// Each future grabs its own pool connection so the four queries
// run in parallel rather than serialised on one mutable conn.
let ids = incident_ids.to_vec();
let f_issues = async {
let pairs: Vec<(Uuid, Uuid)> = {
let mut c = pool.get().await?;
incident_issues::table
.group_by(incident_issues::incident_id)
.select((incident_issues::incident_id, count_star()))
.filter(incident_issues::incident_id.eq_any(&ids))
.load::<(Uuid, i64)>(&mut c)
.await
.map_err(AppError::from)
.select((incident_issues::incident_id, incident_issues::issue_id))
.distinct()
.load(&mut c)
.await?
};

let mut issues_by_incident: HashMap<Uuid, HashSet<Uuid>> = HashMap::new();
for (incident_id, issue_id) in &pairs {
issues_by_incident
.entry(*incident_id)
.or_default()
.insert(*issue_id);
}
for (incident_id, issues) in &issues_by_incident {
out.entry(*incident_id).or_default().issue_count = issues.len() as i64;
}

let unique_issue_ids: Vec<Uuid> = issues_by_incident
.values()
.flatten()
.copied()
.collect::<HashSet<_>>()
.into_iter()
.collect();

let f_events = async {
if unique_issue_ids.is_empty() {
return Result::<Vec<(Uuid, i64)>>::Ok(Vec::new());
}
let mut c = pool.get().await?;
events::table
.inner_join(
incident_issues::table.on(events::issue_id.eq(incident_issues::issue_id)),
)
.group_by(incident_issues::incident_id)
.select((incident_issues::incident_id, count_star()))
.filter(incident_issues::incident_id.eq_any(&ids))
.group_by(events::issue_id)
.select((events::issue_id, count_star()))
.filter(events::issue_id.eq_any(&unique_issue_ids))
.load::<(Uuid, i64)>(&mut c)
.await
.map_err(AppError::from)
Expand All @@ -945,33 +969,34 @@ impl Incident {
.map_err(AppError::from)
};
let f_jnotes = async {
if unique_issue_ids.is_empty() {
return Result::<Vec<(Uuid, i64)>>::Ok(Vec::new());
}
let mut c = pool.get().await?;
issue_notes::table
.inner_join(
incident_issues::table.on(issue_notes::issue_id.eq(incident_issues::issue_id)),
)
.group_by(incident_issues::incident_id)
.select((incident_issues::incident_id, count_star()))
.filter(incident_issues::incident_id.eq_any(&ids))
.group_by(issue_notes::issue_id)
.select((issue_notes::issue_id, count_star()))
.filter(issue_notes::issue_id.eq_any(&unique_issue_ids))
.load::<(Uuid, i64)>(&mut c)
.await
.map_err(AppError::from)
};
let (issue_rows, event_rows, inote_rows, jnote_rows) =
futures::try_join!(f_issues, f_events, f_inotes, f_jnotes)?;
let (event_rows, inote_rows, jnote_rows) =
futures::try_join!(f_events, f_inotes, f_jnotes)?;

for (id, n) in issue_rows {
out.entry(id).or_default().issue_count = n;
}
for (id, n) in event_rows {
out.entry(id).or_default().event_count = n;
let events_per_issue: HashMap<Uuid, i64> = event_rows.into_iter().collect();
let notes_per_issue: HashMap<Uuid, i64> = jnote_rows.into_iter().collect();

for (incident_id, issues) in &issues_by_incident {
let entry = out.entry(*incident_id).or_default();
for issue_id in issues {
entry.event_count += events_per_issue.get(issue_id).copied().unwrap_or(0);
entry.note_count += notes_per_issue.get(issue_id).copied().unwrap_or(0);
}
}
for (id, n) in inote_rows {
out.entry(id).or_default().note_count += n;
}
for (id, n) in jnote_rows {
out.entry(id).or_default().note_count += n;
}

Ok(out)
}
Expand Down
93 changes: 93 additions & 0 deletions crates/database/tests/incident_stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! `Incident::stats_for` must dedupe `(incident_id, issue_id)` before
//! counting. `incident_issues` is keyed on `(incident_id, issue_id,
//! joined_at)`, so an issue that leaves and rejoins the same incident
//! produces multiple rows for the same pair. Without dedup, the issue
//! count inflates by the number of rejoins, and the event/note counts
//! get multiplied by the same factor.

use database::issues::Incident;
use diesel_async::SimpleAsyncConnection as _;
use uuid::Uuid;

#[tokio::test(flavor = "multi_thread")]
async fn stats_for_dedupes_repeat_join_rows() {
commons_tests::db::TestDb::run(async |mut conn, url| {
let server_id = Uuid::new_v4();
let device_id = Uuid::new_v4();
let issue_id = Uuid::new_v4();
let incident_id = Uuid::new_v4();
let event_a = Uuid::new_v4();
let event_b = Uuid::new_v4();
let issue_note = Uuid::new_v4();
let incident_note = Uuid::new_v4();

conn.batch_execute(&format!(
"INSERT INTO devices (id, role) VALUES ('{device_id}', 'server'); \
INSERT INTO servers (id, host, kind, device_id) VALUES \
('{server_id}', 'https://example.com', 'central', '{device_id}'); \
INSERT INTO issues \
(id, server_id, device_id, source, ref, severity, message, active, first_seen, last_seen) \
VALUES \
('{issue_id}', '{server_id}', '{device_id}', 'test', 'r', 'error', 'm', true, NOW(), NOW()); \
INSERT INTO incidents (id, server_id, opened_at) \
VALUES ('{incident_id}', '{server_id}', NOW()); \
INSERT INTO events \
(id, issue_id, severity, message, active, hash, occurrences, last_seen) \
VALUES \
('{event_a}', '{issue_id}', 'error', 'one', true, '\\x01'::bytea, 7, NOW()), \
('{event_b}', '{issue_id}', 'error', 'two', true, '\\x02'::bytea, 1, NOW()); \
INSERT INTO issue_notes (id, issue_id, author, body) \
VALUES ('{issue_note}', '{issue_id}', 'op', 'jn'); \
INSERT INTO incident_notes (id, incident_id, author, body) \
VALUES ('{incident_note}', '{incident_id}', 'op', 'in'); \
INSERT INTO incident_issues (incident_id, issue_id, joined_at, left_at) VALUES \
('{incident_id}', '{issue_id}', NOW() - interval '5 min', NOW() - interval '4 min'), \
('{incident_id}', '{issue_id}', NOW() - interval '4 min', NOW() - interval '3 min'), \
('{incident_id}', '{issue_id}', NOW() - interval '3 min', NOW() - interval '2 min'), \
('{incident_id}', '{issue_id}', NOW() - interval '2 min', NOW() - interval '1 min'), \
('{incident_id}', '{issue_id}', NOW(), NULL);"
))
.await
.expect("seed");
drop(conn);

let pool = database::init_to(&url);
let stats = Incident::stats_for(&pool, &[incident_id])
.await
.expect("stats_for");
let s = stats.get(&incident_id).expect("stats present");
assert_eq!(s.issue_count, 1, "one distinct issue despite 5 join rows");
assert_eq!(
s.event_count, 2,
"two event rows, not multiplied by 5 join rows"
);
assert_eq!(
s.note_count, 2,
"one incident_note + one issue_note, not 1 + 5"
);
})
.await
}

#[tokio::test(flavor = "multi_thread")]
async fn stats_for_handles_missing_and_empty_inputs() {
commons_tests::db::TestDb::run(async |_conn, url| {
let pool = database::init_to(&url);
assert!(
Incident::stats_for(&pool, &[])
.await
.expect("empty")
.is_empty()
);

let phantom = Uuid::new_v4();
let stats = Incident::stats_for(&pool, &[phantom])
.await
.expect("phantom");
let s = stats.get(&phantom).expect("phantom entry");
assert_eq!(s.issue_count, 0);
assert_eq!(s.event_count, 0);
assert_eq!(s.note_count, 0);
})
.await
}