Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow dynamic resolving within the graph #95

Merged
merged 19 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions tests/test_dynamic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import dataclasses

import znflow


@dataclasses.dataclass
class AddOne(znflow.Node):
inputs: int
outputs: int = None

def run(self):
if self.outputs is not None:
raise ValueError("Node has already been run")
self.outputs = self.inputs + 1


def test_break_loop():
"""Test loop breaking when output exceeds 5."""
graph = znflow.DiGraph()
with graph:
node1 = AddOne(inputs=1)
for _ in range(10):
node1 = AddOne(inputs=node1.outputs)
if znflow.resolve(node1.outputs) > 5:
break

graph.run()

# Assert the correct number of nodes in the graph
assert len(graph) == 5

# Assert the final output value
assert node1.outputs == 6


def test_break_loop_multiple():
"""Test loop breaking with multiple nodes and different conditions."""
graph = znflow.DiGraph()
with graph:
node1 = AddOne(inputs=1)
node2 = AddOne(inputs=node1.outputs) # Add another node in the loop

for _ in range(10):
node1 = AddOne(inputs=node1.outputs)
node2 = AddOne(inputs=node2.outputs)

# Break if either node's output exceeds 5 or both reach 3
if (
znflow.resolve(node1.outputs) > 5
or znflow.resolve(node2.outputs) > 5
or znflow.resolve(node1.outputs) == 3
and znflow.resolve(node2.outputs) == 3
):
break

graph.run()

# Assert the correct number of nodes in the graph
assert len(graph) <= 10 # Maximum number of iterations allowed

# Assert that at least one node's output exceeds 5 or both reach 3
assert (
znflow.resolve(node1.outputs) > 5
or znflow.resolve(node2.outputs) > 5
or znflow.resolve(node1.outputs) == 3
and znflow.resolve(node2.outputs) == 3
)


def test_resolvce_only_run_relevant_nodes():
"""Test that when using resolve only nodes that are direct predecessors are run."""
# Check by asserting None to the output of the second node
graph = znflow.DiGraph()
with graph:
node1 = AddOne(inputs=1)
node2 = AddOne(inputs=1234)
for _ in range(10):
node1 = AddOne(inputs=node1.outputs)
if znflow.resolve(node1.outputs) > 5:
break

# this has to be executed, because of the resolve
assert node1.outputs == 6

# this should not be executed, because it is not relevant to the resolve
assert node2.outputs is None

graph.run()
assert node2.outputs == 1235
assert node1.outputs == 6


def test_connections_remain():
graph = znflow.DiGraph()
with graph:
node1 = AddOne(inputs=1)
result = znflow.resolve(node1.outputs)
assert isinstance(result, int)
assert isinstance(node1.outputs, znflow.Connection)
40 changes: 40 additions & 0 deletions tests/test_late_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import dataclasses

import znflow


@dataclasses.dataclass
class AddOne(znflow.Node):
inputs: int
outputs: int = None

def run(self):
self.outputs = self.inputs + 1


def test_update_after_exit():
graph = znflow.DiGraph()
with graph:
node1 = AddOne(inputs=1)

node1.inputs = 2
graph.run(immutable_nodes=False)
assert node1.outputs == 3

node1.inputs = 3
graph.run(immutable_nodes=False)
assert node1.outputs == 4


def test_update_after_exit_immutable():
graph = znflow.DiGraph()
with graph:
node1 = AddOne(inputs=1)

node1.inputs = 2
graph.run()
assert node1.outputs == 3

node1.inputs = 3
graph.run()
assert node1.outputs == 3
2 changes: 2 additions & 0 deletions znflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
get_graph,
)
from znflow.combine import combine
from znflow.dynamic import resolve
from znflow.graph import DiGraph
from znflow.node import Node, nodify
from znflow.visualize import draw
Expand All @@ -37,6 +38,7 @@
"exceptions",
"get_graph",
"empty_graph",
"resolve",
]

with contextlib.suppress(ImportError):
Expand Down
5 changes: 4 additions & 1 deletion znflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

from znflow import exceptions

if typing.TYPE_CHECKING:
from znflow.graph import DiGraph


@contextlib.contextmanager
def disable_graph(*args, **kwargs):
Expand Down Expand Up @@ -126,7 +129,7 @@ def run(self):
raise NotImplementedError


def get_graph():
def get_graph() -> DiGraph:
return NodeBaseMixin._graph_


Expand Down
41 changes: 41 additions & 0 deletions znflow/dynamic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import typing as t

from znflow.base import Connection, disable_graph, get_graph


def resolve(value: t.Union[Connection, t.Any], immutable_nodes: bool = True) -> t.Any:
"""Resolve a Connection to its actual value.

Allows dynamic resolution of connections to their actual values
within a graph context. This will run all Nodes up to this connection.

Attributes
----------
value : Connection
The connection to resolve.
immutable_nodes : bool
If True, the nodes are assumed to be immutable and
will not be rerun. If you change the inputs of a node
after it has been run, the outputs will not be updated.

Returns
-------
t.Any
The actual value of the connection.

"""
# TODO: support nodify as well
if not isinstance(value, (Connection)):
return value
# get the actual value
with disable_graph():
result = value.result
if result is not None:
return result
# we assume, that if the result is None, the node has not been run yet
graph = get_graph()

with disable_graph():
graph.run(nodes=[value.instance], immutable_nodes=immutable_nodes)
result = value.result
return result
57 changes: 50 additions & 7 deletions znflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,56 @@ def get_sorted_nodes(self):
all_pipelines += nx.dfs_postorder_nodes(reverse, stage)
return list(dict.fromkeys(all_pipelines)) # remove duplicates but keep order

def run(self):
for node_uuid in self.get_sorted_nodes():
node = self.nodes[node_uuid]["value"]
if not node._external_:
# update connectors
self._update_node_attributes(node, handler.UpdateConnectors())
node.run()
def run(
self,
nodes: typing.Optional[typing.List[NodeBaseMixin]] = None,
immutable_nodes: bool = True,
PythonFZ marked this conversation as resolved.
Show resolved Hide resolved
):
"""Run the graph.

Attributes
----------
nodes : list[Node]
The nodes to run. If None, all nodes are run.
immutable_nodes : bool
If True, the nodes are assumed to be immutable and
will not be rerun. If you change the inputs of a node
after it has been run, the outputs will not be updated.
"""
if nodes is not None:
for node_uuid in self.reverse():
if immutable_nodes and self.nodes[node_uuid].get("available", False):
continue
node = self.nodes[node_uuid]["value"]
if node in nodes:
predecessors = list(self.predecessors(node.uuid))
for predecessor in predecessors:
predecessor_node = self.nodes[predecessor]["value"]
if immutable_nodes and self.nodes[predecessor].get(
"available", False
):
continue
self._update_node_attributes(
predecessor_node, handler.UpdateConnectors()
)
predecessor_node.run()
if immutable_nodes:
self.nodes[predecessor]["available"] = True
self._update_node_attributes(node, handler.UpdateConnectors())
node.run()
if immutable_nodes:
self.nodes[node_uuid]["available"] = True
else:
for node_uuid in self.get_sorted_nodes():
if immutable_nodes and self.nodes[node_uuid].get("available", False):
continue
node = self.nodes[node_uuid]["value"]
if not node._external_:
# update connectors
self._update_node_attributes(node, handler.UpdateConnectors())
node.run()
if immutable_nodes:
self.nodes[node_uuid]["available"] = True

def write_graph(self, *args):
for node in args:
Expand Down
Loading