Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ default_stages: [pre-commit, pre-merge-commit]
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.15.0
rev: v0.15.12
hooks:
# Run the linter.
- id: ruff-check
Expand Down
4 changes: 1 addition & 3 deletions hamilton/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1836,9 +1836,7 @@ def with_adapters(self, *adapters: lifecycle_base.LifecycleAdapter) -> Self:
self.adapters.extend(adapters)
return self

def with_materializers(
self, *materializers: ExtractorFactory | MaterializerFactory
) -> Self:
def with_materializers(self, *materializers: ExtractorFactory | MaterializerFactory) -> Self:
"""Add materializer nodes to the `Driver`
The generated nodes can be referenced by name in `.execute()`

Expand Down
15 changes: 7 additions & 8 deletions hamilton/plugins/h_ddog.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@

logger = logging.getLogger(__name__)
try:
# TODO: this works for ddtrace < 3.0; Span was deprecated in 3.0
# See https://github.com/DataDog/dd-trace-py/pull/12186
from ddtrace import Span, context, tracer
from ddtrace import tracer
from ddtrace.trace import Context, Span
except ImportError as e:
logger.error("ImportError: %s", e)
logger.error(
"To use the h_ddog plugin, please install sf-hamilton[datadog] using "
"`pip install sf-hamilton[datadog]` (or use your favorite package manager)."
"To use the h_ddog plugin, please install apache-hamilton[datadog] using "
"`pip install apache-hamilton[datadog]` (or use your favorite package manager). "
"Remember to use quotes around the package name if using zsh!"
)
raise
Expand Down Expand Up @@ -75,14 +74,14 @@ def _serialize_span_dict(span_dict: dict[str, Span]):
}

@staticmethod
def _deserialize_span_dict(serialized_repr: dict[str, dict]) -> dict[str, context.Context]:
def _deserialize_span_dict(serialized_repr: dict[str, dict]) -> dict[str, Context]:
"""Note that we deserialize as contexts, as passing spans is not supported
(the child should never terminate the parent span).

:param span_dict: Dict of str -> dict params for contexts
:return: A dictionary of contexts
"""
return {key: context.Context(**params) for key, params in serialized_repr.items()}
return {key: Context(**params) for key, params in serialized_repr.items()}

def __getstate__(self):
"""Gets the state for serialization"""
Expand Down Expand Up @@ -293,7 +292,7 @@ class DDOGTracer(
This tracer bypasses context management so we can more accurately track relationships between nodes/tags. Also, we plan to
get this working with OpenTelemetry, and use that for datadog integration.

To use this, you'll want to run `pip install sf-hamilton[ddog]` (or `pip install "sf-hamilton[ddog]"` if using zsh)
To use this, you'll want to run `pip install apache-hamilton[datadog]` (or `pip install "apache-hamilton[datadog]"` if using zsh)
"""

