Skip to content

feat: sequence task workforce #2055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion camel/societies/workforce/workforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import json
import logging
from collections import deque
from typing import Deque, Dict, List, Optional
from typing import Any, Deque, Dict, List, Optional

from colorama import Fore

Expand Down Expand Up @@ -79,6 +79,8 @@ def __init__(
new_worker_agent_kwargs: Optional[Dict] = None,
) -> None:
super().__init__(description)
self.shared_context: Dict[str, Dict[str, Any]] = {}
self.shared_channel: TaskChannel = TaskChannel()
self._child_listening_tasks: Deque[asyncio.Task] = deque()
self._children = children or []
self.new_worker_agent_kwargs = new_worker_agent_kwargs
Expand Down Expand Up @@ -154,6 +156,70 @@ def process_task(self, task: Task) -> Task:

return task

@check_if_running(False)
async def process_task_sequence_async(
self, tasks: List[Task]
) -> List[Task]:
r"""Asynchronous method to process a sequence of tasks
with shared context.
This method will initialize the shared channel,
maintain a shared context
across tasks, and process each task
and its subtasks in order.

The result of each task will be stored in `self.shared_context`
under the key `task_{task.id}`.

Args:
tasks (List[Task]): A list of tasks to be processed sequentially.

Returns:
List[Task]: A list of updated tasks with results populated.
"""
self.reset()
self.set_channel(self.shared_channel)

results = []

for task in tasks:
self._task = task
task.state = TaskState.FAILED

subtasks = self._decompose_task(task)
self._pending_tasks.clear()
self._pending_tasks.append(task)
self._pending_tasks.extendleft(reversed(subtasks))

await self.start()

# share context between tasks in list
self.shared_context[f"task_{task.id}"] = {
"content": task.content,
"result": task.result,
}

results.append(task)

return results

@check_if_running(False)
def process_task_sequence(self, tasks: List[Task]) -> List[Task]:
r"""The main entry point for the workforce to process
a sequence of tasks.
It will process all tasks in order, sharing context
across them, and return the updated list of tasks.

Internally it runs the asynchronous method
`process_task_sequence_async`.

Args:
tasks (List[Task]): A list of tasks to be processed sequentially.

Returns:
List[Task]: A list of updated tasks with results populated.
"""
return asyncio.run(self.process_task_sequence_async(tasks))

