Skip to content

Commit 0b3de3f

Browse files
authored
Merge pull request openwallet-foundation#1124 from dbluhm/fix/stateless-record-webhooks
Fix: stateless record webhooks
2 parents aa9a5c9 + f421fe2 commit 0b3de3f

File tree

4 files changed

+65
-17
lines changed

4 files changed

+65
-17
lines changed

aries_cloudagent/admin/server.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
LOGGER = logging.getLogger(__name__)
4141

4242
EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
43-
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")
43+
EVENT_PATTERN_RECORD = re.compile("^acapy::record::([^:]*)(?:::.*)?$")
4444

4545
EVENT_WEBHOOK_MAPPING = {
4646
"acapy::basicmessage::received": "basicmessages",
@@ -450,8 +450,8 @@ def sort_dict(raw: dict) -> dict:
450450

451451
event_bus = self.context.inject(EventBus, required=False)
452452
if event_bus:
453-
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
454-
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)
453+
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self._on_webhook_event)
454+
event_bus.subscribe(EVENT_PATTERN_RECORD, self._on_record_event)
455455

456456
for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
457457
event_bus.subscribe(
@@ -788,19 +788,19 @@ async def websocket_handler(self, request):
788788

789789
return ws
790790

791-
async def __on_webhook_event(self, profile: Profile, event: Event):
791+
async def _on_webhook_event(self, profile: Profile, event: Event):
792792
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
793793
webhook_topic = match.group(1) if match else None
794794
if webhook_topic:
795795
await self.send_webhook(profile, webhook_topic, event.payload)
796796

797-
async def __on_record_event(self, profile: Profile, event: Event):
797+
async def _on_record_event(self, profile: Profile, event: Event):
798798
match = EVENT_PATTERN_RECORD.search(event.topic)
799799
webhook_topic = match.group(1) if match else None
800800
if webhook_topic:
801801
await self.send_webhook(profile, webhook_topic, event.payload)
802802

803-
async def send_webhook(self, profile: Profile, topic: str, payload: dict):
803+
async def send_webhook(self, profile: Profile, topic: str, payload: dict = None):
804804
"""Add a webhook to the queue, to send to all registered targets."""
805805
wallet_id = profile.settings.get("wallet.id")
806806
webhook_urls = profile.settings.get("admin.webhook_urls")

aries_cloudagent/admin/tests/test_admin_server.py

+25-2
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,20 @@
22

33
from aiohttp import ClientSession, DummyCookieJar, TCPConnector, web
44
from aiohttp.test_utils import unused_port
5+
import pytest
56

67
from asynctest import TestCase as AsyncTestCase
78
from asynctest import mock as async_mock
89

10+
from .. import server as test_module
911
from ...config.default_context import DefaultContextBuilder
1012
from ...config.injection_context import InjectionContext
13+
from ...core.event_bus import Event
1114
from ...core.in_memory import InMemoryProfile
1215
from ...core.protocol_registry import ProtocolRegistry
1316
from ...transport.outbound.message import OutboundMessage
1417
from ...utils.stats import Collector
1518
from ...utils.task_queue import TaskQueue
16-
17-
from .. import server as test_module
1819
from ..server import AdminServer, AdminSetupError
1920

2021

@@ -434,3 +435,25 @@ async def test_server_health_state(self):
434435
) as response:
435436
assert response.status == 503
436437
await server.stop()
438+
439+
440+
@pytest.fixture
441+
async def server():
442+
test_class = TestAdminServer()
443+
await test_class.setUp()
444+
yield test_class.get_admin_server()
445+
await test_class.tearDown()
446+
447+
448+
@pytest.mark.asyncio
449+
@pytest.mark.parametrize(
450+
"event_topic, webhook_topic",
451+
[("acapy::record::topic", "topic"), ("acapy::record::topic::state", "topic")],
452+
)
453+
async def test_on_record_event(server, event_topic, webhook_topic):
454+
profile = InMemoryProfile.test_profile()
455+
with async_mock.patch.object(
456+
server, "send_webhook", async_mock.CoroutineMock()
457+
) as mock_send_webhook:
458+
await server._on_record_event(profile, Event(event_topic, None))
459+
mock_send_webhook.assert_called_once_with(profile, webhook_topic, None)

aries_cloudagent/messaging/models/base_record.py

+12-5
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class Meta:
7070
RECORD_ID_NAME = "id"
7171
RECORD_TYPE = None
7272
RECORD_TOPIC: Optional[str] = None
73+
EVENT_NAMESPACE: str = "acapy::record"
7374
LOG_STATE_FLAG = None
7475
TAG_NAMES = {"state"}
7576

@@ -371,20 +372,26 @@ async def delete_record(self, session: ProfileSession):
371372
await storage.delete_record(self.storage_record)
372373
# FIXME - update state and send webhook?
373374

374-
async def emit_event(self, session: ProfileSession, payload: Any):
375+
async def emit_event(self, session: ProfileSession, payload: Any = None):
375376
"""Emit an event.
376377
377378
Args:
378379
session: The profile session to use
379380
payload: The event payload
380381
"""
381382

382-
if not self.RECORD_TOPIC or not self.state or not payload:
383+
if not self.RECORD_TOPIC:
383384
return
384385

385-
await session.profile.notify(
386-
f"acapy::record::{self.RECORD_TOPIC}::{self.state}", payload
387-
)
386+
if self.state:
387+
topic = f"{self.EVENT_NAMESPACE}::{self.RECORD_TOPIC}::{self.state}"
388+
else:
389+
topic = f"{self.EVENT_NAMESPACE}::{self.RECORD_TOPIC}"
390+
391+
if not payload:
392+
payload = self.serialize()
393+
394+
await session.profile.notify(topic, payload)
388395

389396
@classmethod
390397
def log_state(

aries_cloudagent/messaging/models/tests/test_base_record.py

+22-4
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,29 @@ async def test_emit_event(self):
260260
session.profile.context.injector.bind_instance(EventBus, mock_event_bus)
261261
record = BaseRecordImpl()
262262
payload = {"test": "payload"}
263-
await record.emit_event(session, None) # cover short circuit
264-
await record.emit_event(session, payload) # cover short circuit
265-
record.RECORD_TOPIC = "topic"
266-
await record.emit_event(session, payload) # cover short circuit
263+
264+
# Records must have topic to emit events
265+
record.RECORD_TOPIC = None
266+
await record.emit_event(session, payload)
267267
assert mock_event_bus.events == []
268+
269+
record.RECORD_TOPIC = "topic"
270+
271+
# Stateless record with no payload emits event with serialized record
272+
await record.emit_event(session)
273+
assert mock_event_bus.events == [
274+
(session.profile, Event("acapy::record::topic", {}))
275+
]
276+
mock_event_bus.events.clear()
277+
278+
# Stateless record with payload emits event
279+
await record.emit_event(session, payload)
280+
assert mock_event_bus.events == [
281+
(session.profile, Event("acapy::record::topic", payload))
282+
]
283+
mock_event_bus.events.clear()
284+
285+
# Statefull record with payload emits event
268286
record.state = "test_state"
269287
await record.emit_event(session, payload)
270288
assert mock_event_bus.events == [

0 commit comments

Comments
 (0)