Skip to content

Commit fdc6934

Browse files
authored
Merge branch 'master' into feat(frontend)_fetch_groups_ldap_authentication
2 parents c431e10 + c092e91 commit fdc6934

File tree

14 files changed

+498
-59
lines changed

14 files changed

+498
-59
lines changed

datahub-actions/src/datahub_actions/api/action_graph.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import json
1616
import logging
17+
import time
1718
import urllib.parse
1819
from dataclasses import dataclass
1920
from typing import Any, Dict, List, Optional
@@ -22,6 +23,7 @@
2223
from datahub.ingestion.graph.client import DataHubGraph
2324
from datahub.metadata.schema_classes import (
2425
GlossaryTermAssociationClass,
26+
MetadataAttributionClass,
2527
TagAssociationClass,
2628
)
2729
from datahub.specific.dataset import DatasetPatchBuilder
@@ -250,20 +252,57 @@ def check_relationship(self, entity_urn, target_urn, relationship_type):
250252
return target_urn in entities
251253
return False
252254

255+
def _create_attribution_from_context(
256+
self, context: Optional[Dict]
257+
) -> Optional[MetadataAttributionClass]:
258+
"""Create MetadataAttributionClass from context if action source is present."""
259+
if not context:
260+
return None
261+
262+
# Extract action source from context if present
263+
action_source = context.get("propagation_source") or context.get("source")
264+
if not action_source:
265+
return None
266+
267+
return MetadataAttributionClass(
268+
source=action_source,
269+
time=int(time.time() * 1000.0),
270+
actor=context.get("actor", "urn:li:corpuser:__datahub_system"),
271+
sourceDetail=context,
272+
)
273+
253274
def add_tags_to_dataset(
254275
self,
255276
entity_urn: str,
256277
dataset_tags: List[str],
257278
field_tags: Optional[Dict] = None,
258279
context: Optional[Dict] = None,
280+
action_urn: Optional[str] = None,
259281
) -> None:
260282
if field_tags is None:
261283
field_tags = {}
284+
285+
# Create attribution - prefer action_urn parameter, fallback to context
286+
attribution = None
287+
if action_urn:
288+
attribution = MetadataAttributionClass(
289+
source=action_urn,
290+
time=int(time.time() * 1000.0),
291+
actor=context.get("actor", "urn:li:corpuser:__datahub_system")
292+
if context
293+
else "urn:li:corpuser:__datahub_system",
294+
sourceDetail=context if context else {},
295+
)
296+
else:
297+
attribution = self._create_attribution_from_context(context)
298+
262299
dataset = DatasetPatchBuilder(entity_urn)
263300
for t in dataset_tags:
264301
dataset.add_tag(
265302
tag=TagAssociationClass(
266-
tag=t, context=json.dumps(context) if context else None
303+
tag=t,
304+
context=json.dumps(context) if context else None,
305+
attribution=attribution,
267306
)
268307
)
269308

