Skip to content

Commit 1b0cd1e

Browse files
committed
remove update handle as soon as listeners are gone
1 parent 05600f7 commit 1b0cd1e

File tree

2 files changed

+35
-73
lines changed

2 files changed

+35
-73
lines changed

crates/corro-agent/src/api/public/update.rs

+28-66
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use crate::api::public::pubsub::MatcherUpsertError;
2121
pub type UpdateBroadcastCache = HashMap<Uuid, broadcast::Sender<Bytes>>;
2222
pub type SharedUpdateBroadcastCache = Arc<TokioRwLock<UpdateBroadcastCache>>;
2323

24-
const MAX_UNSUB_TIME: Duration = Duration::from_secs(120);
2524
// this should be a fraction of the MAX_UNSUB_TIME
2625
const RECEIVERS_CHECK_INTERVAL: Duration = Duration::from_secs(30);
2726

@@ -107,88 +106,51 @@ pub async fn process_update_channel(
107106
) {
108107
let mut buf = BytesMut::new();
109108

110-
let mut deadline = if tx.receiver_count() == 0 {
111-
Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME)))
112-
} else {
113-
None
114-
};
115-
116-
// even if there are no more subscribers
109+
// interval check for receivers
117110
// useful for queries that don't change often so we can cleanup...
118111
let mut subs_check = tokio::time::interval(RECEIVERS_CHECK_INTERVAL);
119112

120113
loop {
121-
let deadline_check = async {
122-
if let Some(sleep) = deadline.as_mut() {
123-
sleep.await
124-
} else {
125-
futures::future::pending().await
126-
}
127-
};
128-
129-
let notify_evt = tokio::select! {
114+
tokio::select! {
130115
biased;
131-
Some(query_evt) = evt_rx.recv() => query_evt,
132-
_ = deadline_check => {
133-
if tx.receiver_count() == 0 {
134-
info!(sub_id = %id, "All listeners for subscription are gone and didn't come back within {MAX_UNSUB_TIME:?}");
135-
break;
136-
}
137-
138-
// reset the deadline if there are receivers!
139-
deadline = None;
140-
continue;
116+
Some(query_evt) = evt_rx.recv() => {
117+
match make_query_event_bytes(&mut buf, &query_evt) {
118+
Ok(b) => {
119+
if tx.send(b).is_err() {
120+
break;
121+
}
122+
},
123+
Err(e) => {
124+
match make_query_event_bytes(&mut buf, &NotifyEvent::Error(e.to_compact_string())) {
125+
Ok(b) => {
126+
let _ = tx.send(b);
127+
}
128+
Err(e) => {
129+
warn!(update_id = %id, "failed to send error in update channel: {e}");
130+
}
131+
}
132+
break;
133+
}
134+
};
141135
},
142136
_ = subs_check.tick() => {
143137
if tx.receiver_count() == 0 {
144-
if deadline.is_none() {
145-
deadline = Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME)));
146-
}
147-
} else {
148-
deadline = None;
138+
break;
149139
};
150-
continue;
151140
},
152-
else => {
153-
break;
154-
}
155141
};
156-
157-
let is_still_active = match make_query_event_bytes(&mut buf, &notify_evt) {
158-
Ok(b) => tx.send(b).is_ok(),
159-
Err(e) => {
160-
match make_query_event_bytes(&mut buf, &NotifyEvent::Error(e.to_compact_string())) {
161-
Ok(b) => {
162-
let _ = tx.send(b);
163-
}
164-
Err(e) => {
165-
warn!(update_id = %id, "error sending error: {e}");
166-
}
167-
}
168-
break;
169-
}
170-
};
171-
172-
if is_still_active {
173-
deadline = None;
174-
} else {
175-
debug!(sub_id = %id, "no active listeners to receive subscription event: {notify_evt:?}");
176-
if deadline.is_none() {
177-
deadline = Some(Box::pin(tokio::time::sleep(MAX_UNSUB_TIME)));
178-
}
179-
}
180142
}
181143

182-
warn!(sub_id = %id, "subscription query channel done");
144+
warn!(sub_id = %id, "updates channel done");
183145

184146
// remove and get handle from the agent's "matchers"
185147
let handle = match updates.remove(&id) {
186148
Some(h) => {
187-
info!(update_id = %id, "Removed update from process_update_channel");
149+
info!(update_id = %id, "Removed update handle from process_update_channel");
188150
h
189151
}
190152
None => {
191-
warn!(update_id = %id, "subscription handle was already gone. odd!");
153+
warn!(update_id = %id, "update handle was already gone. odd!");
192154
return;
193155
}
194156
};
@@ -234,7 +196,7 @@ async fn forward_update_bytes_to_body_sender(
234196
buf.extend_from_slice(&event_buf);
235197
if buf.len() >= 64 * 1024 {
236198
if let Err(e) = tx.send_data(buf.split().freeze()).await {
237-
warn!(update_id = %update.id(), "could not forward subscription query event to receiver: {e}");
199+
warn!(update_id = %update.id(), "could not forward update query event to receiver: {e}");
238200
return;
239201
}
240202
};
@@ -265,8 +227,8 @@ async fn forward_update_bytes_to_body_sender(
265227
}
266228
},
267229
_ = update.cancelled() => {
268-
// info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber");
269-
// return;
230+
info!(update_id = %update.id(), "update cancelled, aborting forwarding bytes to subscriber");
231+
return;
270232
},
271233
_ = &mut tripwire => {
272234
break;

crates/corro-types/src/updates.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -430,9 +430,9 @@ where
430430
}
431431

432432
// metrics...
433-
for (table, pks) in candidates.clone() {
433+
for (table, pks) in candidates.iter() {
434434
handle
435-
.get_counter(&table)
435+
.get_counter(table)
436436
.matched_count
437437
.increment(pks.len() as u64);
438438
}
@@ -441,14 +441,14 @@ where
441441

442442
if let Err(e) = handle
443443
.changes_tx()
444-
.try_send((candidates.clone(), db_version))
444+
.try_send((candidates, db_version))
445445
{
446446
error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}");
447447
match e {
448448
mpsc::error::TrySendError::Full(item) => {
449449
warn!("channel is full, falling back to async send");
450450

451-
let changes_tx = handle.changes_tx().clone();
451+
let changes_tx = handle.changes_tx();
452452
tokio::spawn(async move {
453453
_ = changes_tx.send(item).await;
454454
});
@@ -521,11 +521,11 @@ where
521521
// metrics...
522522
for (id, (candidates, handle)) in candidates {
523523
let mut match_count = 0;
524-
for (table, pks) in candidates.clone() {
524+
for (table, pks) in candidates.iter() {
525525
let count = pks.len();
526526
match_count += count;
527527
handle
528-
.get_counter(&table)
528+
.get_counter(table)
529529
.matched_count
530530
.increment(pks.len() as u64);
531531
}
@@ -534,7 +534,7 @@ where
534534

535535
if let Err(e) = handle
536536
.changes_tx()
537-
.try_send((candidates.clone(), db_version))
537+
.try_send((candidates, db_version))
538538
{
539539
error!(sub_id = %id, "could not send change candidates to {trait_type} handler: {e}");
540540
match e {

0 commit comments

Comments
 (0)