Skip to content
This repository was archived by the owner on Feb 3, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,4 @@ openapi.json

*.db
*.db-*
/tentacles
2 changes: 2 additions & 0 deletions octobot_node/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import argparse
import sys
import logging

try:
import uvicorn
Expand All @@ -30,6 +31,7 @@


def start_server(args):
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)-8s %(name)-24s %(message)s")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the right place, we just need to enable logs to see what happens when minibot runs tasks

port = args.port or 8000

# This must be done before the scheduler module is imported
Expand Down
163 changes: 163 additions & 0 deletions octobot_node/scheduler/octobot_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# This file is part of OctoBot Node (https://github.com/Drakkar-Software/OctoBot-Node)
# Copyright (c) 2025 Drakkar-Software, All rights reserved.
#
# OctoBot is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either
# version 3.0 of the License, or (at your option) any later version.
#
# OctoBot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.
import typing
import dataclasses
import json
import logging

import octobot_commons.list_util as list_util
import octobot_commons.dataclasses

try:
import mini_octobot
import mini_octobot.environment
import mini_octobot.parsers

# Requires mini_octobot import and importable tentacles folder

# ensure environment is initialized
mini_octobot.environment.initialize_environment(True)

except ImportError:
logging.getLogger("octobot_node.scheduler.octobot_lib").warning("OctoBot is not installed, OctoBot actions will not be available")
# mocks to allow import
class mini_octobot_mock:
class BotActionDetails:
def from_dict(self, *args, **kwargs):
raise NotImplementedError("BotActionDetails.from_dict is not implemented")
class SingleBotActionsJob:
def __init__(self, *args, **kwargs):
raise NotImplementedError("SingleBotActionsJob.__init__ is not implemented")
async def __aenter__(self):
raise NotImplementedError("SingleBotActionsJob.__aenter__ is not implemented")
async def __aexit__(self, *args, **kwargs):
raise NotImplementedError("SingleBotActionsJob.__aexit__ is not implemented")
class parsers:
class BotActionBundleParser:
def __init__(self, *args, **kwargs):
raise NotImplementedError("BotActionBundleParser.__init__ is not implemented")
def parse(self, *args, **kwargs):
raise NotImplementedError("BotActionBundleParser.parse is not implemented")
mini_octobot = mini_octobot_mock()


@dataclasses.dataclass
class OctoBotActionsJobDescription(octobot_commons.dataclasses.MinimizableDataclass):
state: dict = dataclasses.field(default_factory=dict)
auth_details: dict = dataclasses.field(default_factory=dict)
params: dict = dataclasses.field(default_factory=dict)
immediate_actions: list[mini_octobot.BotActionDetails] = dataclasses.field(default_factory=list)
pending_actions: list[list[mini_octobot.BotActionDetails]] = dataclasses.field(default_factory=list)

def __post_init__(self):
if self.immediate_actions and isinstance(self.immediate_actions[0], dict):
self.immediate_actions = [
mini_octobot.BotActionDetails.from_dict(action) for action in self.immediate_actions
]
if self.pending_actions and self.pending_actions[0] and isinstance(self.pending_actions[0][0], dict):
self.pending_actions = [
[mini_octobot.BotActionDetails.from_dict(action) for action in bundle]
for bundle in self.pending_actions
]
if self.params:
if self.immediate_actions or self.pending_actions:
raise ValueError("adding extra actions to a task is not yet supported")
self._parse_actions_plan(self.params)

def _parse_actions_plan(self, params: dict) -> None:
action_bundles: list[list[mini_octobot.BotActionDetails]] = mini_octobot.parsers.BotActionBundleParser(params).parse()
if not action_bundles:
raise ValueError("No action bundles found in params")
self.immediate_actions = action_bundles[0]
self.pending_actions = action_bundles[1:]

def get_next_execution_time(self) -> float:
return min(
bot["execution"]["current_execution"]["scheduled_to"]
for bot in self.state["bots"]
)


@dataclasses.dataclass
class OctoBotActionsJobResult:
processed_actions: list[mini_octobot.BotActionDetails]
next_actions_description: typing.Optional[OctoBotActionsJobDescription] = None

def get_created_orders(self) -> list[dict]:
if self.processed_actions is None:
raise ValueError("No bot actions were executed yet")
order_lists = [
action.result.get("orders", [])
for action in self.processed_actions
if action.result
]
return list_util.flatten_list(order_lists) if order_lists else []


