Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions temporalio/contrib/langgraph/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,46 @@ await g.ainvoke({...}, context=Context(user_id="alice"))

Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary.

## Summaries

Summaries are short, human-readable labels that show up in the Temporal UI and CLI, making it easier to see what each step of a run is doing.

### Static summary

`summary` is an ordinary Activity option, so a fixed per-node label works today — pass it like any other option:

```python
g.add_node("plan", plan, metadata={"execute_in": "activity", "summary": "Planning step"})
```

It is attached to the node's scheduled-activity event (`execute_in="activity"` only).

### Dynamic summary (`summary_fn`)

To derive the label from the node's input at runtime, supply a `summary_fn`. It receives the node's `(args, kwargs)` and returns a summary string, or `None`/`""` for no summary. For a `StateGraph` node `args[0]` is the state; for a Functional `@task` it is the task's arguments.

```python
def summarize(args, kwargs) -> str | None:
state = args[0]
return f"stage={state['stage']} doc={state['doc_id']}"

# Graph API: per-node
g.add_node("plan", plan, metadata={"execute_in": "activity", "summary_fn": summarize})

# Functional API: per-task
plugin = LangGraphPlugin(
Comment thread
DABH marked this conversation as resolved.
tasks=[plan],
activity_options={"plan": {"execute_in": "activity", "summary_fn": summarize}},
)
```

`summary_fn` is set per node/task (like the static `summary`), so different nodes — which receive different inputs — can compute their summaries independently.

- For `execute_in="activity"` nodes the result sets the activity `summary` (one per scheduled-activity event, visible in history).
- For `execute_in="workflow"` nodes there is no activity, so the result updates the workflow's current details via [`workflow.set_current_details()`](https://python.temporal.io/temporalio.workflow.html#set_current_details). This is a single workflow-level slot (last-writer-wins) reflecting the most recent workflow-bound node that defines a `summary_fn`; a `None`/`""` result clears it. It is queryable via `__temporal_workflow_metadata`.

`summary_fn` runs in workflow context on every replay, so it **must be deterministic and must not raise** (an exception fails the workflow task). Setting both a static `summary` and a `summary_fn` on the same node raises `ValueError`.

## Streaming

When `streaming_topic` is set on `LangGraphPlugin`, calls to `langgraph.config.get_stream_writer()` inside a node publish to the named topic on the workflow's [`WorkflowStream`](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams). Activity-side nodes publish via `WorkflowStreamClient` (a signal carrying batched items, controlled by `streaming_batch_interval`); workflow-side nodes publish synchronously to the in-workflow stream (no signal). External subscribers consume the stream with `WorkflowStreamClient.create(...).topic(...).subscribe(...)`.
Expand Down
13 changes: 10 additions & 3 deletions temporalio/contrib/langgraph/_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def thread_safe_writer(value: Any) -> None:
def wrap_execute_activity(
afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]],
task_id: str = "",
summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None] | None = None,
**execute_activity_kwargs: Any,
) -> Callable[..., Any]:
"""Wrap an activity function to be called via workflow.execute_activity with caching."""
Expand Down Expand Up @@ -156,9 +157,15 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
input = ActivityInput(
args=args, kwargs=kwargs, langgraph_config=langgraph_config
)
output = await workflow.execute_activity(
afunc, input, **execute_activity_kwargs
)
# Compute a dynamic activity summary (if configured) on the schedule
# path only; a cache hit above returns before reaching here, so no
# activity is scheduled and no summary is needed.
call_kwargs = dict(execute_activity_kwargs)
if summary_fn is not None:
summary = summary_fn(args, kwargs)
if summary:
call_kwargs["summary"] = summary
output = await workflow.execute_activity(afunc, input, **call_kwargs)
if output.langgraph_interrupts is not None:
raise GraphInterrupt(output.langgraph_interrupts)

