Skip to content

Commit f81788f

Browse files
authored
No Consumer Reader (#580)
1 parent 30b4191 commit f81788f

File tree

7 files changed

+419
-30
lines changed

7 files changed

+419
-30
lines changed

tests/topics/test_topic_reader.py

+246
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,249 @@ async def wait(fut):
251251

252252
await reader0.close()
253253
await reader1.close()
254+
255+
256+
@pytest.fixture()
257+
def topic_selector(topic_with_messages):
258+
return ydb.TopicReaderSelector(path=topic_with_messages, partitions=[0])
259+
260+
261+
@pytest.mark.asyncio
262+
class TestTopicNoConsumerReaderAsyncIO:
263+
async def test_reader_with_no_partition_ids_raises(self, driver, topic_with_messages):
264+
with pytest.raises(ydb.Error):
265+
driver.topic_client.reader(
266+
topic_with_messages,
267+
consumer=None,
268+
event_handler=ydb.TopicReaderEvents.EventHandler(),
269+
)
270+
271+
async def test_reader_with_no_event_handler_raises(self, driver, topic_with_messages):
272+
with pytest.raises(ydb.Error):
273+
driver.topic_client.reader(
274+
topic_with_messages,
275+
consumer=None,
276+
)
277+
278+
async def test_reader_with_no_partition_ids_selector_raises(self, driver, topic_selector):
279+
topic_selector.partitions = None
280+
281+
with pytest.raises(ydb.Error):
282+
driver.topic_client.reader(
283+
topic_selector,
284+
consumer=None,
285+
event_handler=ydb.TopicReaderEvents.EventHandler(),
286+
)
287+
288+
async def test_reader_with_default_lambda(self, driver, topic_selector):
289+
reader = driver.topic_client.reader(
290+
topic_selector,
291+
consumer=None,
292+
event_handler=ydb.TopicReaderEvents.EventHandler(),
293+
)
294+
msg = await reader.receive_message()
295+
296+
assert msg.seqno == 1
297+
298+
await reader.close()
299+
300+
async def test_reader_with_sync_lambda(self, driver, topic_selector):
301+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
302+
def on_partition_get_start_offset(self, event):
303+
assert topic_selector.path.endswith(event.topic)
304+
assert event.partition_id == 0
305+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
306+
307+
reader = driver.topic_client.reader(
308+
topic_selector,
309+
consumer=None,
310+
event_handler=CustomEventHandler(),
311+
)
312+
313+
msg = await reader.receive_message()
314+
315+
assert msg.seqno == 2
316+
317+
await reader.close()
318+
319+
async def test_reader_with_async_lambda(self, driver, topic_selector):
320+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
321+
async def on_partition_get_start_offset(self, event):
322+
assert topic_selector.path.endswith(event.topic)
323+
assert event.partition_id == 0
324+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
325+
326+
reader = driver.topic_client.reader(
327+
topic_selector,
328+
consumer=None,
329+
event_handler=CustomEventHandler(),
330+
)
331+
332+
msg = await reader.receive_message()
333+
334+
assert msg.seqno == 2
335+
336+
await reader.close()
337+
338+
async def test_commit_not_allowed(self, driver, topic_selector):
339+
reader = driver.topic_client.reader(
340+
topic_selector,
341+
consumer=None,
342+
event_handler=ydb.TopicReaderEvents.EventHandler(),
343+
)
344+
batch = await reader.receive_batch()
345+
346+
with pytest.raises(ydb.Error):
347+
reader.commit(batch)
348+
349+
with pytest.raises(ydb.Error):
350+
await reader.commit_with_ack(batch)
351+
352+
await reader.close()
353+
354+
async def test_offsets_updated_after_reconnect(self, driver, topic_selector):
355+
current_offset = 0
356+
357+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
358+
def on_partition_get_start_offset(self, event):
359+
nonlocal current_offset
360+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)
361+
362+
reader = driver.topic_client.reader(
363+
topic_selector,
364+
consumer=None,
365+
event_handler=CustomEventHandler(),
366+
)
367+
msg = await reader.receive_message()
368+
369+
assert msg.seqno == current_offset + 1
370+
371+
current_offset += 2
372+
reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
373+
374+
await asyncio.sleep(0)
375+
376+
msg = await reader.receive_message()
377+
378+
assert msg.seqno == current_offset + 1
379+
380+
await reader.close()
381+
382+
383+
class TestTopicReaderWithoutConsumer:
384+
def test_reader_with_no_partition_ids_raises(self, driver_sync, topic_with_messages):
385+
with pytest.raises(ydb.Error):
386+
driver_sync.topic_client.reader(
387+
topic_with_messages,
388+
consumer=None,
389+
event_handler=ydb.TopicReaderEvents.EventHandler(),
390+
)
391+
392+
def test_reader_with_no_event_handler_raises(self, driver_sync, topic_with_messages):
393+
with pytest.raises(ydb.Error):
394+
driver_sync.topic_client.reader(
395+
topic_with_messages,
396+
consumer=None,
397+
)
398+
399+
def test_reader_with_no_partition_ids_selector_raises(self, driver_sync, topic_selector):
400+
topic_selector.partitions = None
401+
402+
with pytest.raises(ydb.Error):
403+
driver_sync.topic_client.reader(
404+
topic_selector,
405+
consumer=None,
406+
event_handler=ydb.TopicReaderEvents.EventHandler(),
407+
)
408+
409+
def test_reader_with_default_lambda(self, driver_sync, topic_selector):
410+
reader = driver_sync.topic_client.reader(
411+
topic_selector,
412+
consumer=None,
413+
event_handler=ydb.TopicReaderEvents.EventHandler(),
414+
)
415+
msg = reader.receive_message()
416+
417+
assert msg.seqno == 1
418+
419+
reader.close()
420+
421+
def test_reader_with_sync_lambda(self, driver_sync, topic_selector):
422+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
423+
def on_partition_get_start_offset(self, event):
424+
assert topic_selector.path.endswith(event.topic)
425+
assert event.partition_id == 0
426+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
427+
428+
reader = driver_sync.topic_client.reader(
429+
topic_selector,
430+
consumer=None,
431+
event_handler=CustomEventHandler(),
432+
)
433+
434+
msg = reader.receive_message()
435+
436+
assert msg.seqno == 2
437+
438+
reader.close()
439+
440+
def test_reader_with_async_lambda(self, driver_sync, topic_selector):
441+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
442+
async def on_partition_get_start_offset(self, event):
443+
assert topic_selector.path.endswith(event.topic)
444+
assert event.partition_id == 0
445+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(1)
446+
447+
reader = driver_sync.topic_client.reader(
448+
topic_selector,
449+
consumer=None,
450+
event_handler=CustomEventHandler(),
451+
)
452+
453+
msg = reader.receive_message()
454+
455+
assert msg.seqno == 2
456+
457+
reader.close()
458+
459+
def test_commit_not_allowed(self, driver_sync, topic_selector):
460+
reader = driver_sync.topic_client.reader(
461+
topic_selector,
462+
consumer=None,
463+
event_handler=ydb.TopicReaderEvents.EventHandler(),
464+
)
465+
batch = reader.receive_batch()
466+
467+
with pytest.raises(ydb.Error):
468+
reader.commit(batch)
469+
470+
with pytest.raises(ydb.Error):
471+
reader.commit_with_ack(batch)
472+
473+
reader.close()
474+
475+
def test_offsets_updated_after_reconnect(self, driver_sync, topic_selector):
476+
current_offset = 0
477+
478+
class CustomEventHandler(ydb.TopicReaderEvents.EventHandler):
479+
def on_partition_get_start_offset(self, event):
480+
nonlocal current_offset
481+
return ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse(current_offset)
482+
483+
reader = driver_sync.topic_client.reader(
484+
topic_selector,
485+
consumer=None,
486+
event_handler=CustomEventHandler(),
487+
)
488+
msg = reader.receive_message()
489+
490+
assert msg.seqno == current_offset + 1
491+
492+
current_offset += 2
493+
reader._async_reader._reconnector._stream_reader._set_first_error(ydb.Unavailable("some retriable error"))
494+
495+
msg = reader.receive_message()
496+
497+
assert msg.seqno == current_offset + 1
498+
499+
reader.close()