@@ -272,7 +311,9 @@ def add_tags_to_dataset(
272311
for tag in tags:
273312
field_builder.add_tag(
274313
tag=TagAssociationClass(
275-
tag=tag, context=json.dumps(context) if context else None
314+
tag=tag,
315+
context=json.dumps(context) if context else None,
316+
attribution=attribution,
276317
)
277318
)
278319

datahub-actions/src/datahub_actions/event/event_registry.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from datahub.metadata.schema_classes import (
1919
EntityChangeEventClass,
2020
MetadataChangeLogClass,
21+
RelationshipChangeEventClass,
2122
)
2223
from datahub_actions.event.event import Event
2324

@@ -80,14 +81,40 @@ def as_json(self) -> str:
8081
json_obj["parameters"] = self._inner_dict["__parameters_json"]
8182
return json.dumps(json_obj)
8283

84+
@property
85+
def safe_parameters(self) -> dict:
86+
return self.parameters or self.get("__parameters_json") or {} # type: ignore
87+
88+
89+
class RelationshipChangeEvent(RelationshipChangeEventClass, Event):
90+
@classmethod
91+
def from_class(
92+
cls, clazz: RelationshipChangeEventClass
93+
) -> "RelationshipChangeEvent":
94+
instance = cls._construct({})
95+
instance._restore_defaults()
96+
# Shallow map inner dictionaries.
97+
instance._inner_dict = clazz._inner_dict
98+
return instance
99+
100+
@classmethod
101+
def from_json(cls, json_str: str) -> "Event":
102+
json_obj = json.loads(json_str)
103+
return cls.from_class(cls.from_obj(json_obj))
104+
105+
def as_json(self) -> str:
106+
return json.dumps(self.to_obj())
107+
83108

84109
# Standard Event Types for easy reference.
85110
ENTITY_CHANGE_EVENT_V1_TYPE = "EntityChangeEvent_v1"
86111
METADATA_CHANGE_LOG_EVENT_V1_TYPE = "MetadataChangeLogEvent_v1"
112+
RELATIONSHIP_CHANGE_EVENT_V1_TYPE = "RelationshipChangeEvent_v1"
87113

88114
# Lightweight Event Registry
89115
event_registry = PluginRegistry[Event]()
90116

91117
# Register standard event library. Each type can be considered a separate "stream" / "topic"
92118
event_registry.register(METADATA_CHANGE_LOG_EVENT_V1_TYPE, MetadataChangeLogEvent)
93119
event_registry.register(ENTITY_CHANGE_EVENT_V1_TYPE, EntityChangeEvent)
120+
event_registry.register(RELATIONSHIP_CHANGE_EVENT_V1_TYPE, RelationshipChangeEvent)

datahub-actions/src/datahub_actions/plugin/source/acryl/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@
22
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME = "MetadataChangeLog_Versioned_v1"
33
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1"
44
ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent"
5+
RELATIONSHIP_CHANGE_EVENT_NAME = "relationshipChangeEvent"

datahub-actions/src/datahub_actions/plugin/source/acryl/datahub_cloud_event_source.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
from datahub_actions.event.event_registry import (
1515
ENTITY_CHANGE_EVENT_V1_TYPE,
1616
METADATA_CHANGE_LOG_EVENT_V1_TYPE,
17+
RELATIONSHIP_CHANGE_EVENT_V1_TYPE,
1718
EntityChangeEvent,
1819
MetadataChangeLogEvent,
20+
RelationshipChangeEvent,
1921
)
2022

2123
# May or may not need these.
@@ -25,6 +27,7 @@
2527
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
2628
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
2729
PLATFORM_EVENT_TOPIC_NAME,
30+
RELATIONSHIP_CHANGE_EVENT_NAME,
2831
)
2932
from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import (
3033
AckManager,
@@ -261,8 +264,11 @@ def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]:
261264
post_json_transform(value["payload"])
262265
)
263266
if ENTITY_CHANGE_EVENT_NAME == value["name"]:
264-
event = build_entity_change_event(payload)
265-
yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, {})
267+
ece = build_entity_change_event(payload)
268+
yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, ece, {})
269+
elif RELATIONSHIP_CHANGE_EVENT_NAME == value["name"]:
270+
rce = RelationshipChangeEvent.from_json(payload.get("value"))
271+
yield EventEnvelope(RELATIONSHIP_CHANGE_EVENT_V1_TYPE, rce, {})
266272

267273
@staticmethod
268274
def handle_mcl(msg: ExternalEvent) -> Iterable[EventEnvelope]:

datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
from datahub_actions.event.event_registry import (
3434
ENTITY_CHANGE_EVENT_V1_TYPE,
3535
METADATA_CHANGE_LOG_EVENT_V1_TYPE,
36+
RELATIONSHIP_CHANGE_EVENT_V1_TYPE,
3637
EntityChangeEvent,
3738
MetadataChangeLogEvent,
39+
RelationshipChangeEvent,
3840
)
3941

