Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nicarq committed Feb 8, 2025
1 parent 8de6579 commit 8e2f0fb
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 60 deletions.
76 changes: 28 additions & 48 deletions shinkai-bin/shinkai-node/src/network/v2_api/api_v2_commands_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,22 @@ use serde_json::{json, Value};
use shinkai_http_api::node_api_router::{APIError, SendResponseBody, SendResponseBodyData};
use shinkai_message_primitives::{
schemas::{
identity::Identity,
inbox_name::InboxName,
job::{ForkedJob, JobLike},
job_config::JobConfig,
llm_providers::serialized_llm_provider::SerializedLLMProvider,
shinkai_name::{ShinkaiName, ShinkaiSubidentityType},
smart_inbox::{SmartInbox, V2SmartInbox},
},
shinkai_message::{
shinkai_message::{MessageBody, MessageData},
shinkai_message_schemas::{
APIChangeJobAgentRequest, ExportInboxMessagesFormat, JobCreationInfo, JobMessage, MessageSchemaType,
V2ChatMessage,
},
},
shinkai_utils::{
job_scope::MinimalJobScope, shinkai_message_builder::ShinkaiMessageBuilder, shinkai_path::ShinkaiPath,
signatures::clone_signature_secret_key,
},
identity::Identity, inbox_name::InboxName, job::{ForkedJob, JobLike}, job_config::JobConfig, llm_providers::serialized_llm_provider::SerializedLLMProvider, shinkai_name::{ShinkaiName, ShinkaiSubidentityType}, smart_inbox::{SmartInbox, V2SmartInbox}
}, shinkai_message::{
shinkai_message::{MessageBody, MessageData}, shinkai_message_schemas::{
APIChangeJobAgentRequest, ExportInboxMessagesFormat, JobCreationInfo, JobMessage, MessageSchemaType, V2ChatMessage
}
}, shinkai_utils::{
job_scope::MinimalJobScope, shinkai_message_builder::ShinkaiMessageBuilder, shinkai_path::ShinkaiPath, signatures::clone_signature_secret_key
}
};

use shinkai_sqlite::SqliteManager;
use tokio::sync::Mutex;
use x25519_dalek::PublicKey as EncryptionPublicKey;

use crate::{
llm_provider::job_manager::JobManager,
managers::IdentityManager,
network::{node_error::NodeError, Node},
llm_provider::job_manager::JobManager, managers::IdentityManager, network::{node_error::NodeError, Node}
};

use x25519_dalek::StaticSecret as EncryptionStaticKey;
Expand Down Expand Up @@ -1257,30 +1244,6 @@ impl Node {
})?
.0;

// Get the main identity from the identity manager
let main_identity = identity_manager
.lock()
.await
.get_main_identity()
.map_or(
Err(APIError {
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
error: "Internal Server Error".to_string(),
message: "Failed to get main identity".to_string(),
}),
|identity| Ok(identity.clone()),
)?
.clone();

let sender = ShinkaiName::new(main_identity.get_full_identity_name())?;

let recipient = ShinkaiName::from_node_and_profile_names_and_type_and_name(
node_name.node_name,
"main".to_string(),
ShinkaiSubidentityType::Agent,
source_job.parent_agent_or_llm_provider_id.clone(),
)?;

// Retrieve the messages from the inbox
let inbox_name = source_job.conversation_inbox_name.to_string();
let last_messages = db
Expand Down Expand Up @@ -1325,6 +1288,23 @@ impl Node {
message: format!("Failed to deserialize job message: {}", err),
})?;

// Extract original sender and recipient from the message
let original_sender = ShinkaiName::from_shinkai_message_using_sender_subidentity(message)
.or_else(|_| ShinkaiName::from_shinkai_message_only_using_sender_node_name(message))
.map_err(|err| APIError {
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
error: "Internal Server Error".to_string(),
message: format!("Failed to extract sender from message: {}", err),
})?;

let original_recipient = ShinkaiName::from_shinkai_message_using_recipient_subidentity(message)
.or_else(|_| ShinkaiName::from_shinkai_message_only_using_recipient_node_name(message))
.map_err(|err| APIError {
code: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
error: "Internal Server Error".to_string(),
message: format!("Failed to extract recipient from message: {}", err),
})?;

