Skip to content

Commit

Permalink
refactor: split out list_resources_from_system and refactor futures
Browse files Browse the repository at this point in the history
usage
  • Loading branch information
kalvinnchau committed Jan 17, 2025
1 parent 06388d2 commit fea18c1
Showing 1 changed file with 84 additions and 53 deletions.
137 changes: 84 additions & 53 deletions crates/goose/src/agents/capabilities.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use chrono::{DateTime, TimeZone, Utc};
use futures::stream;
use futures::stream::StreamExt;
use futures::stream::{FuturesUnordered, StreamExt};
use mcp_client::McpService;
use rust_decimal_macros::dec;
use std::collections::HashMap;
Expand Down Expand Up @@ -370,64 +369,96 @@ impl Capabilities {
return Ok(result);
}

async fn list_resources(&self, params: Value) -> Result<Vec<Content>, ToolError> {
let system = params.get("system").and_then(|v| v.as_str());
async fn list_resources_from_system(
&self,
system_name: &str,
) -> Result<Vec<Content>, ToolError> {
let client = self.clients.get(system_name).ok_or_else(|| {
ToolError::InvalidParameters(format!("System {} is not valid", system_name))
})?;

if system.is_some() {
// grab the system
let client = self.clients.get(system.unwrap()).ok_or_else(|| {
ToolError::InvalidParameters(format!("System {} is not valid", system.unwrap()))
})?;
let client_guard = client.lock().await;
client_guard
.list_resources(None)
.await
.map_err(|e| {
ToolError::ExecutionError(format!(
"Unable to list resources for {}, {:?}",
system_name, e
))
})
.map(|lr| {
let resource_list = lr
.resources
.into_iter()
.map(|r| format!("{}, uri: ({})", r.name, r.uri))
.collect::<Vec<String>>()
.join("\n");

vec![Content::text(resource_list)]
})
}

let client_guard = client.lock().await;
let result = client_guard
.list_resources(None)
.await
.map_err(|e| {
ToolError::ExecutionError(format!(
"Unable to list resources for {}, {:?}",
system.unwrap(),
e
))
})
.map(|lr| {
let resource_list = lr
.resources
.into_iter()
.map(|r| format!("{}, uri: ({})", r.name, r.uri))
.collect::<Vec<String>>()
.join("\n");

vec![Content::text(resource_list)]
});

return result;
}
async fn list_resources(&self, params: Value) -> Result<Vec<Content>, ToolError> {
let system = params.get("system").and_then(|v| v.as_str());

// if no system name, loop through all systems
let results: Vec<_> = stream::iter(self.clients.clone().into_iter())
.then(|(_system_name, client)| async move {
let guard = client.lock().await;
guard.list_resources(None).await
})
.collect()
.await;
match system {
Some(system_name) => {
// Handle single system case
self.list_resources_from_system(system_name).await
}
None => {
// Handle all systems case using FuturesUnordered
let mut futures = FuturesUnordered::new();

// Create futures for each system
for (system_name, client) in &self.clients {
let client = Arc::clone(client);

futures.push(async move {
let guard = client.lock().await;
guard
.list_resources(None)
.await
.map(|r| (system_name.clone(), r))
.map_err(|e| (system_name.clone(), e))
});
}

let (resources, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
let mut all_resources = Vec::new();
let mut errors = Vec::new();

// Process results as they complete
while let Some(result) = futures.next().await {
match result {
Ok((system_name, resource_list)) => {
all_resources.extend(resource_list.resources.into_iter().map(|r| {
format!("{} - {}, uri: ({})", system_name, r.name, r.uri)
}));
}
Err((system_name, e)) => {
errors.push((system_name, e));
}
}
}

let errs: Vec<_> = errs.into_iter().map(Result::unwrap_err).collect();
tracing::error!(errors = ?errs, "errors from listing rsources");
// Log any errors that occurred
if !errors.is_empty() {
tracing::error!(
errors = ?errors
.into_iter()
.map(|(sys, e)| format!("{}: {:?}", sys, e))
.collect::<Vec<_>>(),
"errors from listing resources"
);
}

// take all resources and convert to Content as Tool response
let all_resources_str: String = resources
.into_iter()
.map(Result::unwrap)
.flat_map(|lr| lr.resources)
.map(|resource| format!("{}, uri: ({})", resource.name, resource.uri))
.collect::<Vec<_>>()
.join("\n");
// Sort resources for consistent output
all_resources.sort();

Ok(vec![Content::text(all_resources_str)])
Ok(vec![Content::text(all_resources.join("\n"))])
}
}
}

/// Dispatch a single tool call to the appropriate client
Expand Down

0 comments on commit fea18c1

Please sign in to comment.