class OctoBotActionsJob:
def __init__(self, description: str):
parsed_description = self._parse_description(description)
self.description: OctoBotActionsJobDescription = OctoBotActionsJobDescription.from_dict(
parsed_description
)
self.after_execution_state = None

def _parse_description(self, description: str) -> dict:
# TODO update this method once the decision about description is made (is it a dict or a string key=val; string)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method might become useless, depending on the initial actions format

try:
if isinstance(description, dict):
parsed_description = description
else:
# normal case: description is a JSON string
parsed_description = json.loads(description)
except json.JSONDecodeError:
# legacy case: description is a string of key-value pairs
parsed_description = {
"params": mini_octobot.parsers.key_val_to_dict(description),
}
parsed_description["params"]["SIMULATED_PORTFOLIO"] = {
"ETH": 1,
}
Comment on lines +131 to +133
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a hard coded simulated portfolio, we will see if we keep this mechanism or not later on

return parsed_description

async def run(self) -> OctoBotActionsJobResult:
selected_actions = self.description.immediate_actions
async with mini_octobot.SingleBotActionsJob(
self.description.state, self.description.auth_details, selected_actions
) as single_bot_actions_job:
logging.getLogger(self.__class__.__name__).info(f"Running single bot actions job actions: {selected_actions}")
await single_bot_actions_job.run()
self.after_execution_state = single_bot_actions_job.exchange_account_details
post_execution_state_dump = single_bot_actions_job.dump()
return OctoBotActionsJobResult(
processed_actions=single_bot_actions_job.bot_actions,
next_actions_description=self.get_next_actions_description(post_execution_state_dump)
)

def get_next_actions_description(
self, post_execution_state: dict
) -> typing.Optional[OctoBotActionsJobDescription]:
if not self.description.pending_actions:
# completed all actions
return None
return OctoBotActionsJobDescription(
state=post_execution_state,
auth_details=self.description.auth_details,
# next immediate actions are the first remaining pending actions
immediate_actions=self.description.pending_actions[0],
# next pending actions are the remaining pending actions
pending_actions=self.description.pending_actions[1:]
)
11 changes: 7 additions & 4 deletions octobot_node/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,18 @@ def get_results(self) -> list[dict]:
if result_obj is None:
description = f"Task completed (unable to parse result)"
status = TaskStatus.COMPLETED
result = ""
metadata = ""
elif isinstance(result_obj, HueyError):
description = f"Task failed: {result_obj.error}"
description = f"Task failed: {result_obj.metadata.get('error')}"
status = TaskStatus.FAILED
result = ""
metadata = ""
else:
description = f"Task completed"
status = TaskStatus.COMPLETED

result = result_obj.get(TaskResultKeys.RESULT.value)
metadata = result_obj.get(TaskResultKeys.METADATA.value)
result = result_obj.get(TaskResultKeys.RESULT.value)
metadata = result_obj.get(TaskResultKeys.METADATA.value)
Comment on lines +118 to +129
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a fix of a crash related to tasks with results or raised errors