4042
# May or may not need these.
@@ -46,6 +48,7 @@
4648

4749

4850
ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent"
51+
RELATIONSHIP_CHANGE_EVENT_NAME = "relationshipChangeEvent"
4952
DEFAULT_TOPIC_ROUTES = {
5053
"mcl": "MetadataChangeLog_Versioned_v1",
5154
"mcl_timeseries": "MetadataChangeLog_Timeseries_v1",
@@ -216,9 +219,13 @@ def handle_pe(msg: Any) -> Iterable[EventEnvelope]:
216219
post_json_transform(value["payload"])
217220
)
218221
if ENTITY_CHANGE_EVENT_NAME == value["name"]:
219-
event = build_entity_change_event(payload)
222+
ece = build_entity_change_event(payload)
220223
kafka_meta = build_kafka_meta(msg)
221-
yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, kafka_meta)
224+
yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, ece, kafka_meta)
225+
elif RELATIONSHIP_CHANGE_EVENT_NAME == value["name"]:
226+
rce = RelationshipChangeEvent.from_json(payload.get("value"))
227+
kafka_meta = build_kafka_meta(msg)
228+
yield EventEnvelope(RELATIONSHIP_CHANGE_EVENT_V1_TYPE, rce, kafka_meta)
222229

223230
def close(self) -> None:
224231
if self.consumer:
Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,32 @@
11
"""Module for AWS MSK IAM authentication."""
22

33
import logging
4+
import os
45

5-
from aws_msk_iam_sasl_signer_python.msk_iam_sasl_signer import MSKAuthTokenProvider
6+
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
67

78
logger = logging.getLogger(__name__)
89

910

10-
def oauth_cb(oauth_config):
11+
def oauth_cb(oauth_config: dict) -> tuple[str, float]:
1112
"""
1213
OAuth callback function for AWS MSK IAM authentication.
1314
1415
This function is called by the Kafka client to generate the SASL/OAUTHBEARER token
1516
for authentication with AWS MSK using IAM.
1617
1718
Returns:
18-
tuple: (auth_token, expiry_time_seconds)
19+
tuple[str, float]: (auth_token, expiry_time_seconds)
1920
"""
2021
try:
21-
auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token()
22+
region = (
23+
os.getenv("AWS_REGION") or os.getenv("AWS_DEFAULT_REGION") or "us-east-1"
24+
)
25+
auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token(region=region)
2226
# Convert expiry from milliseconds to seconds as required by Kafka client
23-
return auth_token, expiry_ms / 1000
27+
return auth_token, float(expiry_ms) / 1000
2428
except Exception as e:
25-
logger.error(f"Error generating AWS MSK IAM authentication token: {e}")
29+
logger.error(
30+
f"Error generating AWS MSK IAM authentication token: {e}", exc_info=True
31+
)
2632
raise

datahub-actions/tests/unit/utils/test_kafka_msk_iam.py

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,60 +7,50 @@
77
import pytest
88

99
MODULE_UNDER_TEST = "datahub_actions.utils.kafka_msk_iam"
10-
VENDOR_MODULE = "aws_msk_iam_sasl_signer_python.msk_iam_sasl_signer"
10+
VENDOR_MODULE = "aws_msk_iam_sasl_signer"
1111

1212

13-
def ensure_fake_vendor(monkeypatch):
13+
def ensure_fake_vendor(monkeypatch: Any) -> Any:
1414
"""
1515
Ensure a fake MSKAuthTokenProvider is available at import path
16-
aws_msk_iam_sasl_signer_python.msk_iam_sasl_signer for environments
17-
where the vendor package is not installed.
16+
aws_msk_iam_sasl_signer for environments where the vendor package is not installed.
1817
Returns the fake module so tests can monkeypatch its behavior.
1918
"""
2019
# If already present (package installed), just return the real module
2120
if VENDOR_MODULE in sys.modules:
2221
return sys.modules[VENDOR_MODULE]
2322

