🐛 Describe the bug
When running non-linear (fan-out) workflows, we encountered an error where a duplicate task ID is handled incorrectly. The issue is in the `_add_to_open_tasks()` method of `DurableOrchestrationContext`. The first time an `AtomicTask` with a new task ID is encountered, it is stored in the `open_tasks` dictionary as a single `AtomicTask` object. If the same task ID is encountered again, the code calls `.append()` on that object, expecting it to be a list, which raises an `AttributeError`:
```
AttributeError: 'AtomicTask' object has no attribute 'append'
```
The crash occurs in the `_add_to_open_tasks()` function when attempting to call `.append()` on the task object stored in the dictionary:
```python
def _add_to_open_tasks(self, task: TaskBase):
    if task._is_scheduled:
        return

    if isinstance(task, AtomicTask):
        if task.id is None:
            task.id = self._sequence_number
            self._sequence_number += 1
            self.open_tasks[task.id] = task  # <-- Bug: stored as a single object instead of a list
        elif task.id != -1:
            self.open_tasks[task.id].append(task)  # <-- Error occurs here

            if task.id in self.deferred_tasks:
                task_update_action = self.deferred_tasks[task.id]
                task_update_action()
    else:
        for child in task.children:
            self._add_to_open_tasks(child)
```
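The failure mode can be reproduced in isolation (a minimal sketch, not the library code):

```python
class AtomicTask:
    """Stand-in for the SDK's AtomicTask; like the real class, it has no append method."""

open_tasks = {}
first, duplicate = AtomicTask(), AtomicTask()

open_tasks[1] = first            # first occurrence: stored as a bare object
open_tasks[1].append(duplicate)  # AttributeError: 'AtomicTask' object has no attribute 'append'
```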
🤔 Expected behavior
The workflow should complete successfully without crashing. A temporary workaround in our environment has been to force the dictionary values to be lists by storing `[task]` instead of `task`.
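A minimal sketch of that workaround, applied to the branch shown above (our local patch, not an upstream fix):

```python
if task.id is None:
    task.id = self._sequence_number
    self._sequence_number += 1
    self.open_tasks[task.id] = [task]  # store a one-element list so duplicates can be appended
elif task.id != -1:
    self.open_tasks[task.id].append(task)  # now appends to a list, as the code expects
```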
☕ Steps to reproduce
Create an orchestration that uses a fan-out pattern (with `task_any`) to run multiple tasks in parallel, then run the workflow with the latest version of the library in a local environment.
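For reference, a minimal fan-out orchestrator of the shape that triggers the error for us (a sketch; the `ProcessItem` activity name is a placeholder):

```python
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    # Fan out: schedule several activity calls in parallel...
    tasks = [context.call_activity("ProcessItem", i) for i in range(5)]
    # ...then resume as soon as any one of them completes.
    winner = yield context.task_any(tasks)
    return winner.result

main = df.Orchestrator.create(orchestrator_function)
```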