Skip to content

Commit aa9a5c9

Browse files
Merge pull request openwallet-foundation#1063 from Indicio-tech/feature/event-bus
Feature/event bus
2 parents 8a60d1a + 708f9d3 commit aa9a5c9

File tree

39 files changed

+524
-317
lines changed

39 files changed

+524
-317
lines changed

aries_cloudagent/admin/base_server.py

-20
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@
22

33

44
from abc import ABC, abstractmethod
5-
from typing import Sequence
6-
7-
from ..core.profile import Profile
85

96

107
class BaseAdminServer(ABC):
@@ -23,20 +20,3 @@ async def start(self) -> None:
2320
@abstractmethod
2421
async def stop(self) -> None:
2522
"""Stop the webserver."""
26-
27-
@abstractmethod
28-
def add_webhook_target(
29-
self,
30-
target_url: str,
31-
topic_filter: Sequence[str] = None,
32-
max_attempts: int = None,
33-
):
34-
"""Add a webhook target."""
35-
36-
@abstractmethod
37-
def remove_webhook_target(self, target_url: str):
38-
"""Remove a webhook target."""
39-
40-
@abstractmethod
41-
async def send_webhook(self, profile: Profile, topic: str, payload: dict):
42-
"""Add a webhook to the queue, to send to all registered targets."""

aries_cloudagent/admin/server.py

+52-69
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,57 @@
11
"""Admin server classes."""
22

33
import asyncio
4+
from hmac import compare_digest
45
import logging
56
import re
7+
from typing import Callable, Coroutine
68
import uuid
9+
import warnings
710

8-
from typing import Callable, Coroutine, Sequence, Set
9-
10-
import aiohttp_cors
11-
import jwt
12-
13-
from hmac import compare_digest
1411
from aiohttp import web
1512
from aiohttp_apispec import (
1613
docs,
1714
response_schema,
1815
setup_aiohttp_apispec,
1916
validation_middleware,
2017
)
21-
18+
import aiohttp_cors
19+
import jwt
2220
from marshmallow import fields
2321

2422
from ..config.injection_context import InjectionContext
25-
from ..core.profile import Profile
23+
from ..core.event_bus import Event, EventBus
2624
from ..core.plugin_registry import PluginRegistry
25+
from ..core.profile import Profile
2726
from ..ledger.error import LedgerConfigError, LedgerTransactionError
2827
from ..messaging.models.openapi import OpenAPISchema
2928
from ..messaging.responder import BaseResponder
30-
from ..transport.queue.basic import BasicMessageQueue
29+
from ..multitenant.manager import MultitenantManager, MultitenantManagerError
30+
from ..storage.error import StorageNotFoundError
3131
from ..transport.outbound.message import OutboundMessage
32+
from ..transport.queue.basic import BasicMessageQueue
3233
from ..utils.stats import Collector
3334
from ..utils.task_queue import TaskQueue
3435
from ..version import __version__
35-
from ..multitenant.manager import MultitenantManager, MultitenantManagerError
36-
37-
from ..storage.error import StorageNotFoundError
3836
from .base_server import BaseAdminServer
3937
from .error import AdminSetupError
4038
from .request_context import AdminRequestContext
4139

42-
4340
LOGGER = logging.getLogger(__name__)
4441

42+
EVENT_PATTERN_WEBHOOK = re.compile("^acapy::webhook::(.*)$")
43+
EVENT_PATTERN_RECORD = re.compile("^acapy::record::(.*)::(.*)$")
44+
45+
EVENT_WEBHOOK_MAPPING = {
46+
"acapy::basicmessage::received": "basicmessages",
47+
"acapy::problem_report": "problem_report",
48+
"acapy::ping::received": "ping",
49+
"acapy::ping::response_received": "ping",
50+
"acapy::actionmenu::received": "actionmenu",
51+
"acapy::actionmenu::get-active-menu": "get-active-menu",
52+
"acapy::actionmenu::perform-menu-action": "perform-menu-action",
53+
}
54+
4555

4656
class AdminModulesSchema(OpenAPISchema):
4757
"""Schema for the modules endpoint."""
@@ -93,7 +103,6 @@ def __init__(
93103
self,
94104
profile: Profile,
95105
send: Coroutine,
96-
webhook: Coroutine,
97106
**kwargs,
98107
):
99108
"""
@@ -106,7 +115,6 @@ def __init__(
106115
super().__init__(**kwargs)
107116
self._profile = profile
108117
self._send = send
109-
self._webhook = webhook
110118

111119
async def send_outbound(self, message: OutboundMessage):
112120
"""
@@ -119,53 +127,23 @@ async def send_outbound(self, message: OutboundMessage):
119127

