
Conversation

@alcholiclg (Collaborator)

Change Summary

This PR adds comprehensive support for the Financial Research project, enabling automated analysis and report generation for financial market tasks.

  1. Multi-Agent Architecture

    • Orchestrator for task decomposition
    • Collector & Analyst agents for financial data collection and analysis
    • Deep Research Agent for public sentiment and news analysis
    • Aggregator for chart/report synthesis
  2. FinancialDataFetcher

    • Introduced FinancialDataFetcher with hybrid data routing via AkShare and BaoStock
    • Supports A-shares and partial HK/US stock data
    • Built-in rate limiting and failover mechanisms for stable high-frequency access
  3. Workflow Enhancements

    • Added DagWorkflow for DAG-style multi-agent orchestration
    • Integrated RateLimiter for generic API access control (a rough sketch of the idea follows this list)
  4. Bug Fixes & Improvements

    • Fixed OpenAI._merge_stream_message parsing issue for multiple tool calls
    • Added concurrency limit support in ToolManager.single_call_tool
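
The RateLimiter implementation itself is not shown in this description. As a rough mental model only, a generic limiter of this kind typically pairs a concurrency semaphore with minimum-interval spacing between calls; the class and parameter names below are illustrative assumptions, not the PR's actual API:

    import asyncio
    import time


    class RateLimiter:
        """Sketch: concurrency cap plus minimum spacing between calls."""

        def __init__(self, calls_per_second: float, max_concurrent: int = 3):
            self.max_concurrent = max_concurrent
            self._min_interval = 1.0 / calls_per_second
            self._sem = asyncio.Semaphore(max_concurrent)
            self._lock = asyncio.Lock()
            self._last_call = 0.0

        async def __aenter__(self):
            await self._sem.acquire()
            async with self._lock:
                wait = self._last_call + self._min_interval - time.monotonic()
                if wait > 0:
                    await asyncio.sleep(wait)
                self._last_call = time.monotonic()
            return self

        async def __aexit__(self, *exc):
            self._sem.release()

Usage would be an async with limiter: block around each outbound API call.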

Related issue number

Checklist

  • The pull request title is a good summary of the changes - it will be used in the changelog
  • Unit tests for the changes exist
  • Run pre-commit install and pre-commit run --all-files before git commit, and ensure the lint check passes.
  • Documentation reflects the changes where applicable

@gemini-code-assist

Summary of Changes

Hello @alcholiclg, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly expands the system's capabilities by integrating a sophisticated multi-agent framework designed for financial research. It introduces specialized agents for data collection, analysis, and sentiment research, orchestrated through a new DAG-based workflow. A key addition is a versatile financial data fetching tool with adaptive rate limiting, alongside a secure Docker-based code execution environment. These enhancements aim to provide a robust and scalable platform for automated financial market analysis.

Highlights

  • New Financial Research Project Support: Introduced comprehensive support for a Financial Research project, enabling automated analysis and report generation for financial market tasks through a multi-agent architecture.
  • Multi-Agent Orchestration: Implemented a multi-agent architecture featuring an Orchestrator for task decomposition, Collector and Analyst agents for data handling, a Deep Research Agent for sentiment analysis, and an Aggregator for report synthesis. This is facilitated by a new DagWorkflow.
  • Advanced Financial Data Fetcher: Added a FinancialDataFetcher tool with hybrid data routing capabilities, leveraging both AkShare and BaoStock for A-shares, and partial HK/US stock data. It includes built-in rate limiting and failover mechanisms for stable high-frequency access.
  • Code Execution Sandbox: Integrated a CodeExecutionTool that allows executing Python code in isolated Docker sandboxes (Jupyter kernel or plain Docker), complete with port management, resource limits, and file operations.
  • API Rate Limiting and Concurrency Control: Introduced generic RateLimiter and AdaptiveRateLimiter classes for robust API access control, and added concurrency limit support in ToolManager.single_call_tool to prevent API overload.
  • LLM Tool Call Parsing Fix: Fixed a parsing issue in OpenAI._merge_stream_message to correctly handle multiple tool calls, improving the reliability of LLM interactions.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature              | Command             | Description
Code Review          | /gemini review      | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary     | Provides a summary of the current pull request in its current state.
Comment              | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help                 | /gemini help        | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist (bot) left a comment

Code Review

This pull request introduces a comprehensive financial research feature, including a multi-agent architecture orchestrated by a new DAG workflow, a FinancialDataFetcher with hybrid data sources, and a sandboxed CodeExecutionTool. The changes are substantial and well-structured, adding significant new capabilities. My review focuses on improving the robustness and maintainability of these new components. I've identified a potential crash in the FinancialDataFetcher due to incorrect handling of an optional rate limiter, a bug in BaoStockDataSource's argument processing, and several areas for improvement regarding mutable default arguments, error handling, and code consistency.

Comment on lines 75 to 78
self.thread_pool = ThreadPoolExecutor(
    max_workers=self.rate_limiter.max_concurrent,
    thread_name_prefix='financial_data_fetcher_',
)


critical

The _create_rate_limiter method can return None, but self.rate_limiter.max_concurrent is accessed unconditionally when creating the ThreadPoolExecutor. This will raise a critical AttributeError at runtime if no rate limiter is configured. You must handle the case where self.rate_limiter is None and provide a default value for max_workers.

        self.rate_limiter = self._create_rate_limiter(config)
        max_workers = self.rate_limiter.max_concurrent if self.rate_limiter else 3
        self.thread_pool = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix='financial_data_fetcher_',
        )

@alcholiclg (Collaborator, Author), Nov 5, 2025

Fixed.

Comment on lines 349 to 352
if not extra_kwargs or not extra_kwargs.get('yearType'):
    parsed_extra_kwargs['yearType'] = '0'
else:
    parsed_extra_kwargs.update(extra_kwargs)


high

There's a bug in how extra_kwargs are handled for required_reserve_ratio. If extra_kwargs is provided but does not contain yearType, any other arguments within it will be lost because parsed_extra_kwargs is not initialized with them. This will lead to incorrect API calls. You should copy extra_kwargs first and then set the default for yearType if it's missing.

Suggested change
if not extra_kwargs or not extra_kwargs.get('yearType'):
    parsed_extra_kwargs['yearType'] = '0'
else:
    parsed_extra_kwargs.update(extra_kwargs)
if extra_kwargs:
    parsed_extra_kwargs.update(extra_kwargs)
if 'yearType' not in parsed_extra_kwargs:
    parsed_extra_kwargs['yearType'] = '0'

Collaborator Author

Done.

Comment on lines 114 to 120
def get_macro_data(self,
                   start_date: str,
                   end_date: str,
                   data_types: List[str] = [],
                   extra_kwargs: dict = {}) -> Dict[str, pd.DataFrame]:
    """Get macroeconomic data for multiple categories in one call"""
    pass


high

Using mutable default arguments like [] and {} is a common pitfall in Python as they are shared across all function calls. If one call modifies the list or dictionary, subsequent calls will receive the modified object. The standard and safer practice is to use None as the default and initialize an empty container inside the function body.

Suggested change
def get_macro_data(self,
                   start_date: str,
                   end_date: str,
                   data_types: List[str] = [],
                   extra_kwargs: dict = {}) -> Dict[str, pd.DataFrame]:
    """Get macroeconomic data for multiple categories in one call"""
    pass
def get_macro_data(self,
                   start_date: str,
                   end_date: str,
                   data_types: Optional[List[str]] = None,
                   extra_kwargs: Optional[Dict[str, Any]] = None) -> Dict[str, pd.DataFrame]:
    """Get macroeconomic data for multiple categories in one call"""
    pass
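
For readers unfamiliar with the pitfall, a minimal standalone illustration (a hypothetical function, not code from this PR):

    def append_item(item, bucket=[]):  # the default list is created once, at def time
        bucket.append(item)
        return bucket

    print(append_item(1))  # [1]
    print(append_item(2))  # [1, 2]; the same list persisted across calls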

Collaborator Author

Done.

'success': True,
'sandbox_id': info.id,
'status': info.status.value,
'type': info.type,


medium

For consistency with how info.status is handled just above, it's better to explicitly use .value to get the string representation of the enum. While default=str in json.dumps might work, being explicit improves clarity and robustness against future changes.

Suggested change
'type': info.type,
'type': info.type.value,

Collaborator Author

It will be fixed in another PR.

Comment on lines 31 to 34
if isinstance(config, DictConfig):
    sandbox_config = config.get('sandbox', {})
else:
    raise ValueError(f'Unknown config type: {type(config)}')


medium

The type hint for config is Union[DictConfig, dict], but the implementation only handles DictConfig. It should also handle standard dictionaries to match the type hint and be more robust.

Suggested change
if isinstance(config, DictConfig):
    sandbox_config = config.get('sandbox', {})
else:
    raise ValueError(f'Unknown config type: {type(config)}')
if isinstance(config, (DictConfig, dict)):
    sandbox_config = config.get('sandbox', {})
else:
    raise ValueError(f'Unknown config type: {type(config)}')

Collaborator Author

It will be fixed in another PR.

Comment on lines 323 to 325
return pd.concat([result_df, df_business_info],
                 axis=1,
                 ignore_index=True)


medium

Using ignore_index=True with axis=1 in pd.concat is likely not the intended behavior. When concatenating columns, ignore_index=True has no effect on column labels, and the resulting DataFrame's columns will be re-labeled with integers (0, 1, 2, ...), which is probably not what you want. You should remove ignore_index=True to preserve the original column names from both DataFrames. This is also inconsistent with _get_hk_basic_info, which correctly omits this parameter.

            return pd.concat([result_df, df_business_info],
                             axis=1)

Collaborator Author

Fixed.

Comment on lines 472 to 473
except Exception:
    out['XSJLL_calc'] = pd.NA


medium

Catching a broad Exception can hide unrelated bugs and make debugging more difficult. It's better to catch more specific exceptions that you expect here, such as ValueError or TypeError if the float() conversion fails.

                    except (ValueError, TypeError):
                        out['XSJLL_calc'] = pd.NA

Collaborator Author

Added a logger.warning call to log the exception.
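
A sketch of what the merged fix could look like, assuming a module-level logger and eliding the actual ratio computation (compute_net_margin is a hypothetical stand-in, not a function from this PR):

    try:
        out['XSJLL_calc'] = compute_net_margin(row)  # hypothetical helper
    except (ValueError, TypeError) as e:
        logger.warning('Failed to compute XSJLL_calc: %s', e)
        out['XSJLL_calc'] = pd.NA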

"""
if self.rate_limiter is None:
    # No rate limiting, execute directly in thread pool
    return await asyncio.to_thread(func, *args, **kwargs)


medium

When self.rate_limiter is None, asyncio.to_thread is used, which runs the function in the default ThreadPoolExecutor. However, when a rate limiter is present, loop.run_in_executor is used with self.thread_pool. This is inconsistent. For consistency and to respect the max_workers configured in __init__, you should use self.thread_pool in both cases.

            return await asyncio.get_event_loop().run_in_executor(self.thread_pool, partial(func, *args, **kwargs))

Collaborator Author

Switching to partial(func, *args, **kwargs) combined with run_in_executor would introduce additional overhead. The original logic deliberately distinguishes between the rate-limited and non-rate-limited paths: when no rate limiter is configured, it uses the more modern asyncio.to_thread (Python 3.9+) to run the function more efficiently.
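
Paraphrasing the two dispatch paths described above as a method sketch (the rate limiter's async-context-manager interface is an assumption, not necessarily the PR's exact API):

    import asyncio
    from functools import partial

    async def _execute(self, func, *args, **kwargs):
        if self.rate_limiter is None:
            # No rate limiting: hand off via asyncio.to_thread (default executor)
            return await asyncio.to_thread(func, *args, **kwargs)
        async with self.rate_limiter:  # assumed acquire/release semantics
            loop = asyncio.get_running_loop()
            # Rate-limited path: run on the dedicated pool sized in __init__
            return await loop.run_in_executor(
                self.thread_pool, partial(func, *args, **kwargs))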

Comment on lines 43 to 44
if not self.roots:
    raise ValueError('No root task found for BranchWorkflow')


medium

The error message refers to BranchWorkflow, but the class is DagWorkflow. This appears to be a copy-paste error and should be corrected to avoid confusion during debugging.

Suggested change
if not self.roots:
    raise ValueError('No root task found for BranchWorkflow')
if not self.roots:
    raise ValueError('No root task found for DagWorkflow')

Collaborator Author

Done.

if isinstance(inputs, list):
    if isinstance(inputs[0], str):
        refractory_inputs = [[Message(role='user', content=input)]
                             for input in inputs]


medium

The variable name input shadows the built-in function input(). While it works in this context, it's considered bad practice and can lead to confusing bugs if the built-in is needed elsewhere in the scope. It's better to use a different name, such as item or inp.

Suggested change
refractory_inputs = [[Message(role='user', content=input)]
                     for input in inputs]
refractory_inputs = [[Message(role='user', content=item)]
                     for item in inputs]

Collaborator Author

Done.

@gemini-code-assist

Summary of Changes

Hello @alcholiclg, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly expands the system's capabilities by introducing a robust framework for automated financial research. It establishes a sophisticated multi-agent architecture designed to streamline the entire process from data collection and analysis to comprehensive report generation. The changes enhance data acquisition reliability and improve the overall efficiency and scalability of complex analytical workflows.

Highlights

  • Multi-Agent Architecture for Financial Research: Introduced a comprehensive multi-agent system including Orchestrator for task decomposition, Collector and Analyst agents for financial data, a Deep Research Agent for sentiment analysis, and an Aggregator for report synthesis.
  • Enhanced Financial Data Fetching: A new FinancialDataFetcher tool has been added, featuring hybrid data routing via AkShare and BaoStock. It supports A-shares and partial HK/US stock data, with built-in rate limiting and failover mechanisms for stable high-frequency access.
  • Improved Workflow Orchestration: The DagWorkflow has been integrated to enable Directed Acyclic Graph (DAG)-style multi-agent orchestration, allowing for more complex and flexible task flows. A generic RateLimiter has also been added for API access control.
  • Core System Bug Fixes and Improvements: Resolved a parsing issue in OpenAI._merge_stream_message for multiple tool calls and implemented concurrency limit support within ToolManager.single_call_tool for better resource management.

@gemini-code-assist (bot) left a comment

Code Review

This pull request introduces a comprehensive financial research feature, including a multi-agent DAG workflow, a FinancialDataFetcher with hybrid data source support, and a sandboxed code execution environment. The architecture is well-thought-out, with good features like rate limiting and failover for data fetching. The bug fix for handling multiple tool calls in openai_llm.py is also a welcome improvement. My review focuses on improving the robustness and correctness of the new modules. I've identified a critical bug that could lead to a crash on startup, a few high-severity issues related to fragile code and incorrect imports that could cause runtime failures, and some medium-severity issues to improve code quality and prevent future bugs. Addressing these points will make the new financial research capability more stable and maintainable.

Comment on lines 74 to 78
self.rate_limiter = self._create_rate_limiter(config)
self.thread_pool = ThreadPoolExecutor(
    max_workers=self.rate_limiter.max_concurrent,
    thread_name_prefix='financial_data_fetcher_',
)


critical

The ThreadPoolExecutor is initialized using self.rate_limiter.max_concurrent. However, _create_rate_limiter can return None, in which case self.rate_limiter will be None. Accessing .max_concurrent will then raise an AttributeError, crashing the application. The initialization needs to handle the case where rate_limiter is not configured.

Suggested change
self.rate_limiter = self._create_rate_limiter(config)
self.thread_pool = ThreadPoolExecutor(
    max_workers=self.rate_limiter.max_concurrent,
    thread_name_prefix='financial_data_fetcher_',
)
self.rate_limiter = self._create_rate_limiter(config)
max_workers = self.rate_limiter.max_concurrent if self.rate_limiter else 5
self.thread_pool = ThreadPoolExecutor(
    max_workers=max_workers,
    thread_name_prefix='financial_data_fetcher_',
)

ValueError: If sandbox mode is unknown
"""
# Extract sandbox configuration
if isinstance(config, DictConfig):


high

The type hint for config is Union[DictConfig, dict], but the implementation only checks for DictConfig. This will cause a ValueError to be raised if a dict is passed, which contradicts the type hint.

Suggested change
if isinstance(config, DictConfig):
if isinstance(config, (DictConfig, dict)):

Comment on lines 56 to 61
report_path = json.loads(message.content).get(
    'report_path', '')
if not report_path:
    report_path = extract_code_blocks(
        message.content)[0][0].get('code').get(
            'report_path', '')


high

The logic for extracting report_path is fragile. It can fail with an IndexError if extract_code_blocks returns no results, or a JSONDecodeError if the content is not valid JSON. This should be made more robust with try-except blocks and checks for empty lists.

Suggested change
report_path = json.loads(message.content).get(
    'report_path', '')
if not report_path:
    report_path = extract_code_blocks(
        message.content)[0][0].get('code').get(
            'report_path', '')
report_path = ''
try:
    report_path = json.loads(message.content).get('report_path', '')
except json.JSONDecodeError:
    pass  # Not a JSON string, try extracting from code block
if not report_path:
    try:
        code_blocks, _ = extract_code_blocks(message.content)
        if code_blocks:
            code_content = code_blocks[0].get('code', '{}')
            report_path = json.loads(code_content).get('report_path', '')
    except (json.JSONDecodeError, IndexError):
        logger.warning(f"Could not extract report_path from message: {message.content}")

# Copyright (c) Alibaba, Inc. and its affiliates.
from typing import List

from file_parser import extract_code_blocks


high

The import from file_parser import extract_code_blocks is likely to fail with a ModuleNotFoundError. Since file_parser.py is in the same callbacks directory, a relative import from .file_parser import extract_code_blocks should be used.

Suggested change
from file_parser import extract_code_blocks
from .file_parser import extract_code_blocks

Comment on lines 323 to 325
return pd.concat([result_df, df_business_info],
                 axis=1,
                 ignore_index=True)


medium

The use of ignore_index=True with axis=1 in pd.concat is incorrect here. The parameter is intended for concatenation along axis=0; with axis=1 it re-labels the resulting columns with integers (0, 1, 2, ...), discarding the original column names. It should be removed.

            return pd.concat([result_df, df_business_info],
                             axis=1)

    t for t in tasks if 'next' in self.config[t] and indegree[t] == 0
]
if not self.roots:
    raise ValueError('No root task found for BranchWorkflow')


medium

There is a typo in the ValueError message. It should refer to DagWorkflow instead of BranchWorkflow.

Suggested change
raise ValueError('No root task found for BranchWorkflow')
raise ValueError('No root task found for DagWorkflow')
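
As context for the quoted root-detection line, a minimal sketch of how indegrees and roots can be computed for a DAG config; the config shape used here is an assumption, not the PR's actual schema:

    # Assumed shape: each task maps to a dict with a 'next' list of successors.
    config = {'collect': {'next': ['analyze']}, 'analyze': {'next': []}}
    tasks = list(config)
    indegree = {t: 0 for t in tasks}
    for t in tasks:
        for successor in config[t].get('next', []):
            indegree[successor] += 1
    roots = [t for t in tasks if indegree[t] == 0]  # ['collect']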

@@ -0,0 +1,93 @@
# Copyright (c) Alibaba, Inc. and its affiliates.
import os
from tkinter import N


medium

The import from tkinter import N is unused and appears to be a leftover from development. It should be removed to keep the code clean.
