-
Notifications
You must be signed in to change notification settings - Fork 606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: adds default tools that can use MCP resources #619
Changes from 12 commits
b2820b6
16ab2f9
35ef405
003c02a
1eacaa1
a6875da
03fe456
39752e5
a2dfeda
46b704f
fcec628
00b4203
428e03d
377f51f
c588729
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
use chrono::{DateTime, TimeZone, Utc}; | ||
use futures::stream::{FuturesUnordered, StreamExt}; | ||
use mcp_client::McpService; | ||
use rust_decimal_macros::dec; | ||
use std::collections::HashMap; | ||
use std::collections::{HashMap, HashSet}; | ||
use std::sync::Arc; | ||
use std::sync::LazyLock; | ||
use std::time::Duration; | ||
|
@@ -14,6 +15,7 @@ use crate::providers::base::{Provider, ProviderUsage}; | |
use mcp_client::client::{ClientCapabilities, ClientInfo, McpClient, McpClientTrait}; | ||
use mcp_client::transport::{SseTransport, StdioTransport, Transport}; | ||
use mcp_core::{Content, Tool, ToolCall, ToolError, ToolResult}; | ||
use serde_json::Value; | ||
|
||
// By default, we set it to Jan 1, 2020 if the resource does not have a timestamp | ||
// This is to ensure that the resource is considered less important than resources with a more recent timestamp | ||
|
@@ -24,6 +26,7 @@ static DEFAULT_TIMESTAMP: LazyLock<DateTime<Utc>> = | |
/// Holds the connected MCP clients plus the bookkeeping kept per system.
pub struct Capabilities { | ||
/// MCP clients by system name; `add_system` stores each one under the sanitized server name.
clients: HashMap<String, Arc<Mutex<Box<dyn McpClientTrait>>>>, | ||
/// Instruction text per system.
/// NOTE(review): presumably keyed by the raw server name reported at initialization — confirm in `add_system`.
instructions: HashMap<String, String>, | ||
/// Names of the servers whose initialization result advertised the `resources` capability.
/// These are the raw (unsanitized) server names.
resource_capable_systems: HashSet<String>, | ||
/// Provider supplied when the struct is constructed.
provider: Box<dyn Provider>, | ||
/// Provider usage records behind a mutex; starts out as an empty vector.
provider_usage: Mutex<Vec<ProviderUsage>>, | ||
} | ||
|
@@ -81,11 +84,16 @@ impl Capabilities { | |
Self { | ||
clients: HashMap::new(), | ||
instructions: HashMap::new(), | ||
resource_capable_systems: HashSet::new(), | ||
provider, | ||
provider_usage: Mutex::new(Vec::new()), | ||
} | ||
} | ||
|
||
pub fn supports_resources(&self) -> bool { | ||
!self.resource_capable_systems.is_empty() | ||
} | ||
|
||
/// Add a new MCP system based on the provided client type | ||
// TODO IMPORTANT need to ensure this times out if the system command is broken! | ||
pub async fn add_system(&mut self, config: SystemConfig) -> SystemResult<()> { | ||
|
@@ -142,6 +150,12 @@ impl Capabilities { | |
.insert(init_result.server_info.name.clone(), instructions); | ||
} | ||
|
||
// if the server is capable of resources we track it | ||
if init_result.capabilities.resources.is_some() { | ||
self.resource_capable_systems | ||
.insert(init_result.server_info.name.clone()); | ||
} | ||
|
||
// Store the client | ||
self.clients.insert( | ||
sanitize(init_result.server_info.name.clone()), | ||
|
@@ -284,7 +298,8 @@ impl Capabilities { | |
.keys() | ||
.map(|name| { | ||
let instructions = self.instructions.get(name).cloned().unwrap_or_default(); | ||
SystemInfo::new(name, "", &instructions) | ||
let has_resources = self.resource_capable_systems.contains(name); | ||
SystemInfo::new(name, "", &instructions, has_resources) | ||
}) | ||
.collect(); | ||
|
||
|
@@ -303,25 +318,209 @@ impl Capabilities { | |
.map(Arc::clone) | ||
} | ||
|
||
/// Dispatch a single tool call to the appropriate client | ||
#[instrument(skip(self, tool_call), fields(input, output))] | ||
pub async fn dispatch_tool_call(&self, tool_call: ToolCall) -> ToolResult<Vec<Content>> { | ||
// Function that gets executed for read_resource tool | ||
async fn read_resource(&self, params: Value) -> Result<Vec<Content>, ToolError> { | ||
let uri = params | ||
.get("uri") | ||
.and_then(|v| v.as_str()) | ||
.ok_or_else(|| ToolError::InvalidParameters("Missing 'uri' parameter".to_string()))?; | ||
|
||
let system_name = params.get("system_name").and_then(|v| v.as_str()); | ||
|
||
// If system name is provided, we can just look it up | ||
if system_name.is_some() { | ||
let result = self | ||
.read_resource_from_system(uri, system_name.unwrap()) | ||
.await?; | ||
return Ok(result); | ||
} | ||
|
||
// If system name is not provided, we need to search for the resource across all systems | ||
// Loop through each system and try to read the resource, don't raise an error if the resource is not found | ||
// TODO: do we want to find if a provided uri is in multiple systems? | ||
// currently it will reutrn the first match and skip any systems | ||
for system_name in self.clients.keys() { | ||
let result = self.read_resource_from_system(uri, system_name).await; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: either make a note that we assume no risk of collision for generic names or ensure num_matches == 1 before proceeding? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm good point, if two systems have the same resource uri, it would find the first match and exit the loop |
||
match result { | ||
Ok(result) => return Ok(result), | ||
Err(_) => continue, | ||
} | ||
} | ||
|
||
// None of the systems had the resource so we raise an error | ||
let available_systems = self | ||
.clients | ||
.keys() | ||
.map(|s| s.as_str()) | ||
.collect::<Vec<&str>>() | ||
.join(", "); | ||
let error_msg = format!( | ||
"Resource with uri '{}' not found. Here are the available systems: {}", | ||
uri, available_systems | ||
); | ||
|
||
Err(ToolError::InvalidParameters(error_msg)) | ||
} | ||
|
||
async fn read_resource_from_system( | ||
&self, | ||
uri: &str, | ||
system_name: &str, | ||
) -> Result<Vec<Content>, ToolError> { | ||
let available_systems = self | ||
.clients | ||
.keys() | ||
.map(|s| s.as_str()) | ||
.collect::<Vec<&str>>() | ||
.join(", "); | ||
let error_msg = format!( | ||
"System '{}' not found. Here are the available systems: {}", | ||
system_name, available_systems | ||
); | ||
|
||
let client = self | ||
.get_client_for_tool(&tool_call.name) | ||
.ok_or_else(|| ToolError::NotFound(tool_call.name.clone()))?; | ||
.clients | ||
.get(system_name) | ||
.ok_or(ToolError::InvalidParameters(error_msg))?; | ||
|
||
let client_guard = client.lock().await; | ||
let read_result = client_guard.read_resource(uri).await.map_err(|_| { | ||
ToolError::ExecutionError(format!("Could not read resource with uri: {}", uri)) | ||
})?; | ||
|
||
let mut result = Vec::new(); | ||
for content in read_result.contents { | ||
// Only reading the text resource content; skipping the blob content cause it's too long | ||
if let mcp_core::resource::ResourceContents::TextResourceContents { text, .. } = content | ||
{ | ||
let content_str = format!("{}\n\n{}", uri, text); | ||
result.push(Content::text(content_str)); | ||
} | ||
} | ||
|
||
Ok(result) | ||
} | ||
|
||
let tool_name = tool_call | ||
.name | ||
.split("__") | ||
.nth(1) | ||
.ok_or_else(|| ToolError::NotFound(tool_call.name.clone()))?; | ||
async fn list_resources_from_system( | ||
&self, | ||
system_name: &str, | ||
) -> Result<Vec<Content>, ToolError> { | ||
let client = self.clients.get(system_name).ok_or_else(|| { | ||
ToolError::InvalidParameters(format!("System {} is not valid", system_name)) | ||
})?; | ||
|
||
let client_guard = client.lock().await; | ||
let result = client_guard | ||
.call_tool(tool_name, tool_call.clone().arguments) | ||
client_guard | ||
.list_resources(None) | ||
.await | ||
.map(|result| result.content) | ||
.map_err(|e| ToolError::ExecutionError(e.to_string())); | ||
.map_err(|e| { | ||
ToolError::ExecutionError(format!( | ||
"Unable to list resources for {}, {:?}", | ||
system_name, e | ||
)) | ||
}) | ||
.map(|lr| { | ||
let resource_list = lr | ||
.resources | ||
.into_iter() | ||
.map(|r| format!("{}, uri: ({})", r.name, r.uri)) | ||
.collect::<Vec<String>>() | ||
.join("\n"); | ||
|
||
vec![Content::text(resource_list)] | ||
}) | ||
} | ||
|
||
async fn list_resources(&self, params: Value) -> Result<Vec<Content>, ToolError> { | ||
let system = params.get("system").and_then(|v| v.as_str()); | ||
|
||
match system { | ||
Some(system_name) => { | ||
// Handle single system case | ||
self.list_resources_from_system(system_name).await | ||
} | ||
None => { | ||
// Handle all systems case using FuturesUnordered | ||
let mut futures = FuturesUnordered::new(); | ||
|
||
// Create futures for each system | ||
for (system_name, client) in &self.clients { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can skip some steps by iterating over |
||
let client = Arc::clone(client); | ||
|
||
futures.push(async move { | ||
let guard = client.lock().await; | ||
guard | ||
.list_resources(None) | ||
.await | ||
.map(|r| (system_name.clone(), r)) | ||
.map_err(|e| (system_name.clone(), e)) | ||
}); | ||
} | ||
|
||
let mut all_resources = Vec::new(); | ||
let mut errors = Vec::new(); | ||
|
||
// Process results as they complete | ||
while let Some(result) = futures.next().await { | ||
match result { | ||
Ok((system_name, resource_list)) => { | ||
all_resources.extend(resource_list.resources.into_iter().map(|r| { | ||
format!("{} - {}, uri: ({})", system_name, r.name, r.uri) | ||
})); | ||
} | ||
Err((system_name, e)) => { | ||
errors.push((system_name, e)); | ||
} | ||
} | ||
} | ||
|
||
// Log any errors that occurred | ||
if !errors.is_empty() { | ||
tracing::error!( | ||
errors = ?errors | ||
.into_iter() | ||
.map(|(sys, e)| format!("{}: {:?}", sys, e)) | ||
.collect::<Vec<_>>(), | ||
"errors from listing resources" | ||
); | ||
} | ||
|
||
// Sort resources for consistent output | ||
all_resources.sort(); | ||
|
||
Ok(vec![Content::text(all_resources.join("\n"))]) | ||
} | ||
} | ||
} | ||
|
||
/// Dispatch a single tool call to the appropriate client | ||
#[instrument(skip(self, tool_call), fields(input, output))] | ||
pub async fn dispatch_tool_call(&self, tool_call: ToolCall) -> ToolResult<Vec<Content>> { | ||
let result = if tool_call.name == "platform__read_resource" { | ||
// Check if the tool is read_resource and handle it separately | ||
self.read_resource(tool_call.arguments.clone()).await | ||
} else if tool_call.name == "platform__list_resources" { | ||
self.list_resources(tool_call.arguments.clone()).await | ||
} else { | ||
// Else, dispatch tool call based on the prefix naming convention | ||
let client = self | ||
.get_client_for_tool(&tool_call.name) | ||
.ok_or_else(|| ToolError::NotFound(tool_call.name.clone()))?; | ||
|
||
let tool_name = tool_call | ||
.name | ||
.split("__") | ||
.nth(1) | ||
.ok_or_else(|| ToolError::NotFound(tool_call.name.clone()))?; | ||
|
||
let client_guard = client.lock().await; | ||
|
||
client_guard | ||
.call_tool(tool_name, tool_call.clone().arguments) | ||
.await | ||
.map(|result| result.content) | ||
.map_err(|e| ToolError::ExecutionError(e.to_string())) | ||
}; | ||
|
||
debug!( | ||
"input" = serde_json::to_string(&tool_call).unwrap(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,10 @@ use crate::providers::base::Provider; | |
use crate::providers::base::ProviderUsage; | ||
use crate::register_agent; | ||
use crate::token_counter::TokenCounter; | ||
use serde_json::Value; | ||
use indoc::indoc; | ||
use mcp_core::tool::Tool; | ||
use serde_json::{json, Value}; | ||
|
||
/// Reference implementation of an Agent | ||
pub struct ReferenceAgent { | ||
capabilities: Mutex<Capabilities>, | ||
|
@@ -65,7 +68,62 @@ impl Agent for ReferenceAgent { | |
let mut messages = messages.to_vec(); | ||
let reply_span = tracing::Span::current(); | ||
let mut capabilities = self.capabilities.lock().await; | ||
let tools = capabilities.get_prefixed_tools().await?; | ||
let mut tools = capabilities.get_prefixed_tools().await?; | ||
// we add in the read_resource tool by default | ||
// TODO: make sure there is no collision with another system's tool name | ||
let read_resource_tool = Tool::new( | ||
"platform__read_resource".to_string(), | ||
indoc! {r#" | ||
Read a resource from a system. | ||
|
||
Resources allow systems to share data that provide context to LLMs, such as | ||
files, database schemas, or application-specific information. This tool searches for the | ||
resource URI in the provided system, and reads in the resource content. If no system | ||
is provided, the tool will search all systems for the resource. | ||
|
||
The read_resource tool is typically used with a search query (can be before or after). Here are two examples: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: i'd remove these extra tips unless we find they are needed tips? not sure if it was tested. i've been noticing these specific examples don't necessarily make it better at using the tools but can confuse it into hallucinating usage that isn't relevant, with frontier models at least |
||
1. Search for files in Google Drive MCP Server, then call read_resource(gdrive:///<file_id>) to read the Google Drive file. | ||
2. You need to gather schema information about a Postgres table before creating a query. So you call read_resource(postgres://<host>/<table>/schema) | ||
to get the schema information for a table and then use to construct your query. | ||
"#}.to_string(), | ||
json!({ | ||
"type": "object", | ||
"required": ["uri"], | ||
"properties": { | ||
"uri": {"type": "string", "description": "Resource URI"}, | ||
"system_name": {"type": "string", "description": "Optional system name"} | ||
} | ||
}), | ||
); | ||
|
||
let list_resources_tool = Tool::new( | ||
"platform__list_resources".to_string(), | ||
indoc! {r#" | ||
List resources from a system(s). | ||
|
||
Resources allow systems to share data that provide context to LLMs, such as | ||
files, database schemas, or application-specific information. This tool lists resources | ||
in the provided system, and returns a list for the user to browse. If no system | ||
is provided, the tool will search all systems for the resource. | ||
|
||
The list_resources tool is typically used before a read_resource tool call. Here are two examples: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: i'd remove these extra tips unless we find they are needed tips? not sure if it was tested. i've been noticing these specific examples don't necessarily make it better at using the tools but can confuse it into hallucinating usage that isn't relevant, with frontier models at least |
||
1. List files in Google Drive MCP Server, then call read_resource(gdrive:///<file_id>) to read the Google Drive file. | ||
2. You want to see what tables exist in Postgre so you can find schema information about that table before creating a query. So you call list_resources to see whats available then you call read_resource(postgres://<host>/<table>/schema) | ||
to get the schema information for a table and then use to construct your query. | ||
"#}.to_string(), | ||
json!({ | ||
"type": "object", | ||
"properties": { | ||
"system_name": {"type": "string", "description": "Optional system name"} | ||
} | ||
}), | ||
); | ||
|
||
if capabilities.supports_resources() { | ||
tools.push(read_resource_tool); | ||
tools.push(list_resources_tool); | ||
} | ||
|
||
let system_prompt = capabilities.get_system_prompt().await; | ||
let _estimated_limit = capabilities | ||
.provider() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can skip some steps by searching only self.resource_capable_system (like below)