Skip to content

Commit

Permalink
apply to hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Feb 4, 2025
1 parent b883aa5 commit 8b72e5f
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from typing import TYPE_CHECKING, Any, Optional
from typing import Any, Optional

if TYPE_CHECKING:
from dagster_msteams.client import Link
from dagster_msteams.utils import Link


class AdaptiveCard:
"""Class to contruct a MS Teams adaptive card for posting Dagster messages."""

def __init__(self, message: str, link: Optional["Link"] = None):
def __init__(self, message: str, link: Optional[Link] = None):
if link:
message += f" [{link.text}]({link.url})"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from collections.abc import Mapping
from typing import TYPE_CHECKING, Optional
from typing import Optional

if TYPE_CHECKING:
from dagster_msteams.client import Link
from dagster_msteams.utils import Link


class Card:
Expand All @@ -27,7 +26,7 @@ def _create_attachment(self, text_message: str) -> Mapping:
content_type = "application/vnd.microsoft.card.hero"
return {"contentType": content_type, "content": content}

def add_attachment(self, text_message: str, link: Optional["Link"] = None):
def add_attachment(self, text_message: str, link: Optional[Link] = None):
if link:
text_message += f" <a href='{link.url}'>{link.text}</a>"
hero_card_attachment = self._create_attachment(text_message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from collections.abc import Mapping
from typing import NamedTuple, Optional, cast
from typing import Optional, cast
from urllib.parse import urlparse

import dagster._check as check
from requests import codes, exceptions, post

from dagster_msteams.adaptive_card import AdaptiveCard
from dagster_msteams.card import Card


class Link(NamedTuple):
text: str
url: str
from dagster_msteams.utils import Link


class TeamsClient:
Expand Down
34 changes: 17 additions & 17 deletions python_modules/libraries/dagster-msteams/dagster_msteams/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
from dagster._core.execution.context.hook import HookContext
from dagster._utils.warnings import normalize_renamed_param

from dagster_msteams.card import Card
from dagster_msteams.resources import MSTeamsResource

# TODO test these hooks
from dagster_msteams.utils import Link


def _default_status_message(context: HookContext, status: str) -> str:
Expand Down Expand Up @@ -73,15 +71,16 @@ def my_job():

@failure_hook(required_resource_keys={"msteams"})
def _hook(context: HookContext):
text = message_fn(context)
if webserver_base_url:
text += f"<a href='{webserver_base_url}/runs/{context.run_id}'>View in Dagster UI</a>"
card = Card()
card.add_attachment(text_message=text)
message = message_fn(context)
link = (
Link("View in Dagster UI", f"{webserver_base_url}/runs/{context.run_id}")
if webserver_base_url
else None
)
if isinstance(context.resources.msteams, MSTeamsResource):
context.resources.msteams.get_client().post_message(payload=card.payload)
context.resources.msteams.get_client().post_message(message=message, link=link)
else:
context.resources.msteams.post_message(payload=card.payload)
context.resources.msteams.post_message(message=message, link=link)

return _hook

Expand Down Expand Up @@ -133,14 +132,15 @@ def my_job():

@success_hook(required_resource_keys={"msteams"})
def _hook(context: HookContext):
text = message_fn(context)
if webserver_base_url:
text += f"<a href='{webserver_base_url}/runs/{context.run_id}'>View in webserver</a>"
card = Card()
card.add_attachment(text_message=text)
message = message_fn(context)
link = (
Link("View in Dagster UI", f"{webserver_base_url}/runs/{context.run_id}")
if webserver_base_url
else None
)
if isinstance(context.resources.msteams, MSTeamsResource):
context.resources.msteams.get_client().post_message(payload=card.payload)
context.resources.msteams.get_client().post_message(message=message, link=link)
else:
context.resources.msteams.post_message(payload=card.payload)
context.resources.msteams.post_message(message=message, link=link)

return _hook
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import NamedTuple


class Link(NamedTuple):
text: str
url: str
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
# serializer version: 1
# name: test_failure_hook_with_pythonic_resource[https://foo.webhook.office.com/bar/baz]
_CallList([
_Call(
tuple(
'https://foo.webhook.office.com/bar/baz',
),
dict({
'headers': dict({
'Content-Type': 'application/json',
}),
'json': dict({
'attachments': list([
dict({
'content': dict({
'text': "Some custom text <a href='localhost:3000/runs/a791605c-fa84-4c59-9027-13d009c1c3a2'>View in Dagster UI</a>",
'title': 'Dagster Pipeline Alert',
}),
'contentType': 'application/vnd.microsoft.card.hero',
}),
]),
'type': 'message',
}),
'proxies': None,
'timeout': 60,
'verify': True,
}),
),
])
# ---
# name: test_failure_hook_with_pythonic_resource[https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0]
_CallList([
_Call(
tuple(
'https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0',
),
dict({
'headers': dict({
'Content-Type': 'application/json',
}),
'json': dict({
'attachments': list([
dict({
'content': dict({
'body': list([
dict({
'text': 'Some custom text [View in Dagster UI](localhost:3000/runs/d5358b53-37eb-42e2-9c46-97315843c304)',
'type': 'TextBlock',
'wrap': True,
}),
]),
'type': 'AdaptiveCard',
'version': '1.0',
}),
'contentType': 'application/vnd.microsoft.card.adaptive',
'contentUrl': None,
}),
]),
'type': 'message',
}),
'proxies': None,
'timeout': 60,
'verify': True,
}),
),
])
# ---
# name: test_success_hook_with_pythonic_resource[https://foo.webhook.office.com/bar/baz]
_CallList([
_Call(
tuple(
'https://foo.webhook.office.com/bar/baz',
),
dict({
'headers': dict({
'Content-Type': 'application/json',
}),
'json': dict({
'attachments': list([
dict({
'content': dict({
'text': '''
Op pass_op on job job_def succeeded!
Run ID: 135f8cfd-81ca-4448-8e7f-66b58201391f
''',
'title': 'Dagster Pipeline Alert',
}),
'contentType': 'application/vnd.microsoft.card.hero',
}),
]),
'type': 'message',
}),
'proxies': None,
'timeout': 60,
'verify': True,
}),
),
_Call(
tuple(
'https://foo.webhook.office.com/bar/baz',
),
dict({
'headers': dict({
'Content-Type': 'application/json',
}),
'json': dict({
'attachments': list([
dict({
'content': dict({
'text': '''
Op success_solid_with_hook on job job_def succeeded!
Run ID: 135f8cfd-81ca-4448-8e7f-66b58201391f
''',
'title': 'Dagster Pipeline Alert',
}),
'contentType': 'application/vnd.microsoft.card.hero',
}),
]),
'type': 'message',
}),
'proxies': None,
'timeout': 60,
'verify': True,
}),
),
])
# ---
# name: test_success_hook_with_pythonic_resource[https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0]
_CallList([
_Call(
tuple(
'https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0',
),
dict({
'headers': dict({
'Content-Type': 'application/json',
}),
'json': dict({
'attachments': list([
dict({
'content': dict({
'body': list([
dict({
'text': '''
Op pass_op on job job_def succeeded!
Run ID: f235ae37-e068-4f90-888d-d59a7cf017db
''',
'type': 'TextBlock',
'wrap': True,
}),
]),
'type': 'AdaptiveCard',
'version': '1.0',
}),
'contentType': 'application/vnd.microsoft.card.adaptive',
'contentUrl': None,
}),
]),
'type': 'message',
}),
'proxies': None,
'timeout': 60,
'verify': True,
}),
),
_Call(
tuple(
'https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0',
),
dict({
'headers': dict({
'Content-Type': 'application/json',
}),
'json': dict({
'attachments': list([
dict({
'content': dict({
'body': list([
dict({
'text': '''
Op success_solid_with_hook on job job_def succeeded!
Run ID: f235ae37-e068-4f90-888d-d59a7cf017db
''',
'type': 'TextBlock',
'wrap': True,
}),
]),
'type': 'AdaptiveCard',
'version': '1.0',
}),
'contentType': 'application/vnd.microsoft.card.adaptive',
'contentUrl': None,
}),
]),
'type': 'message',
}),
'proxies': None,
'timeout': 60,
'verify': True,
}),
),
])
# ---
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import patch

import pytest
from dagster_msteams.client import TeamsClient

Expand All @@ -23,3 +25,17 @@ def json_message():
def teams_client():
client = TeamsClient(hook_url="https://some_url_here/")
return client


@pytest.fixture(scope="function", name="mock_post_method")
def create_mock_post_method():
with patch("dagster_msteams.client.post") as mock_post:
mock_response = mock_post.return_value
mock_response.status_code = 200
mock_response.json.return_value = {"message": "Success"}
mock_response.text = "1"
yield mock_post


LEGACY_WEBHOOK_URL = "https://foo.webhook.office.com/bar/baz"
WEBHOOK_URL = "https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0"
Original file line number Diff line number Diff line change
@@ -1,35 +1,9 @@
import json
from typing import Any
from unittest.mock import patch

import pytest
from dagster_msteams.client import Link, TeamsClient


@pytest.fixture(scope="function", name="mock_post_method")
def create_mock_post_method():
with patch("dagster_msteams.client.post") as mock_post:
mock_response = mock_post.return_value
mock_response.status_code = 200
mock_response.json.return_value = {"message": "Success"}
mock_response.text = "1"
yield mock_post


@patch("dagster_msteams.client.TeamsClient.post_message")
def test_post_message(mock_teams_post_message, json_message, teams_client):
body = {"ok": True}
mock_teams_post_message.return_value = {
"status": 200,
"body": json.dumps(body),
"headers": "",
}
teams_client.post_message(json_message)
assert mock_teams_post_message.called


LEGACY_WEBHOOK_URL = "https://foo.webhook.office.com/bar/baz"
WEBHOOK_URL = "https://foo.westus.logic.azure.com:443/workflows/8be36cde7f394925af220480f6701bd0"
from dagster_msteams_tests.conftest import LEGACY_WEBHOOK_URL, WEBHOOK_URL


@pytest.mark.parametrize(
Expand Down
Loading

0 comments on commit 8b72e5f

Please sign in to comment.