Skip to content

Commit a0cdef9

Browse files
authored
fix(sync): added client.wait_for_start() and improve error handling in sync daemon (#495)
* fix(sync): improve error handling in daemon and add exponential backoff for server connection - Added error logging in daemon before re-throwing errors - Implemented exponential backoff when checking if the server is running - Addresses issue ActivityWatch/aw-qt#105 * Apply suggestions from code review * refactor: move wait_for_server() into client * refactor: renamed wait_for_server to wait_for_start (same as in aw-client-python)
1 parent 656f3c9 commit a0cdef9

File tree

4 files changed

+56
-16
lines changed

4 files changed

+56
-16
lines changed

aw-client-rust/src/blocking.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,4 +79,8 @@ impl AwClient {
7979
proxy_method!(delete_event, (), bucketname: &str, event_id: i64);
8080
proxy_method!(get_event_count, i64, bucketname: &str);
8181
proxy_method!(get_info, aw_models::Info,);
82+
83+
pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
84+
self.client.wait_for_start()
85+
}
8286
}

aw-client-rust/src/lib.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use std::{collections::HashMap, error::Error};
1111

1212
use chrono::{DateTime, Utc};
1313
use serde_json::{json, Map};
14+
use std::net::TcpStream;
15+
use std::time::Duration;
1416

1517
pub use aw_models::{Bucket, BucketMetadata, Event};
1618

@@ -221,4 +223,38 @@ impl AwClient {
221223
let url = format!("{}/api/0/info", self.baseurl);
222224
self.client.get(url).send().await?.json().await
223225
}
226+
227+
// TODO: make async
228+
pub fn wait_for_start(&self) -> Result<(), Box<dyn Error>> {
229+
let socket_addrs = self.baseurl.socket_addrs(|| None)?;
230+
let socket_addr = socket_addrs
231+
.first()
232+
.ok_or("Unable to resolve baseurl into socket address")?;
233+
234+
// Check if server is running with exponential backoff
235+
let mut retry_delay = Duration::from_millis(100);
236+
let max_wait = Duration::from_secs(10);
237+
let mut total_wait = Duration::from_secs(0);
238+
239+
while total_wait < max_wait {
240+
match TcpStream::connect_timeout(socket_addr, retry_delay) {
241+
Ok(_) => break,
242+
Err(_) => {
243+
std::thread::sleep(retry_delay);
244+
total_wait += retry_delay;
245+
retry_delay *= 2;
246+
}
247+
}
248+
}
249+
250+
if total_wait >= max_wait {
251+
return Err(format!(
252+
"Local server {} not running after 10 seconds of retrying",
253+
socket_addr
254+
)
255+
.into());
256+
}
257+
258+
Ok(())
259+
}
224260
}

aw-sync/src/main.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,23 @@ fn main() -> Result<(), Box<dyn Error>> {
208208

209209
fn daemon(client: &AwClient) -> Result<(), Box<dyn Error>> {
210210
loop {
211-
info!("Pulling from all hosts");
212-
sync_wrapper::pull_all(client)?;
213-
214-
info!("Pushing local data");
215-
sync_wrapper::push(client)?;
211+
if let Err(e) = daemon_sync_cycle(client) {
212+
error!("Error during sync cycle: {}", e);
213+
// Re-throw the error
214+
return Err(e);
215+
}
216216

217217
info!("Sync pass done, sleeping for 5 minutes");
218-
219218
std::thread::sleep(std::time::Duration::from_secs(300));
220219
}
221220
}
221+
222+
fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box<dyn Error>> {
223+
info!("Pulling from all hosts");
224+
sync_wrapper::pull_all(client)?;
225+
226+
info!("Pushing local data");
227+
sync_wrapper::push(client)?;
228+
229+
Ok(())
230+
}

aw-sync/src/sync_wrapper.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::error::Error;
22
use std::fs;
3-
use std::net::TcpStream;
43

54
use crate::sync::{sync_run, SyncMode, SyncSpec};
65
use aw_client_rust::blocking::AwClient;
@@ -14,15 +13,7 @@ pub fn pull_all(client: &AwClient) -> Result<(), Box<dyn Error>> {
1413
}
1514

1615
pub fn pull(host: &str, client: &AwClient) -> Result<(), Box<dyn Error>> {
17-
let socket_addrs = client.baseurl.socket_addrs(|| None)?;
18-
let socket_addr = socket_addrs
19-
.get(0)
20-
.ok_or("Unable to resolve baseurl into socket address")?;
21-
22-
// Check if server is running
23-
if TcpStream::connect(socket_addr).is_err() {
24-
return Err(format!("Local server {} not running", &client.baseurl).into());
25-
}
16+
client.wait_for_start()?;
2617

2718
// Path to the sync folder
2819
// Sync folder is structured ./{hostname}/{device_id}/test.db

0 commit comments

Comments
 (0)