tasks.append({
"id": task_id,
Expand Down
51 changes: 47 additions & 4 deletions octobot_node/scheduler/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,38 @@
# You should have received a copy of the GNU General Public
# License along with OctoBot. If not, see <https://www.gnu.org/licenses/>.

import functools
import datetime
import asyncio
import json

from octobot_node.scheduler import SCHEDULER
from octobot_node.scheduler.task_context import encrypted_task
from octobot_node.app.models import Task, TaskType
from octobot_node.app.enums import TaskResultKeys
from octobot_node.app.models import TaskStatus

import octobot_node.scheduler.octobot_lib as octobot_lib


def async_task(func):
"""
Decorator to ensure that the function it wraps is a non-async function that can then use asyncio.run(), e.g. Huey tasks.
Huey tasks will be called in one of 2 contexts: either they are the top-level function(ish) in the process, and there is no loop yet, or we are running tests in an an async context already and we need to re-use the current loop.
"""

@functools.wraps(func)
def wrapper_decorator(*args, **kwargs):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(func(*args, **kwargs))
return loop.run_until_complete(task)

return wrapper_decorator


@SCHEDULER.INSTANCE.task()
def start_octobot(task: Task):
Expand All @@ -35,17 +61,34 @@ def start_octobot(task: Task):
}


def _reshedule_octobot_execution(task: Task, next_actions_description: octobot_lib.OctoBotActionsJobDescription):
task.content = json.dumps(next_actions_description.to_dict(include_default_values=False))
if next_actions_description.get_next_execution_time() == 0:
next_execution_time = datetime.datetime.now(tz=datetime.timezone.utc)
else:
next_execution_time = datetime.datetime.fromtimestamp(
next_actions_description.get_next_execution_time(),
tz=datetime.timezone.utc
)
execute_octobot.schedule(args=[task], eta=next_execution_time)


@SCHEDULER.INSTANCE.task()
def execute_octobot(task: Task):
@async_task
async def execute_octobot(task: Task):
with encrypted_task(task):
if task.type == TaskType.EXECUTE_ACTIONS.value:
# TODO start_octobot with actions
print(f"Executing actions with content: {task.content}...")
print(f"Executing actions with content: {task.content} ...")
result: octobot_lib.OctoBotActionsJobResult = await octobot_lib.OctoBotActionsJob(
task.content
).run()
task.result = {
"state": {
"orders": [], # WIP
"orders": result.get_created_orders()
}
}
if result.next_actions_description:
_reshedule_octobot_execution(task, result.next_actions_description)
else:
raise ValueError(f"Invalid task type: {task.type}")
return {
Expand Down
12 changes: 6 additions & 6 deletions tests/scheduler/task_import/test-tasks.csv
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated this file with a few required params

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name,content,type,actions,order_type,order_price,order_symbol,order_amount,order_leverage,exchange_from,exchange_to,blockchain_asset,blockchain_amount,blockchain_source_address,blockchain_destination_address,blockchain_from,blockchain_to
"Deposit 1 Bitcoin","","execute_actions","deposit","","","","","","","binance","BTC",1,"","","Bitcoin",""
"Trade 1 ETH vs Bitcoin","","execute_actions","trade","market_order","","ETH/BTC",1,"","","","","","","","",""
"Open long position on Binance","","execute_actions","trade","limit",50000,"","",10,"","","binance","","","","",""
"Buy YES to $150k what-price-will-bitcoin-hit-in-january-2026 on polymarket","","execute_actions","trade","market_order","","what-price-will-bitcoin-hit-in-january-2026/USDC:USDC-260131-0-YES","","","","","polymarket","","","","",""
"Decentralized trading example","EXCHANGE_TO=Binance;BLOCKCHAIN_ASSET=BTC;BLOCKCHAIN_AMOUNT=1;BLOCKCHAIN_FROM=Bitcoin;ORDER_SYMBOL=ETH/BTC;ORDER_AMOUNT=1;ORDER_TYPE=market;EXCHANGE_FROM=Binance;BLOCKCHAIN_TO=Ethereum;BLOCKCHAIN_ASSET=ETH;BLOCKCHAIN_AMOUNT=1","execute_actions","deposit,wait,trade,wait,withdraw","","","","","","","","","","","",""
"name","content","type","actions","order_type","order_side","order_price","order_symbol","order_amount","order_leverage","exchange_from","exchange_to","blockchain_from_asset","blockchain_from_amount","blockchain_to_address","blockchain_to_address","blockchain_from","blockchain_to"
"Deposit 1 Bitcoin",,"execute_actions","deposit",,,,,,,,"binance","BTC",1,,,"SIMULATED",
"Trade 1 ETH vs Bitcoin",,"execute_actions","trade","market_order","sell",,"ETH/BTC",1,,"binance",,,,,,,
"Open long position on Binance",,"execute_actions","trade","limit",,50000,,,10,,"binance",,,,,,
"Buy YES to $150k what-price-will-bitcoin-hit-in-january-2026 on polymarket",,"execute_actions","trade","market_order",,,"what-price-will-bitcoin-hit-in-january-2026/USDC:USDC-260131-0-YES",,,,,"polymarket",,,,,
"Decentralized trading example","EXCHANGE_TO=binance;BLOCKCHAIN_FROM_ASSET=BTC;BLOCKCHAIN_FROM_AMOUNT=1;BLOCKCHAIN_FROM=SIMULATED;ORDER_SYMBOL=ETH/BTC;ORDER_AMOUNT=1;ORDER_TYPE=market;EXCHANGE_FROM=binance;BLOCKCHAIN_TO=Ethereum;BLOCKCHAIN_TO_ASSET=ETH;BLOCKCHAIN_TO_AMOUNT=1;BLOCKCHAIN_TO_ADDRESS=0x123456","execute_actions","deposit,wait,trade,wait,withdraw",,,,,,,,,,,,,,
Loading