diff --git a/agents/anda_bot/nitro_enclave/amd64.Dockerfile b/agents/anda_bot/nitro_enclave/amd64.Dockerfile index f9b2683..9771d27 100644 --- a/agents/anda_bot/nitro_enclave/amd64.Dockerfile +++ b/agents/anda_bot/nitro_enclave/amd64.Dockerfile @@ -26,7 +26,7 @@ RUN mv linux-amd64/dnsproxy ./ && chmod +x dnsproxy RUN wget -O ic_tee_nitro_gateway https://github.com/ldclabs/ic-tee/releases/download/v0.2.11/ic_tee_nitro_gateway RUN chmod +x ic_tee_nitro_gateway -RUN wget -O anda_bot https://github.com/ldclabs/anda/releases/download/v0.2.16/anda_bot +RUN wget -O anda_bot https://github.com/ldclabs/anda/releases/download/v0.3.0/anda_bot RUN chmod +x anda_bot FROM --platform=linux/amd64 debian:bookworm-slim AS runtime diff --git a/agents/anda_bot/src/handler.rs b/agents/anda_bot/src/handler.rs index 6f52ede..05af32f 100644 --- a/agents/anda_bot/src/handler.rs +++ b/agents/anda_bot/src/handler.rs @@ -91,7 +91,7 @@ pub async fn add_proposal( ct: Content, ) -> impl IntoResponse { match ct { - Content::CBOR(req, _) => { + Content::CBOR(req, _) | Content::JSON(req, _) => { let is_manager = app.is_manager(&headers).await; if !is_manager { return StatusCode::FORBIDDEN.into_response(); diff --git a/agents/anda_bot/src/twitter.rs b/agents/anda_bot/src/twitter.rs index fa7fd7b..325b4ac 100644 --- a/agents/anda_bot/src/twitter.rs +++ b/agents/anda_bot/src/twitter.rs @@ -1,10 +1,18 @@ use agent_twitter_client::{models::Tweet, scraper::Scraper, search::SearchMode}; -use anda_core::{Agent, BoxError, CacheExpiry, CacheFeatures, CompletionFeatures, StateFeatures}; +use anda_core::{ + Agent, BoxError, CacheFeatures, CompletionFeatures, Path, PutMode, StateFeatures, + StoreFeatures, +}; use anda_engine::{ - context::AgentCtx, engine::Engine, extension::character::CharacterAgent, rand_number, + context::AgentCtx, + engine::Engine, + extension::character::CharacterAgent, + rand_number, }; use anda_lancedb::knowledge::KnowledgeStore; -use std::{collections::BTreeSet, sync::Arc}; +use ciborium::from_reader; +use ic_cose_types::to_cbor_bytes; +use std::sync::Arc; use tokio::{ sync::RwLock, time::{sleep, Duration}, @@ -13,8 +21,8 @@ use tokio_util::sync::CancellationToken; use crate::handler::ServiceStatus; -const MAX_TWEET_LENGTH: usize = 280; const MAX_HISTORY_TWEETS: i64 = 21; +const MAX_SEEN_TWEET_IDS: usize = 1000; static LOG_TARGET: &str = "twitter"; @@ -40,7 +48,50 @@ impl TwitterDaemon { } } + async fn init_seen_tweet_ids(&self, ctx: &F) + where + F: CacheFeatures + StoreFeatures, + { + // load seen_tweet_ids from store + let seen_tweet_ids: Vec = ctx + .store_get(&Path::from("seen_tweet_ids")) + .await + .map(|(v, _)| from_reader(&v[..]).unwrap_or_default()) + .unwrap_or_default(); + + ctx.cache_set("seen_tweet_ids", (seen_tweet_ids, None)) + .await; + } + + async fn get_seen_tweet_ids(&self, ctx: &F) -> Vec + where + F: CacheFeatures + StoreFeatures, + { + ctx.cache_get("seen_tweet_ids").await.unwrap_or_default() + } + + async fn set_seen_tweet_ids(&self, ctx: F, val: Vec) + where + F: CacheFeatures + StoreFeatures + Send + Sync + 'static, + { + ctx.cache_set("seen_tweet_ids", (val.clone(), None)).await; + tokio::spawn(async move { + let _ = ctx + .store_put( + &Path::from("seen_tweet_ids"), + PutMode::Overwrite, + to_cbor_bytes(&val).into(), + ) + .await; + }); + } + pub async fn run(&self, cancel_token: CancellationToken) -> Result<(), BoxError> { + let ctx = self.engine.ctx_with(self.agent.as_ref(), None, None)?; + + // load seen_tweet_ids from store + self.init_seen_tweet_ids(&ctx).await; + log::info!(target: LOG_TARGET, "starting Twitter bot"); loop { @@ -89,7 +140,7 @@ impl TwitterDaemon { } } - if rand_number(0..=7) == 0 { + if rand_number(0..=5) == 0 { if let Err(err) = self.handle_home_timeline().await { log::error!(target: LOG_TARGET, "handle_home_timeline error: {err:?}"); } @@ -157,10 +208,14 @@ impl TwitterDaemon { None, )?; - let mut seen_tweet_ids: BTreeSet = - ctx.cache_get("seen_tweet_ids").await.unwrap_or_default(); - let seen: Vec = seen_tweet_ids.iter().cloned().collect(); - let tweets = self.scraper.get_home_timeline(1, seen).await?; + let mut seen_tweet_ids: Vec = self.get_seen_tweet_ids(&ctx).await; + if seen_tweet_ids.len() >= MAX_SEEN_TWEET_IDS { + seen_tweet_ids.drain(0..MAX_SEEN_TWEET_IDS / 2); + } + let tweets = self + .scraper + .get_home_timeline(1, seen_tweet_ids.clone()) + .await?; log::info!(target: LOG_TARGET, "process home timeline, {} tweets", tweets.len()); let mut likes = 0; @@ -191,7 +246,7 @@ impl TwitterDaemon { if seen_tweet_ids.contains(&tweet_id) { continue; } - seen_tweet_ids.insert(tweet_id.clone()); + seen_tweet_ids.push(tweet_id.clone()); let res: Result<(), BoxError> = async { if self.handle_like(&ctx, &tweet_content, &tweet_id).await? { @@ -215,8 +270,7 @@ impl TwitterDaemon { sleep(Duration::from_secs(rand_number(3..=10))).await; } - ctx.cache_set("seen_tweet_ids", (seen_tweet_ids, None)) - .await; + self.set_seen_tweet_ids(ctx, seen_tweet_ids).await; log::info!(target: LOG_TARGET, "home timeline: likes {}, retweets {}, quotes {}", likes, retweets, quotes); Ok(()) } @@ -235,20 +289,13 @@ impl TwitterDaemon { let ctx = self .engine .ctx_with(self.agent.as_ref(), Some(tweet_user.clone()), None)?; + let mut seen_tweet_ids: Vec = self.get_seen_tweet_ids(&ctx).await; - let handle_key = format!("D_{}", tweet_id); - if ctx.cache_contains(&handle_key) { + if seen_tweet_ids.contains(&tweet_id) { return Ok(()); } - ctx.cache_set( - &handle_key, - ( - true, - Some(CacheExpiry::TTL(Duration::from_secs(3600 * 24 * 3))), - ), - ) - .await; + seen_tweet_ids.push(tweet_id.clone()); let thread = self.build_conversation_thread(&tweet).await?; let messages: Vec = thread @@ -268,29 +315,17 @@ impl TwitterDaemon { messages.join("\n") }; - let res = self.agent.run(ctx, tweet_text, None).await?; + let res = self.agent.run(ctx.clone(), tweet_text, None).await?; + if res.failed_reason.is_none() { + // Reply to the original tweet + let tweet: Option<&str> = tweet.id.as_deref(); + let _ = self.scraper.send_tweet(&res.content, tweet, None).await?; - if res.failed_reason.is_some() { - return Ok(()); + log::info!(target: LOG_TARGET, "handle mention: {} - {}, {} chars", tweet_user, tweet_id, res.content.chars().count()); } - // Split response into tweet-sized chunks if necessary - let chunks: Vec = res - .content - .chars() - .collect::>() - .chunks(MAX_TWEET_LENGTH) - .map(|chunk| chunk.iter().collect::()) - .collect(); - - // Reply to the original tweet - let tweet: Option<&str> = tweet.id.as_deref(); - for chunk in &chunks { - let _ = self.scraper.send_tweet(chunk.as_str(), tweet, None).await?; - sleep(Duration::from_secs(rand_number(1..=3))).await; - } + self.set_seen_tweet_ids(ctx, seen_tweet_ids.clone()).await; - log::info!(target: LOG_TARGET, "handle mention: {} - {}, {} chunks", tweet_user, tweet_id, chunks.len()); Ok(()) }