120128
async def send_webhook(self, topic: str, payload: dict):
121129
"""
122-
Dispatch a webhook.
130+
Dispatch a webhook. DEPRECATED: use the event bus instead.
123131
124132
Args:
125133
topic: the webhook topic identifier
126134
payload: the webhook payload value
127135
"""
128-
await self._webhook(self._profile, topic, payload)
136+
warnings.warn(
137+
"responder.send_webhook is deprecated; please use the event bus instead.",
138+
DeprecationWarning,
139+
)
140+
await self._profile.notify("acapy::webhook::" + topic, payload)
129141

130142
@property
131143
def send_fn(self) -> Coroutine:
132144
"""Accessor for async function to send outbound message."""
133145
return self._send
134146

135-
@property
136-
def webhook_fn(self) -> Coroutine:
137-
"""Accessor for the async function to dispatch a webhook."""
138-
return self._webhook
139-
140-
141-
class WebhookTarget:
142-
"""Class for managing webhook target information."""
143-
144-
def __init__(
145-
self,
146-
endpoint: str,
147-
topic_filter: Sequence[str] = None,
148-
max_attempts: int = None,
149-
):
150-
"""Initialize the webhook target."""
151-
self.endpoint = endpoint
152-
self.max_attempts = max_attempts
153-
self._topic_filter = None
154-
self.topic_filter = topic_filter # call setter
155-
156-
@property
157-
def topic_filter(self) -> Set[str]:
158-
"""Accessor for the target's topic filter."""
159-
return self._topic_filter
160-
161-
@topic_filter.setter
162-
def topic_filter(self, val: Sequence[str]):
163-
"""Setter for the target's topic filter."""
164-
filt = set(val) if val else None
165-
if filt and "*" in filt:
166-
filt = None
167-
self._topic_filter = filt
168-
169147