@check_if_running(False)
def add_single_agent_worker(
self, description: str, worker: ChatAgent
Expand Down
153 changes: 153 additions & 0 deletions examples/workforce/task_sequence_hackathon_judges.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import textwrap

from camel.agents import ChatAgent
from camel.messages import BaseMessage
from camel.models import ModelFactory
from camel.societies.workforce import Workforce
from camel.tasks import Task
from camel.toolkits import FunctionTool, SearchToolkit
from camel.types import ModelPlatformType, ModelType


def make_judge(
persona: str, example_feedback: str, criteria: str
) -> ChatAgent:
content = textwrap.dedent(
f"""\
You are a judge in a hackathon.
Persona: {persona}
Example feedback: {example_feedback}
Criteria:
{criteria}
Give a score like 3/4, 4/4, etc. Be true to your persona.
"""
)
return ChatAgent(
system_message=BaseMessage.make_assistant_message(
role_name="Hackathon Judge", content=content
),
model=ModelFactory.create(
ModelPlatformType.DEFAULT, ModelType.DEFAULT
),
)


def make_head_judge() -> ChatAgent:
content = textwrap.dedent(
"""\
You are the head judge in a hackathon.
Your task is to read the reviews and scores from other judges,
and summarize them into one unified final evaluation.
You should include each judge's score, identity, and feedback,
and then offer a final paragraph summary for the project.
"""
)
return ChatAgent(
system_message=BaseMessage.make_assistant_message(
role_name="Head Judge", content=content
),
model=ModelFactory.create(
ModelPlatformType.DEFAULT, ModelType.DEFAULT
),
)


def main():
# Judges
vc_agent = make_judge(
"A buzzword-loving VC looking for unicorn potential.",
"Absolutely disruptive! Perfect for the FinTech ecosystem!",
"### Market Potential (1-4):\n- 4: Unicorn-ready\n- 1: No market",
)

eng_agent = make_judge(
"A perfectionist systems engineer who hates inefficiencies.",
"Architecture is unstable and barely performs under load.",
"### Technical Quality (1-4):\n- 4: Flawless\n- 1: Broken",
)

contributor_agent = make_judge(
"A CAMEL contributor who loves to see CAMEL used creatively.",
"Love how CAMEL-AI is integrated! So much potential!",
"### CAMEL Integration (1-4):\n- 4: Advanced use\n- 1: Basic use",
)

researcher_agent = ChatAgent(
system_message=BaseMessage.make_assistant_message(
role_name="Researcher",
content="You search the web to provide supporting context.",
),
model=ModelFactory.create(
ModelPlatformType.DEFAULT, ModelType.DEFAULT
),
tools=[
FunctionTool(SearchToolkit().search_google),
],
)

head_judge = make_head_judge()

# Create workforce
workforce = Workforce("Hackathon Judging Committee")
workforce.add_single_agent_worker("VC Veronica", vc_agent)
workforce.add_single_agent_worker("Engineer Ethan", eng_agent)
workforce.add_single_agent_worker("Contributor Clara", contributor_agent)
workforce.add_single_agent_worker("Researcher Rachel", researcher_agent)
workforce.add_single_agent_worker("Head Judge Henry", head_judge)

# Build task sequence
task_list = [
Task(
id="research",
content=(
"Do a brief research on the hackathon project. Summarize the "
"core idea, "
"technology used, innovation, and potential impact."
),
),
Task(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some comments on these?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like step by step run all task ....

id="judge_A",
content=(
"As Judge Alice, review the project based on the research "
"summary. "
"Provide your opinion and give a score from 1 to 10. Be "
"specific about "
"what you liked or disliked."
),
),
Task(
id="judge_B",
content=(
"As Judge Bob, evaluate the project using your own criteria. "
"Based on the research summary, give a thoughtful critique "
"and assign "
"a score from 1 to 10."
),
),
]

results = workforce.process_task_sequence(task_list)

for t in results:
print(f"Task {t.id} result:\n{t.result}\n")

print("=== Shared Context ===")
for k, v in workforce.shared_context.items():
print(f"{k}: {v['result']}\n")


if __name__ == "__main__":
main()
93 changes: 93 additions & 0 deletions test/workforce/test_workforce_single_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
from unittest.mock import AsyncMock, MagicMock

import pytest

from camel.agents.chat_agent import ChatAgent
from camel.messages.base import BaseMessage
from camel.societies.workforce.single_agent_worker import SingleAgentWorker
from camel.societies.workforce.workforce import Workforce
from camel.tasks.task import Task


Expand All @@ -36,3 +39,93 @@ async def test_get_dep_tasks_info():
await test_worker._process_task(
human_task, subtasks
) # TODO: use MagicMock


@pytest.mark.asyncio
async def test_task_sequence_workforce():
"""Tests that the Workforce correctly processes a sequence of tasks
and propagates results to the shared context."""

sys_msg = BaseMessage.make_assistant_message(
role_name="researcher",
content="You are a researcher who can analyze projects.",
)

researcher_agent = ChatAgent(sys_msg)

mock_agent = MagicMock()
mock_agent.step = AsyncMock(return_value=("Score: 8/10", None))

workforce = Workforce("Review Committee")
workforce.add_single_agent_worker("Researcher", researcher_agent)
workforce.add_single_agent_worker("Judge A", mock_agent)
workforce.add_single_agent_worker("Judge B", mock_agent)

original_start = workforce.start

async def mock_start():
if workforce._task:
workforce._task.result = (
f"Processed result: " f"{workforce._task.content}"
)

workforce.start = AsyncMock(side_effect=mock_start)
workforce._decompose_task = MagicMock(return_value=[])

task_list = [
Task(
id="research",
content="Conduct brief research on the test project. Summarize "
"core ideas, technology used, and potential impact.",
),
Task(
id="judge_A",
content="As Judge A, review the project based on the research "
"summary. Provide feedback and score from 1 to 10.",
),
Task(
id="judge_B",
content="As Judge B, evaluate the project using your own "
"criteria. Based on research, provide critique and score "
"from 1 to 10.",
),
]

results = await workforce.process_task_sequence_async(task_list)

assert len(results) == 3
assert all(task.result is not None for task in results)
assert "task_research" in workforce.shared_context
assert "task_judge_A" in workforce.shared_context
assert "task_judge_B" in workforce.shared_context

for task in results:
assert (
workforce.shared_context[f"task_{task.id}"]["result"]
== f"Processed result: {task.content}"
)
assert (
workforce.shared_context[f"task_{task.id}"]["content"]
== task.content
)

single_task = Task(id="test_task", content="Content that needs processing")

workforce.shared_context.clear()

single_result = await workforce.process_task_sequence_async([single_task])

assert (
single_result[0].result
== "Processed result: Content that needs processing"
)
assert (
workforce.shared_context["task_test_task"]["result"]
== "Processed result: Content that needs processing"
)
assert (
workforce.shared_context["task_test_task"]["content"]
== "Content that needs processing"
)

workforce.start = original_start