Expand Down
39 changes: 35 additions & 4 deletions temporalio/contrib/langgraph/_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@
_ACTIVITY_OPTION_KEYS: frozenset[str] = frozenset(
{"execute_in", *inspect.signature(workflow.execute_activity).parameters}
)
# Node/task option keys beyond the raw execute_activity parameters:
# 'summary_fn' is a callable consumed in the workflow (not a Temporal
# option), so it must be split out of Graph API metadata too.
_LANGGRAPH_OPTION_KEYS: frozenset[str] = _ACTIVITY_OPTION_KEYS | frozenset(
{"summary_fn"}
)


def _constant_summary_fn(
value: str,
) -> Callable[[tuple[Any, ...], dict[str, Any]], str]:
"""Adapt a static summary string to the summary_fn interface."""
return lambda args, kwargs: value


class LangGraphPlugin(SimplePlugin):
Expand Down Expand Up @@ -168,12 +181,14 @@ def __init__(
# the node function via config["metadata"].
node_meta = node.metadata or {}
node_opts = {
k: v for k, v in node_meta.items() if k in _ACTIVITY_OPTION_KEYS
k: v
for k, v in node_meta.items()
if k in _LANGGRAPH_OPTION_KEYS
}
node.metadata = {
k: v
for k, v in node_meta.items()
if k not in _ACTIVITY_OPTION_KEYS
if k not in _LANGGRAPH_OPTION_KEYS
}
if "execute_in" not in node_opts:
raise ValueError(
Expand Down Expand Up @@ -253,6 +268,18 @@ def execute(
"""Prepare a node or task to execute as an activity or inline in the workflow."""
opts = kwargs or {}
execute_in = opts.pop("execute_in")
# Normalize the node's summary to a single summary_fn. Both keys are
# popped so neither reaches workflow.execute_activity (which takes no
# summary_fn, and would get a duplicate summary kwarg); a static
# summary becomes a summary_fn that ignores its input.
summary_fn = opts.pop("summary_fn", None)
static_summary = opts.pop("summary", None)
if summary_fn is not None and static_summary is not None:
raise ValueError(
f"{activity_name}: set either 'summary' or 'summary_fn', not both."
)
if static_summary is not None:
summary_fn = _constant_summary_fn(static_summary)

if execute_in == "activity":
wrapped = wrap_activity(
Expand All @@ -262,9 +289,13 @@ def execute(
)
a = activity.defn(name=activity_name)(wrapped)
self.activities.append(a)
return wrap_execute_activity(a, task_id=task_id(func), **opts)
return wrap_execute_activity(
a, task_id=task_id(func), summary_fn=summary_fn, **opts
)
elif execute_in == "workflow":
return wrap_workflow(func, streaming_topic=self._streaming_topic)
return wrap_workflow(
func, streaming_topic=self._streaming_topic, summary_fn=summary_fn
)
else:
raise ValueError(f"Invalid execute_in value: {execute_in}")

Expand Down
11 changes: 11 additions & 0 deletions temporalio/contrib/langgraph/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def wrap_workflow(
func: Callable[..., Any],
*,
streaming_topic: str | None = None,
summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None] | None = None,
) -> Callable[..., Awaitable[Any]]:
"""Wrap a function as a workflow-side LangGraph node.

Expand All @@ -28,9 +29,19 @@ def wrap_workflow(
function with the writer installed. Workflow-side nodes publish
synchronously to the in-workflow ``WorkflowStream`` (no signal
round-trip); activity-side nodes go through ``WorkflowStreamClient``.

Workflow-side nodes have no activity to carry a summary, so a
``summary_fn`` result updates the workflow's current details via
:func:`temporalio.workflow.set_current_details` (last-writer-wins);
an empty result clears it.
"""

async def wrapper(*args: Any, **kwargs: Any) -> Any:
if summary_fn is not None:
# Always write (clearing when empty) so this node never shows a
# stale summary left by an earlier workflow-bound node.
workflow.set_current_details(summary_fn(args, kwargs) or "")

async def run(stream_writer: Callable[[Any], None] | None) -> Any:
token = None
if stream_writer is not None:
Expand Down
Loading
Loading