Skip to content

Commit f06e201

Browse files
anshbansal, treff7es
authored and committed
fix(ingest): serialisation of structured report (#14973)
1 parent bcf1065 commit f06e201

File tree

2 files changed

+19
-21
lines changed

2 files changed

+19
-21
lines changed

metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py

Lines changed: 4 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,6 @@
44
from dataclasses import dataclass, field
55
from datetime import datetime, timezone
66
from enum import Enum
7-
from typing import Tuple
87

98
from datahub.utilities.perf_timer import PerfTimer
109
from datahub.utilities.stats_collections import TopKDict
@@ -38,9 +37,7 @@ class IngestionStageReport:
3837
ingestion_high_stage_seconds: dict[IngestionHighStage, float] = field(
3938
default_factory=lambda: defaultdict(float)
4039
)
41-
ingestion_stage_durations: TopKDict[Tuple[IngestionHighStage, str], float] = field(
42-
default_factory=TopKDict
43-
)
40+
ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict)
4441

4542
def new_stage(
4643
self, stage: str, high_stage: IngestionHighStage = IngestionHighStage._UNDEFINED
@@ -81,9 +78,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
8178
f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds",
8279
stacklevel=2,
8380
)
84-
self._report.ingestion_stage_durations[
85-
(self._high_stage, self._ingestion_stage)
86-
] = elapsed
81+
# Store tuple as string to avoid serialization errors
82+
key = f"({self._high_stage.value}, {self._ingestion_stage})"
83+
self._report.ingestion_stage_durations[key] = elapsed
8784
else:
8885
logger.info(
8986
f"Time spent in stage <{self._high_stage.value}>: {elapsed} seconds",

metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py

Lines changed: 15 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -14,8 +14,8 @@ def test_ingestion_stage_context_records_duration():
1414
pass
1515
assert len(report.ingestion_stage_durations) == 1
1616
key = next(iter(report.ingestion_stage_durations.keys()))
17-
assert key[0] == IngestionHighStage._UNDEFINED
18-
assert "Test Stage" in key[1]
17+
assert "Ingestion" in key
18+
assert "Test Stage" in key
1919

2020

2121
def test_ingestion_stage_context_handles_exceptions():
@@ -27,7 +27,8 @@ def test_ingestion_stage_context_handles_exceptions():
2727
pass
2828
assert len(report.ingestion_stage_durations) == 1
2929
key = next(iter(report.ingestion_stage_durations.keys()))
30-
assert "Test Stage" in key[1]
30+
assert "Ingestion" in key
31+
assert "Test Stage" in key
3132

3233

3334
def test_ingestion_stage_context_report_handles_multiple_stages():
@@ -45,9 +46,9 @@ def test_ingestion_stage_context_report_handles_multiple_stages():
4546
)
4647

4748
sorted_stages = list(sorted(report.ingestion_stage_durations.keys()))
48-
assert "Test Stage 1" in sorted_stages[0][1]
49-
assert "Test Stage 2" in sorted_stages[1][1]
50-
assert "Test Stage 3" in sorted_stages[2][1]
49+
assert "Test Stage 1" in sorted_stages[0]
50+
assert "Test Stage 2" in sorted_stages[1]
51+
assert "Test Stage 3" in sorted_stages[2]
5152

5253

5354
def test_ingestion_stage_context_report_handles_nested_stages():
@@ -64,14 +65,14 @@ def test_ingestion_stage_context_report_handles_nested_stages():
6465
for duration in report.ingestion_stage_durations.values()
6566
)
6667
sorted_stages = list(sorted(report.ingestion_stage_durations.keys()))
67-
assert "Inner1" in sorted_stages[0][1]
68-
assert "Inner2" in sorted_stages[1][1]
69-
assert "Outer" in sorted_stages[2][1]
68+
assert "Inner1" in sorted_stages[0]
69+
assert "Inner2" in sorted_stages[1]
70+
assert "Outer" in sorted_stages[2]
7071

7172
# Check that outer stage duration >= sum of inner stage durations
72-
outer_key = [k for k in report.ingestion_stage_durations if "Outer" in k[1]][0]
73-
inner1_key = [k for k in report.ingestion_stage_durations if "Inner1" in k[1]][0]
74-
inner2_key = [k for k in report.ingestion_stage_durations if "Inner2" in k[1]][0]
73+
outer_key = [k for k in report.ingestion_stage_durations if "Outer" in k][0]
74+
inner1_key = [k for k in report.ingestion_stage_durations if "Inner1" in k][0]
75+
inner2_key = [k for k in report.ingestion_stage_durations if "Inner2" in k][0]
7576

7677
outer_duration = report.ingestion_stage_durations[outer_key]
7778
inner1_duration = report.ingestion_stage_durations[inner1_key]
@@ -96,6 +97,6 @@ def test_ingestion_stage_with_high_stage():
9697
time.sleep(0.1)
9798
assert len(report.ingestion_stage_durations) == 1
9899
key = next(iter(report.ingestion_stage_durations.keys()))
99-
assert key[0] == IngestionHighStage.PROFILING
100-
assert "Test Stage" in key[1]
100+
assert "Profiling" in key
101+
assert "Test Stage" in key
101102
assert report.ingestion_high_stage_seconds[IngestionHighStage.PROFILING] > 0

0 commit comments

Comments
 (0)