170148
@web.middleware
171149
async def ready_middleware(request: web.BaseRequest, handler: Coroutine):
@@ -270,7 +248,6 @@ def __init__(
270248
self.root_profile = root_profile
271249
self.task_queue = task_queue
272250
self.webhook_router = webhook_router
273-
self.webhook_targets = {}
274251
self.websocket_queues = {}
275252
self.site = None
276253
self.multitenant_manager = context.inject(MultitenantManager, required=False)
@@ -371,7 +348,6 @@ async def setup_context(request: web.Request, handler):
371348
responder = AdminResponder(
372349
profile,
373350
self.outbound_message_router,
374-
self.send_webhook,
375351
)
376352
profile.context.injector.bind_instance(BaseResponder, responder)
377353

@@ -472,6 +448,19 @@ def sort_dict(raw: dict) -> dict:
472448
if plugin_registry:
473449
plugin_registry.post_process_routes(self.app)
474450

451+
event_bus = self.context.inject(EventBus, required=False)
452+
if event_bus:
453+
event_bus.subscribe(EVENT_PATTERN_WEBHOOK, self.__on_webhook_event)
454+
event_bus.subscribe(EVENT_PATTERN_RECORD, self.__on_record_event)
455+
456+
for event_topic, webhook_topic in EVENT_WEBHOOK_MAPPING.items():
457+
event_bus.subscribe(
458+
re.compile(re.escape(event_topic)),
459+
lambda profile, event, webhook_topic=webhook_topic: self.send_webhook(
460+
profile, webhook_topic, event.payload
461+
),
462+
)
463+
475464
# order tags alphabetically, parameters deterministically and pythonically
476465
swagger_dict = self.app._state["swagger_dict"]
477466
swagger_dict.get("tags", []).sort(key=lambda t: t["name"])
@@ -799,21 +788,17 @@ async def websocket_handler(self, request):
799788

800789
return ws
801790

802-
def add_webhook_target(
803-
self,
804-
target_url: str,
805-
topic_filter: Sequence[str] = None,
806-
max_attempts: int = None,
807-
):
808-
"""Add a webhook target."""
809-
self.webhook_targets[target_url] = WebhookTarget(
810-
target_url, topic_filter, max_attempts
811-
)
791+
async def __on_webhook_event(self, profile: Profile, event: Event):
792+
match = EVENT_PATTERN_WEBHOOK.search(event.topic)
793+
webhook_topic = match.group(1) if match else None
794+
if webhook_topic:
795+
await self.send_webhook(profile, webhook_topic, event.payload)
812796

813-
def remove_webhook_target(self, target_url: str):
814-
"""Remove a webhook target."""
815-
if target_url in self.webhook_targets:
816-
del self.webhook_targets[target_url]
797+
async def __on_record_event(self, profile: Profile, event: Event):
798+
match = EVENT_PATTERN_RECORD.search(event.topic)
799+
webhook_topic = match.group(1) if match else None
800+
if webhook_topic:
801+
await self.send_webhook(profile, webhook_topic, event.payload)
817802

818803
async def send_webhook(self, profile: Profile, topic: str, payload: dict):
819804
"""Add a webhook to the queue, to send to all registered targets."""
@@ -825,8 +810,6 @@ async def send_webhook(self, profile: Profile, topic: str, payload: dict):
825810
metadata = {"x-wallet-id": wallet_id}
826811

827812
if self.webhook_router:
828-
# for idx, target in self.webhook_targets.items():
829-
# if not target.topic_filter or topic in target.topic_filter:
830813
for endpoint in webhook_urls:
831814
self.webhook_router(
832815
topic,

aries_cloudagent/admin/tests/test_admin_server.py

-33
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,6 @@
1818
from ..server import AdminServer, AdminSetupError
1919

2020

21-
class TestAdminResponder(AsyncTestCase):
22-
async def test_admin_responder(self):
23-
admin_responder = test_module.AdminResponder(
24-
None, async_mock.CoroutineMock(), async_mock.CoroutineMock()
25-
)
26-
27-
assert admin_responder.send_fn is admin_responder._send
28-
assert admin_responder.webhook_fn is admin_responder._webhook
29-
30-
message = test_module.OutboundMessage(payload="hello")
31-
await admin_responder.send_outbound(message)
32-
assert admin_responder._send.called_once_with(None, message)
33-
34-
await admin_responder.send_webhook("topic", {"payload": "hello"})
35-
assert admin_responder._webhook.called_once_with("topic", {"outbound": "hello"})
36-
37-
38-
class TestWebhookTarget(AsyncTestCase):
39-
async def test_webhook_target(self):
40-
webhook_target = test_module.WebhookTarget(
41-
endpoint="localhost:8888",
42-
topic_filter=["birthdays", "animal videos"],
43-
max_attempts=None,
44-
)
45-
assert webhook_target.topic_filter == {"birthdays", "animal videos"}
46-
47-
webhook_target.topic_filter = []
48-
assert webhook_target.topic_filter is None
49-
50-
webhook_target.topic_filter = ["duct cleaning", "*"]
51-
assert webhook_target.topic_filter is None
52-
53-
5421
class TestAdminServer(AsyncTestCase):
5522
async def setUp(self):
5623
self.message_results = []

aries_cloudagent/config/default_context.py

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from ..cache.base import BaseCache
88
from ..cache.in_memory import InMemoryCache
9+
from ..core.event_bus import EventBus
910
from ..core.plugin_registry import PluginRegistry
1011
from ..core.profile import ProfileManager, ProfileManagerProvider
1112
from ..core.protocol_registry import ProtocolRegistry
@@ -42,6 +43,9 @@ async def build_context(self) -> InjectionContext:
4243
# Global protocol registry
4344
context.injector.bind_instance(ProtocolRegistry, ProtocolRegistry())
4445

46+
# Global event bus
47+
context.injector.bind_instance(EventBus, EventBus())
48+
4549
# Global did resolver registry
4650
did_resolver_registry = DIDResolverRegistry()
4751
context.injector.bind_instance(DIDResolverRegistry, did_resolver_registry)

aries_cloudagent/connections/models/conn_record.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ def __eq__(self, other: Union[str, "ConnRecord.State"]) -> bool:
141141
return self is ConnRecord.State.get(other)
142142

143143
RECORD_ID_NAME = "connection_id"
144-
WEBHOOK_TOPIC = "connections"
144+
RECORD_TOPIC = "connections"
145145
LOG_STATE_FLAG = "debug.connections"
146146
TAG_NAMES = {"my_did", "their_did", "request_id", "invitation_key"}
147147

aries_cloudagent/core/conductor.py

-6
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,6 @@ async def setup(self):
146146
self.dispatcher.task_queue,
147147
self.get_stats,
148148
)
149-
webhook_urls = context.settings.get("admin.webhook_urls")
150-
if webhook_urls:
151-
for url in webhook_urls:
152-
self.admin_server.add_webhook_target(url)
153149
context.injector.bind_instance(BaseAdminServer, self.admin_server)
154150
except Exception:
155151
LOGGER.exception("Unable to register admin server")
@@ -206,7 +202,6 @@ async def start(self) -> None:
206202
responder = AdminResponder(
207203
self.root_profile,
208204
self.admin_server.outbound_message_router,
209-
self.admin_server.send_webhook,
210205
)
211206
context.injector.bind_instance(BaseResponder, responder)
212207

@@ -398,7 +393,6 @@ def inbound_message_router(
398393
profile,
399394
message,
400395
self.outbound_message_router,
401-
self.admin_server and self.admin_server.send_webhook,
402396
lambda completed: self.dispatch_complete(message, completed),
403397
)
404398
except (LedgerConfigError, LedgerTransactionError) as e:

0 commit comments

Comments
 (0)