24-
# Build parent package structure: aws_msk_iam_sasl_signer_python.msk_iam_sasl_signer
25-
parent_name = "aws_msk_iam_sasl_signer_python"
26-
if parent_name not in sys.modules:
27-
parent: Any = types.ModuleType(parent_name)
28-
parent.__path__ = [] # mark as package
29-
monkeypatch.setitem(sys.modules, parent_name, parent)
30-
else:
31-
parent = cast(Any, sys.modules[parent_name])
32-
23+
# Create a minimal fake module matching the direct import path
3324
fake_mod: Any = types.ModuleType(VENDOR_MODULE)
3425

3526
class MSKAuthTokenProvider:
3627
@staticmethod
37-
def generate_auth_token(): # will be monkeypatched per test
28+
def generate_auth_token(
29+
region: str | None = None,
30+
) -> None: # will be monkeypatched per test
3831
raise NotImplementedError
3932

4033
fake_mod.MSKAuthTokenProvider = MSKAuthTokenProvider
4134
monkeypatch.setitem(sys.modules, VENDOR_MODULE, fake_mod)
4235

43-
# Also ensure attribute exists on parent to allow from ... import ...
44-
parent.msk_iam_sasl_signer = fake_mod
45-
4636
return fake_mod
4737

4838

49-
def import_sut(monkeypatch):
39+
def import_sut(monkeypatch: Any) -> Any:
5040
"""Import or reload the module under test after ensuring the vendor symbol exists."""
5141
ensure_fake_vendor(monkeypatch)
5242
if MODULE_UNDER_TEST in sys.modules:
5343
return importlib.reload(sys.modules[MODULE_UNDER_TEST])
5444
return importlib.import_module(MODULE_UNDER_TEST)
5545

5646

57-
def test_oauth_cb_success_converts_ms_to_seconds(monkeypatch):
47+
def test_oauth_cb_success_converts_ms_to_seconds(monkeypatch: Any) -> None:
5848
sut = import_sut(monkeypatch)
5949

6050
# Monkeypatch the provider to return a known token and expiry in ms
6151
provider = cast(Any, sut).MSKAuthTokenProvider
6252

63-
def fake_generate():
53+
def fake_generate(region: str | None = None) -> tuple[str, int]:
6454
return "my-token", 12_345 # ms
6555

6656
monkeypatch.setattr(provider, "generate_auth_token", staticmethod(fake_generate))
@@ -71,10 +61,10 @@ def fake_generate():
7161
assert expiry_seconds == 12.345 # ms to seconds via division
7262

7363

74-
def test_oauth_cb_raises_and_logs_on_error(monkeypatch, caplog):
64+
def test_oauth_cb_raises_and_logs_on_error(monkeypatch: Any, caplog: Any) -> None:
7565
sut = import_sut(monkeypatch)
7666

77-
def boom():
67+
def boom(region: str | None = None) -> None:
7868
raise RuntimeError("signer blew up")
7969

8070
provider = cast(Any, sut).MSKAuthTokenProvider
@@ -93,14 +83,14 @@ def boom():
9383
)
9484

9585

96-
def test_oauth_cb_returns_tuple_types(monkeypatch):
86+
def test_oauth_cb_returns_tuple_types(monkeypatch: Any) -> None:
9787
sut = import_sut(monkeypatch)
9888

9989
provider = cast(Any, sut).MSKAuthTokenProvider
10090
monkeypatch.setattr(
10191
provider,
10292
"generate_auth_token",
103-
staticmethod(lambda: ("tkn", 1_000)), # 1000 ms
93+
staticmethod(lambda region=None: ("tkn", 1_000)), # 1000 ms
10494
)
10595

10696
result = sut.oauth_cb(None)

0 commit comments

Comments
 (0)