ydb/_grpc/grpcwrapper/ydb_topic.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -439,12 +439,13 @@ def from_proto(
439439
@dataclass
440440
class InitRequest(IToProto):
441441
topics_read_settings: List["StreamReadMessage.InitRequest.TopicReadSettings"]
442-
consumer: str
442+
consumer: Optional[str]
443443
auto_partitioning_support: bool
444444

445445
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.InitRequest:
446446
res = ydb_topic_pb2.StreamReadMessage.InitRequest()
447-
res.consumer = self.consumer
447+
if self.consumer is not None:
448+
res.consumer = self.consumer
448449
for settings in self.topics_read_settings:
449450
res.topics_read_settings.append(settings.to_proto())
450451
res.auto_partitioning_support = self.auto_partitioning_support

ydb/_topic_reader/events.py

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import Awaitable, Union
4+
5+
from ..issues import ClientInternalError
6+
7+
__all__ = [
8+
"OnCommit",
9+
"OnPartitionGetStartOffsetRequest",
10+
"OnPartitionGetStartOffsetResponse",
11+
"OnInitPartition",
12+
"OnShutdownPartition",
13+
"EventHandler",
14+
]
15+
16+
17+
class BaseReaderEvent:
18+
pass
19+
20+
21+
@dataclass
22+
class OnCommit(BaseReaderEvent):
23+
topic: str
24+
offset: int
25+
26+
27+
@dataclass
28+
class OnPartitionGetStartOffsetRequest(BaseReaderEvent):
29+
topic: str
30+
partition_id: int
31+
32+
33+
@dataclass
34+
class OnPartitionGetStartOffsetResponse:
35+
start_offset: int
36+
37+
38+
class OnInitPartition(BaseReaderEvent):
39+
pass
40+
41+
42+
class OnShutdownPartition:
43+
pass
44+
45+
46+
TopicEventDispatchType = Union[OnPartitionGetStartOffsetResponse, None]
47+
48+
49+
class EventHandler:
50+
def on_commit(self, event: OnCommit) -> Union[None, Awaitable[None]]:
51+
pass
52+
53+
def on_partition_get_start_offset(
54+
self,
55+
event: OnPartitionGetStartOffsetRequest,
56+
) -> Union[OnPartitionGetStartOffsetResponse, Awaitable[OnPartitionGetStartOffsetResponse]]:
57+
pass
58+
59+
def on_init_partition(self, event: OnInitPartition) -> Union[None, Awaitable[None]]:
60+
pass
61+
62+
def on_shutdown_partition(self, event: OnShutdownPartition) -> Union[None, Awaitable[None]]:
63+
pass
64+
65+
async def _dispatch(self, event: BaseReaderEvent) -> Awaitable[TopicEventDispatchType]:
66+
f = None
67+
if isinstance(event, OnCommit):
68+
f = self.on_commit
69+
elif isinstance(event, OnPartitionGetStartOffsetRequest):
70+
f = self.on_partition_get_start_offset
71+
elif isinstance(event, OnInitPartition):
72+
f = self.on_init_partition
73+
elif isinstance(event, OnShutdownPartition):
74+
f = self.on_shutdown_partition
75+
else:
76+
raise ClientInternalError("Unsupported topic reader event")
77+
78+
if asyncio.iscoroutinefunction(f):
79+
return await f(event)
80+
81+
return f(event)

0 commit comments

Comments
 (0)