Skip to content

Commit b33dd61

Browse files
committed
Add correct offline state
1 parent bcb3c1b commit b33dd61

File tree

6 files changed

+50
-30
lines changed

6 files changed

+50
-30
lines changed

crates/core/src/sync/storage_adapter.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
1616
streaming_sync::{OwnedStreamDescription, RequestedStreamSubscriptions},
1717
subscriptions::{LocallyTrackedSubscription, StreamKey},
18-
sync_status::SyncPriorityStatus,
18+
sync_status::{ActiveStreamSubscription, DownloadSyncStatus, SyncPriorityStatus},
1919
},
2020
sync_local::{PartialSyncOperation, SyncOperation},
2121
util::{JsonString, column_nullable},
@@ -87,30 +87,45 @@ impl StorageAdapter {
8787
Ok(requests)
8888
}
8989

90-
pub fn collect_sync_state(&self) -> Result<Vec<SyncPriorityStatus>, PowerSyncError> {
91-
// language=SQLite
92-
let statement = self
90+
pub fn offline_sync_state(&self) -> Result<DownloadSyncStatus, PowerSyncError> {
91+
let priority_items = {
92+
// language=SQLite
93+
let statement = self
9394
.db
9495
.prepare_v2(
9596
"SELECT priority, unixepoch(last_synced_at) FROM ps_sync_state ORDER BY priority",
9697
)
9798
.into_db_result(self.db)?;
9899

99-
let mut items = Vec::<SyncPriorityStatus>::new();
100-
while statement.step()? == ResultCode::ROW {
101-
let priority = BucketPriority {
102-
number: statement.column_int(0),
103-
};
104-
let timestamp = statement.column_int64(1);
105-
106-
items.push(SyncPriorityStatus {
107-
priority,
108-
last_synced_at: Some(Timestamp(timestamp)),
109-
has_synced: Some(true),
110-
});
111-
}
100+
let mut items = Vec::<SyncPriorityStatus>::new();
101+
while statement.step()? == ResultCode::ROW {
102+
let priority = BucketPriority {
103+
number: statement.column_int(0),
104+
};
105+
let timestamp = statement.column_int64(1);
112106

113-
return Ok(items);
107+
items.push(SyncPriorityStatus {
108+
priority,
109+
last_synced_at: Some(Timestamp(timestamp)),
110+
has_synced: Some(true),
111+
});
112+
}
113+
114+
items
115+
};
116+
117+
let mut streams = Vec::new();
118+
self.iterate_local_subscriptions(|sub| {
119+
streams.push(ActiveStreamSubscription::from_local(&sub));
120+
})?;
121+
122+
Ok(DownloadSyncStatus {
123+
connected: false,
124+
connecting: false,
125+
priority_status: priority_items,
126+
downloading: None,
127+
streams,
128+
})
114129
}
115130

116131
pub fn delete_buckets<'a>(

crates/core/src/sync/streaming_sync.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -784,12 +784,13 @@ impl StreamingSyncIteration {
784784
if matches!(&result, SyncLocalResult::ChangesApplied) {
785785
// Update affected stream subscriptions to mark them as synced.
786786
let mut status = self.status.inner().borrow_mut();
787-
if let Some(ref mut streams) = status.streams {
787+
788+
if !status.streams.is_empty() {
788789
let stmt = self.adapter.db.prepare_v2(
789790
"UPDATE ps_stream_subscriptions SET last_synced_at = unixepoch() WHERE id = ? RETURNING last_synced_at",
790791
)?;
791792

792-
for stream in streams {
793+
for stream in &mut status.streams {
793794
if stream.is_in_priority(priority) {
794795
stmt.bind_int64(1, stream.id)?;
795796
if stmt.step()? == ResultCode::ROW {
@@ -822,9 +823,12 @@ impl StreamingSyncIteration {
822823
));
823824
};
824825

825-
let sync_state = self.adapter.collect_sync_state()?;
826+
let offline_state = self.adapter.offline_sync_state()?;
826827
self.status.update(
827-
move |s| s.start_connecting(sync_state),
828+
move |s| {
829+
*s = offline_state;
830+
s.start_connecting();
831+
},
828832
&mut event.instructions,
829833
);
830834

crates/core/src/sync/sync_status.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub struct DownloadSyncStatus {
4141
/// When a download is active (that is, a `checkpoint` or `checkpoint_diff` line has been
4242
/// received), information about how far the download has progressed.
4343
pub downloading: Option<SyncDownloadProgress>,
44-
pub streams: Option<Vec<ActiveStreamSubscription>>,
44+
pub streams: Vec<ActiveStreamSubscription>,
4545
}
4646

4747
impl DownloadSyncStatus {
@@ -58,11 +58,10 @@ impl DownloadSyncStatus {
5858
self.downloading = None;
5959
}
6060

61-
pub fn start_connecting(&mut self, status: Vec<SyncPriorityStatus>) {
61+
pub fn start_connecting(&mut self) {
6262
self.connected = false;
6363
self.downloading = None;
6464
self.connecting = true;
65-
self.priority_status = status;
6665
self.debug_assert_priority_status_is_sorted();
6766
}
6867

@@ -82,7 +81,7 @@ impl DownloadSyncStatus {
8281
self.mark_connected();
8382

8483
self.downloading = Some(progress);
85-
self.streams = Some(subscriptions);
84+
self.streams = subscriptions;
8685
}
8786

8887
/// Increments [SyncDownloadProgress] progress for the given [DataLine].
@@ -126,7 +125,7 @@ impl Default for DownloadSyncStatus {
126125
connecting: false,
127126
downloading: None,
128127
priority_status: Vec::new(),
129-
streams: None,
128+
streams: Vec::new(),
130129
}
131130
}
132131
}

dart/test/goldens/simple_iteration.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"connecting": true,
1111
"priority_status": [],
1212
"downloading": null,
13-
"streams": null
13+
"streams": []
1414
}
1515
}
1616
},

dart/test/goldens/starting_stream.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"connecting": true,
1515
"priority_status": [],
1616
"downloading": null,
17-
"streams": null
17+
"streams": []
1818
}
1919
}
2020
},

dart/test/sync_test.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,9 @@ void _syncTests<T>({
703703
"Checksums didn't match, failed for: a (expected 0x000004d2, got 0x000010e1 = 0x000010e1 (op) + 0x00000000 (add))")
704704
}
705705
},
706-
{'CloseSyncStream': {}},
706+
{
707+
'CloseSyncStream': {'hide_disconnect': false}
708+
},
707709
],
708710
);
709711

0 commit comments

Comments
 (0)