job_message.job_id = forked_job_id.clone();
job_message.parent = job_message.parent.map(|parent| {
forked_message_map
Expand All @@ -1334,8 +1314,8 @@ impl Node {
});

let forked_message = Self::api_v2_create_shinkai_message(
sender.clone(),
recipient.clone(),
original_sender,
original_recipient,
&serde_json::to_string(&job_message).unwrap(),
MessageSchemaType::JobMessageSchema,
node_encryption_sk.clone(),
Expand Down
92 changes: 80 additions & 12 deletions shinkai-bin/shinkai-node/tests/it/job_fork_messages_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,94 @@ fn test_fork_job_messages() {

// Verify the forked conversation
let (res2_sender, res2_receiver) = async_channel::bounded(1);
node1_commands_sender
.send(NodeCommand::V2ApiGetAllSmartInboxes {
bearer: node1_api_key.clone(),
limit: None,
offset: None,
show_hidden: None,
res: res2_sender,
})
.await
.unwrap();
let inboxes = res2_receiver.recv().await.unwrap();
eprintln!("Inboxes: {:?}", inboxes);

// Find the two job inboxes (original and forked)
let job_inboxes: Vec<_> = match inboxes {
Ok(inboxes) => inboxes
.iter()
.filter(|inbox| inbox.inbox_id.starts_with("job_inbox::"))
.cloned()
.collect(),
Err(_) => vec![],
};

assert!(
job_inboxes.len() >= 2,
"Should have at least two job inboxes (original and forked)"
);
let original_inbox = &job_inboxes[0];
let forked_inbox = &job_inboxes[1];

// Get messages from original inbox
let (res3_sender, res3_receiver) = async_channel::bounded(1);
node1_commands_sender
.send(NodeCommand::V2ApiGetLastMessagesFromInbox {
bearer: node1_api_key.to_string(),
inbox_name: InboxName::get_job_inbox_name_from_params(job_fork_id)
.unwrap()
.to_string(),
bearer: node1_api_key.clone(),
inbox_name: original_inbox.inbox_id.clone(),
limit: 8,
offset_key: None,
res: res2_sender,
res: res3_sender,
})
.await
.unwrap();
let forked_messages = res2_receiver.recv().await.unwrap();
println!("Forked messages: {:?}", forked_messages);
let original_messages = res3_receiver.recv().await.unwrap();

assert_eq!(
forked_messages.unwrap().len(),
4,
"Forked messages should match original message count"
);
// Get messages from forked inbox
let (res4_sender, res4_receiver) = async_channel::bounded(1);
node1_commands_sender
.send(NodeCommand::V2ApiGetLastMessagesFromInbox {
bearer: node1_api_key.clone(),
inbox_name: forked_inbox.inbox_id.clone(),
limit: 8,
offset_key: None,
res: res4_sender,
})
.await
.unwrap();
let forked_messages = res4_receiver.recv().await.unwrap();

// Compare messages from both inboxes
if let (Ok(original_messages), Ok(forked_messages)) = (original_messages, forked_messages) {
assert_eq!(
original_messages.len(),
forked_messages.len(),
"Original and forked messages should have the same length"
);
assert!(original_messages.len() == 4, "Should have exactly 4 messages");

for (original_message, forked_message) in original_messages.iter().zip(forked_messages.iter()) {
assert_eq!(
original_message.sender_subidentity, forked_message.sender_subidentity,
"Original and forked messages should have the same sender subidentity"
);
assert_eq!(
original_message.sender, forked_message.sender,
"Original and forked messages should have the same sender"
);
assert_eq!(
original_message.job_message.content, forked_message.job_message.content,
"Original and forked messages should have the same content"
);
assert_eq!(
original_message.job_message.parent, forked_message.job_message.parent,
"Original and forked messages should have the same parent"
);
}
} else {
panic!("Failed to get messages from inboxes");
}

eprintln!("Job fork messages test completed");
node1_abort_handler.abort();
Expand Down

0 comments on commit 8e2f0fb

Please sign in to comment.