Skip to content

Commit 04fdac2

Browse files
committed
Improvements to internal messaging debug logging
1 parent 68b892a commit 04fdac2

File tree

9 files changed

+39
-23
lines changed

9 files changed

+39
-23
lines changed

docs/explanation/internal-architecture.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ Lightbus' internal workings are composed of:
1818
![Internal Architecture Diagram][diagram]
1919

2020
[diagram]: ../static/images/internal-architecture.png
21+

lightbus/client/docks/base.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ def __init__(
2929
self.api_registry = api_registry
3030
self.config = config
3131
self.error_queue = error_queue
32-
self.producer = InternalProducer(queue=produce_to, error_queue=error_queue)
33-
self.consumer = InternalConsumer(queue=consume_from, error_queue=error_queue)
32+
self.producer = InternalProducer(
33+
name=self.__class__.__name__, queue=produce_to, error_queue=error_queue
34+
)
35+
self.consumer = InternalConsumer(
36+
name=self.__class__.__name__, queue=consume_from, error_queue=error_queue
37+
)
3438

3539
self.producer.start()
3640
self.consumer.start(self.handle)

lightbus/client/internal_messaging/consumer.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ class InternalConsumer:
1717
consumption by the transport
1818
"""
1919

20-
def __init__(self, queue: InternalQueue, error_queue: ErrorQueueType):
20+
def __init__(self, name: str, queue: InternalQueue, error_queue: ErrorQueueType):
2121
self._consumer_task: Optional[asyncio.Task] = None
2222
self._running_commands = set()
2323
self._ready = asyncio.Event()
24+
self.name = name
2425
self.queue = queue
2526
self.error_queue = error_queue
2627

@@ -30,7 +31,7 @@ def start(self, handler: Callable):
3031
Use `stop()` to shutdown the invoker.
3132
"""
3233
logger.debug(
33-
f"Starting consumer for handler {handler.__qualname__}(). This should report ready"
34+
f"Starting {self.name} consumer for handler {handler.__qualname__}(). This should report ready"
3435
" shortly..."
3536
)
3637
self._consumer_task = asyncio.ensure_future(
@@ -58,7 +59,9 @@ async def close(self):
5859

5960
async def _consumer_loop(self, queue, handler):
6061
"""Continually fetch commands from the queue and handle them"""
61-
logger.debug(f"Consumer loop is ready with handler {handler.__qualname__}()")
62+
logger.debug(
63+
f"Consumer loop for {self.name} is ready with handler {handler.__qualname__}()"
64+
)
6265
self._ready.set()
6366

6467
while True:
@@ -75,7 +78,7 @@ def handle_in_background(self, queue: InternalQueue, handler, command, on_done:
7578
7679
This execution happens in the background.
7780
"""
78-
logger.debug(f"Handling command {command}")
81+
logger.debug(f"Handling command {command} in consumer {self.name}")
7982

8083
def when_task_finished(fut: asyncio.Future):
8184
self._running_commands.remove(fut)

lightbus/client/internal_messaging/producer.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,14 @@ class InternalProducer:
5050
# How often should the queue sizes be monitored
5151
monitor_interval = 0.1
5252

53-
def __init__(self, queue: InternalQueue, error_queue: ErrorQueueType):
53+
def __init__(self, name: str, queue: InternalQueue, error_queue: ErrorQueueType):
5454
"""Initialise the invoker
5555
5656
The callable specified by `on_exception` will be called with a single positional argument,
5757
which is the exception which occurred. This should take care of shutting down the invoker,
5858
as well as any other cleanup which needs to happen.
5959
"""
60+
self.name = name
6061
self._queue_monitor_task: Optional[asyncio.Task] = None
6162
self._monitor_ready = asyncio.Event()
6263
self.queue = queue
@@ -105,20 +106,18 @@ async def _queue_monitor(self):
105106

106107
logger.warning(
107108
"Queue in %s has shrunk back down to %s commands.%s",
108-
self.__class__.__name__,
109+
self.name,
109110
current_size,
110111
everything_ok,
111112
)
112113
elif show_size_warning:
113-
logger.warning(
114-
"Queue in %s now has %s commands.", self.__class__.__name__, current_size
115-
)
114+
logger.warning("Queue in %s now has %s commands.", self.name, current_size)
116115

117116
previous_size = current_size
118117
await asyncio.sleep(self.monitor_interval)
119118

120119
def send(self, command) -> asyncio.Event:
121-
logger.debug(f"Sending command {command}")
120+
logger.debug(f"Sending command {command} in producer {self.name}")
122121
event = asyncio.Event()
123122
self.queue.put_nowait((command, event))
124123
return event

lightbus/client/subclients/base.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@ def __init__(
2626
self.config = config
2727
self.schema = schema
2828
self.error_queue = error_queue
29-
self.producer = InternalProducer(queue=produce_to, error_queue=error_queue)
30-
self.consumer = InternalConsumer(queue=consume_from, error_queue=error_queue)
29+
self.producer = InternalProducer(
30+
name=self.__class__.__name__, queue=produce_to, error_queue=error_queue
31+
)
32+
self.consumer = InternalConsumer(
33+
name=self.__class__.__name__, queue=consume_from, error_queue=error_queue
34+
)
3135

3236
self.producer.start()
3337
self.consumer.start(self.handle)

lightbus/transports/redis/rpc.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@
2727

2828

2929
class RedisRpcTransport(RedisTransportMixin, RpcTransport):
30-
""" Redis RPC transport providing at-most-once delivery
30+
"""Redis RPC transport providing at-most-once delivery
3131
3232
This transport uses a redis list and a blocking pop operation
3333
to distribute an RPC call to a single RPC consumer.
3434
3535
Each call also has a corresponding expiry key created. Once the
3636
key expires it should be assumed that the RPC call has timed
37-
out and that therefore is should be discarded rather than
37+
out and that therefore should be discarded rather than
3838
be processed.
3939
"""
4040

@@ -177,6 +177,7 @@ async def _consume_rpcs(self, apis: Sequence[Api]) -> Sequence[RpcMessage]:
177177
# We need to manually close the connection here otherwise the aioredis
178178
# pool will emit warnings saying that this connection still has pending
179179
# commands (i.e. the above blocking pop)
180+
logger.debug("Closing redis connection")
180181
redis.close()
181182
raise
182183

lightbus/utilities/async_tools.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,5 +181,5 @@ async def call_on_schedule(callback, schedule: "Job", also_run_immediately: bool
181181

182182

183183
async def delayed_startup(coroutine: Coroutine, delay: float):
184-
await asyncio.sleep(delay)
184+
# await asyncio.sleep(delay)
185185
return await coroutine

tests/client/internal_messaging/conftest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ async def consumer():
1212
def _on_exception(e):
1313
raise e
1414

15-
consumer = InternalConsumer(queue=InternalQueue(), error_queue=InternalQueue())
15+
consumer = InternalConsumer(
16+
name="TestConsumer", queue=InternalQueue(), error_queue=InternalQueue()
17+
)
1618
yield consumer
1719
await consumer.close()
1820

@@ -22,7 +24,9 @@ async def producer():
2224
def _on_exception(e):
2325
raise e
2426

25-
producer = InternalProducer(queue=InternalQueue(), error_queue=InternalQueue())
27+
producer = InternalProducer(
28+
name="TestProducer", queue=InternalQueue(), error_queue=InternalQueue()
29+
)
2630
yield producer
2731
await producer.close()
2832

tests/client/internal_messaging/test_producer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ async def test_queue_monitor(producer: InternalProducer, caplog: LogCaptureFixtu
4141

4242
# Now we have logging
4343
assert len(caplog.records) == 1
44-
assert caplog.records[0].getMessage() == "Queue in InternalProducer now has 3 commands."
44+
assert caplog.records[0].getMessage() == "Queue in TestProducer now has 3 commands."
4545
caplog.clear() # Clear the log messages
4646

4747
# Let's check we get another messages when the queue gets bigger again
4848
producer.queue.put_nowait(None)
4949
await asyncio.sleep(0.05)
5050

5151
assert len(caplog.records) == 1
52-
assert caplog.records[0].getMessage() == "Queue in InternalProducer now has 4 commands."
52+
assert caplog.records[0].getMessage() == "Queue in TestProducer now has 4 commands."
5353
caplog.clear() # Clear the log messages
5454

5555
# Now check we get logging when the queue shrinks, but is still above the warning level
@@ -58,7 +58,7 @@ async def test_queue_monitor(producer: InternalProducer, caplog: LogCaptureFixtu
5858

5959
assert len(caplog.records) == 1
6060
assert caplog.records[0].getMessage() == (
61-
"Queue in InternalProducer has shrunk back down to 3 commands."
61+
"Queue in TestProducer has shrunk back down to 3 commands."
6262
)
6363
caplog.clear() # Clear the log messages
6464

@@ -68,7 +68,7 @@ async def test_queue_monitor(producer: InternalProducer, caplog: LogCaptureFixtu
6868

6969
assert len(caplog.records) == 1
7070
assert caplog.records[0].getMessage() == (
71-
"Queue in InternalProducer has shrunk back down to 2 commands. "
71+
"Queue in TestProducer has shrunk back down to 2 commands. "
7272
"Queue is now at an OK size again."
7373
)
7474
caplog.clear() # Clear the log messages

0 commit comments

Comments
 (0)