def __init__(self, root_name: str, include_causal_links: bool = False, service: str = None):
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ dask-array = ["dask[array]"]
dask-dataframe = ["dask[dataframe]"]
dask-diagnostics = ["dask[diagnostics]"]
dask-distributed = ["dask[distributed]"]
datadog = ["ddtrace<3.0"] # Temporary pin until h_ddog.py import is fixed for >3.0 version
datadog = ["ddtrace>=4.0,<5.0"]
diskcache = ["diskcache"]
experiments = [
"fastapi",
Expand Down Expand Up @@ -140,7 +140,7 @@ docs = [
"commonmark",
"dask-expr>=1.1.14",
"dask[distributed]",
"ddtrace<3.0",
"ddtrace>=4.0,<5",
"diskcache",
# required for all the plugins
"dlt",
Expand Down
152 changes: 152 additions & 0 deletions tests/plugins/test_h_ddog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from unittest.mock import MagicMock, patch

import pytest

pytest.importorskip("ddtrace")

from hamilton.plugins.h_ddog import AsyncDDOGTracer, DDOGTracer


@pytest.fixture()
def mock_tracer():
    """Patch the module-level ddtrace tracer used by the h_ddog plugin.

    The replacement's ``start_span`` hands back a mock span whose context
    carries fixed trace/span ids, and ``current_trace_context`` reports that
    no trace is currently active.
    """
    with patch("hamilton.plugins.h_ddog.tracer") as patched:
        span = MagicMock()
        span.context = MagicMock(trace_id=1, span_id=2)
        span.__exit__ = MagicMock(return_value=False)
        patched.start_span.return_value = span
        patched.current_trace_context.return_value = None
        yield patched


class TestDDOGTracerLifecycle:
    """Exercises DDOGTracer's graph/node/task span open-and-close sequence."""

    def test_span_lifecycle(self, mock_tracer):
        hook = DDOGTracer(root_name="test-root", service="test-svc")
        run_id = "run-1"

        # Starting the graph run opens exactly one (root) span.
        hook.run_before_graph_execution(run_id=run_id)
        assert mock_tracer.start_span.call_count == 1

        # Entering a node opens a second span...
        hook.run_before_node_execution(
            node_name="node_a",
            node_kwargs={},
            node_tags={"module": "test"},
            task_id=None,
            run_id=run_id,
        )
        assert mock_tracer.start_span.call_count == 2

        # ...and leaving the node closes it with no exception information.
        hook.run_after_node_execution(node_name="node_a", error=None, task_id=None, run_id=run_id)
        span = mock_tracer.start_span.return_value
        span.__exit__.assert_called_with(None, None, None)

        hook.run_after_graph_execution(error=None, run_id=run_id)

    def test_span_lifecycle_with_error(self, mock_tracer):
        hook = DDOGTracer(root_name="test-root")
        run_id = "run-err"

        hook.run_before_graph_execution(run_id=run_id)
        hook.run_before_node_execution(
            node_name="node_b",
            node_kwargs={},
            node_tags={},
            task_id=None,
            run_id=run_id,
        )

        # A failing node must close its span with the exception's details.
        failure = ValueError("boom")
        hook.run_after_node_execution(
            node_name="node_b", error=failure, task_id=None, run_id=run_id
        )
        span = mock_tracer.start_span.return_value
        span.__exit__.assert_called_with(ValueError, failure, failure.__traceback__)

        hook.run_after_graph_execution(error=failure, run_id=run_id)

    def test_task_span_lifecycle(self, mock_tracer):
        hook = DDOGTracer(root_name="test-root")
        run_id = "run-task"

        # Full nesting — graph -> task -> node — then unwind in reverse order.
        hook.run_before_graph_execution(run_id=run_id)
        hook.run_before_task_execution(task_id="task-1", run_id=run_id)
        hook.run_before_node_execution(
            node_name="node_c",
            node_kwargs={},
            node_tags={},
            task_id="task-1",
            run_id=run_id,
        )
        hook.run_after_node_execution(
            node_name="node_c", error=None, task_id="task-1", run_id=run_id
        )
        hook.run_after_task_execution(task_id="task-1", run_id=run_id, error=None)
        hook.run_after_graph_execution(error=None, run_id=run_id)


class TestAsyncDDOGTracer:
    """Checks that AsyncDDOGTracer forwards its configuration to the sync impl."""

    def test_instantiation(self):
        wrapper = AsyncDDOGTracer(root_name="async-root", service="async-svc")
        # Constructor arguments should land on the wrapped implementation.
        assert wrapper._impl.root_name == "async-root"
        assert wrapper._impl.service == "async-svc"

    def test_instantiation_with_causal_links(self):
        wrapper = AsyncDDOGTracer(root_name="async-root", include_causal_links=True, service=None)
        assert wrapper._impl.include_causal_links is True


class TestSerialization:
    """Covers the pickling-related hooks on the tracer implementation."""

    def test_getstate_setstate_round_trip(self, mock_tracer):
        hook = DDOGTracer(root_name="ser-root", service="ser-svc")
        run_id = "run-ser"

        hook.run_before_graph_execution(run_id=run_id)

        # __getstate__ must capture the constructor configuration.
        state = hook._impl.__getstate__()
        assert "root_trace_name" in state
        assert state["root_trace_name"] == "ser-root"
        assert state["service"] == "ser-svc"

        # Restore into a blank instance (bypassing __init__) and check the
        # configuration survives the round trip.
        restored = object.__new__(type(hook._impl))
        restored.__setstate__(state)
        assert restored.root_name == "ser-root"
        assert restored.service == "ser-svc"
        assert restored.include_causal_links is False
        assert isinstance(restored.run_span_cache, dict)

        hook.run_after_graph_execution(error=None, run_id=run_id)

    def test_serialize_deserialize_span_dict(self, mock_tracer):
        from ddtrace.trace import Context, Span

        from hamilton.plugins.h_ddog import _DDOGTracerImpl

        fake_span = MagicMock(spec=Span)
        fake_span.context = MagicMock(trace_id=123, span_id=456)

        serialized = _DDOGTracerImpl._serialize_span_dict({"key1": fake_span})
        assert serialized["key1"]["trace_id"] == 123
        assert serialized["key1"]["span_id"] == 456

        # Deserialization intentionally yields Contexts rather than Spans: a
        # child process should never be able to terminate the parent span.
        round_tripped = _DDOGTracerImpl._deserialize_span_dict(serialized)
        assert "key1" in round_tripped
        assert isinstance(round_tripped["key1"], Context)
4 changes: 2 additions & 2 deletions ui/sdk/.pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
repos:
- repo: https://github.com/charliermarsh/ruff-pre-commit
# Ruff version.
rev: v0.15.0
rev: v0.15.12
hooks:
- id: ruff-check
args: [ --fix , --exit-non-zero-on-fix ]
- repo: https://github.com/ambv/black
rev: 26.1.0
rev: 26.3.1
hooks:
- id: black
args: [--line-length=100]
Expand Down
2 changes: 1 addition & 1 deletion ui/sdk/ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,6 @@
line-length = 100


[per-file-ignores]
[lint.per-file-ignores]
# skip line length for API client files.
"src/dagworks/api/*" = ["E501"]
Loading