Skip to content

Commit bcb3c1b

Browse files
committed
Instruction to update expiry
1 parent 8ca9bfc commit bcb3c1b

File tree

5 files changed

+153
-51
lines changed

5 files changed

+153
-51
lines changed

crates/core/src/sync/interface.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub struct StartSyncStream {
3838
///
3939
/// We will increase the expiry date for those streams at the time we connect and disconnect.
4040
#[serde(default)]
41-
pub active_streams: Vec<StreamKey>,
41+
pub active_streams: Rc<Vec<StreamKey>>,
4242
}
4343

4444
impl StartSyncStream {
@@ -89,6 +89,12 @@ pub enum SyncEvent<'a> {
8989
TextLine { data: &'a str },
9090
/// Forward a binary line (BSON) received from the sync service.
9191
BinaryLine { data: &'a [u8] },
92+
/// The active stream subscriptions (as in, `SyncStreamSubscription` instances active right now)
93+
/// have changed.
94+
///
95+
/// The client will compare the new active subscriptions with the current one and will issue a
96+
/// request to restart the sync iteration if necessary.
97+
DidUpdateSubscriptions { active_streams: Rc<Vec<StreamKey>> },
9298
}
9399

94100
/// An instruction sent by the core extension to the SDK.
@@ -114,13 +120,20 @@ pub enum Instruction {
114120
// These are defined like this because deserializers in Kotlin can't support either an
115121
// object or a literal value
116122
/// Close the websocket / HTTP stream to the sync service.
117-
CloseSyncStream {},
123+
CloseSyncStream(CloseSyncStream),
118124
/// Flush the file-system if it's non-durable (only applicable to the Dart SDK).
119125
FlushFileSystem {},
120126
/// Notify that a sync has been completed, prompting client SDKs to clear earlier errors.
121127
DidCompleteSync {},
122128
}
123129

130+
#[derive(Serialize, Default)]
131+
pub struct CloseSyncStream {
132+
/// Whether clients should hide the brief disconnected status from the public sync status and
133+
/// reconnect immediately.
134+
pub hide_disconnect: bool,
135+
}
136+
124137
#[derive(Serialize)]
125138
pub enum LogSeverity {
126139
DEBUG,
@@ -136,16 +149,16 @@ pub struct StreamingSyncRequest {
136149
pub binary_data: bool,
137150
pub client_id: String,
138151
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
139-
pub streams: StreamSubscriptionRequest,
152+
pub streams: Rc<StreamSubscriptionRequest>,
140153
}
141154

142-
#[derive(Serialize)]
155+
#[derive(Debug, Serialize, PartialEq)]
143156
pub struct StreamSubscriptionRequest {
144157
pub include_defaults: bool,
145158
pub subscriptions: Vec<RequestedStreamSubscription>,
146159
}
147160

148-
#[derive(Serialize)]
161+
#[derive(Debug, Serialize, PartialEq)]
149162
pub struct RequestedStreamSubscription {
150163
/// The name of the sync stream to subscribe to.
151164
pub stream: String,
@@ -223,6 +236,12 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
223236
}),
224237
"refreshed_token" => SyncControlRequest::SyncEvent(SyncEvent::DidRefreshToken),
225238
"completed_upload" => SyncControlRequest::SyncEvent(SyncEvent::UploadFinished),
239+
"update_subscriptions" => {
240+
SyncControlRequest::SyncEvent(SyncEvent::DidUpdateSubscriptions {
241+
active_streams: serde_json::from_str(payload.text())
242+
.map_err(PowerSyncError::as_argument_error)?,
243+
})
244+
}
226245
"subscriptions" => {
227246
let request = serde_json::from_str(payload.text())
228247
.map_err(PowerSyncError::as_argument_error)?;

crates/core/src/sync/storage_adapter.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use core::{assert_matches::debug_assert_matches, fmt::Display};
22

3-
use alloc::{string::ToString, vec::Vec};
3+
use alloc::{rc::Rc, string::ToString, vec::Vec};
44
use serde::Serialize;
55
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
66

@@ -13,7 +13,7 @@ use crate::{
1313
sync::{
1414
checkpoint::{ChecksumMismatch, validate_checkpoint},
1515
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
16-
streaming_sync::OwnedStreamDescription,
16+
streaming_sync::{OwnedStreamDescription, RequestedStreamSubscriptions},
1717
subscriptions::{LocallyTrackedSubscription, StreamKey},
1818
sync_status::SyncPriorityStatus,
1919
},
@@ -287,7 +287,7 @@ impl StorageAdapter {
287287
pub fn collect_subscription_requests(
288288
&self,
289289
include_defaults: bool,
290-
) -> Result<(StreamSubscriptionRequest, Vec<i64>), PowerSyncError> {
290+
) -> Result<RequestedStreamSubscriptions, PowerSyncError> {
291291
self.delete_outdated_subscriptions()?;
292292

293293
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
@@ -310,13 +310,13 @@ impl StorageAdapter {
310310
index_to_local_id.push(subscription.id);
311311
}
312312

313-
Ok((
314-
StreamSubscriptionRequest {
313+
Ok(RequestedStreamSubscriptions {
314+
request: Rc::new(StreamSubscriptionRequest {
315315
include_defaults,
316316
subscriptions,
317-
},
318-
index_to_local_id,
319-
))
317+
}),
318+
subscription_ids: Rc::new(index_to_local_id),
319+
})
320320
}
321321

322322
pub fn now(&self) -> Result<Timestamp, ResultCode> {

0 commit comments

Comments
 (0)