Skip to content

Commit 45cc3b7

Browse files
authored
Merge pull request #531 from ydb-platform/topic_metadata
Topic metadata
2 parents b92e428 + b1273fc commit 45cc3b7

11 files changed

+125
-17
lines changed

examples/topic/basic_example.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
99
config = ydb.DriverConfig(endpoint=endpoint, database=database)
1010
config.credentials = ydb.credentials_from_env_variables()
1111
driver = ydb.aio.Driver(config)
12-
await driver.wait(15)
12+
await driver.wait(5, fail_fast=True)
1313
return driver
1414

1515

@@ -25,7 +25,8 @@ async def create_topic(driver: ydb.aio.Driver, topic: str, consumer: str):
2525
async def write_messages(driver: ydb.aio.Driver, topic: str):
2626
async with driver.topic_client.writer(topic) as writer:
2727
for i in range(10):
28-
await writer.write(f"mess-{i}")
28+
mess = ydb.TopicWriterMessage(data=f"mess-{i}", metadata_items={"index": f"{i}"})
29+
await writer.write(mess)
2930
await asyncio.sleep(1)
3031

3132

@@ -38,6 +39,7 @@ async def read_messages(driver: ydb.aio.Driver, topic: str, consumer: str):
3839
print(mess.seqno)
3940
print(mess.created_at)
4041
print(mess.data.decode())
42+
print(mess.metadata_items)
4143
reader.commit(mess)
4244
except asyncio.TimeoutError:
4345
return

tests/conftest.py

+25
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,31 @@ async def topic_with_messages(driver, topic_consumer, database):
263263
return topic_path
264264

265265

266+
@pytest.fixture()
@pytest.mark.asyncio()
async def topic_with_messages_with_metadata(driver, topic_consumer, database):
    """Create a fresh topic holding two messages that carry metadata items.

    Any topic already present at the target path is dropped first, then the
    topic is re-created with ``topic_consumer`` attached and two RAW-codec
    messages are written and acknowledged. Both messages use the metadata key
    ``"key"``; the first passes its value as ``str`` and the second as
    ``bytes``.

    Returns:
        The full path of the created topic.
    """
    topic_path = database + "/test-topic-with-messages-with-metadata"
    try:
        await driver.topic_client.drop_topic(topic_path)
    except issues.SchemeError:
        # Presumably raised when the topic does not exist yet - nothing to clean up.
        pass

    await driver.topic_client.create_topic(
        path=topic_path,
        consumers=[topic_consumer],
    )

    # The context manager closes the writer even if the acknowledged write
    # fails, so a broken write does not leave an open writer behind.
    async with driver.topic_client.writer(
        topic_path,
        producer_id="fixture-producer-id",
        codec=ydb.TopicCodec.RAW,
    ) as writer:
        await writer.write_with_ack(
            [
                ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}),
                ydb.TopicWriterMessage(data="456".encode(), metadata_items={"key": b"value"}),
            ]
        )
    return topic_path
289+
290+
266291
@pytest.fixture()
267292
@pytest.mark.asyncio()
268293
async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO:

tests/topics/test_topic_reader.py

+29
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,21 @@ async def test_read_message(self, driver, topic_with_messages, topic_consumer):
3030

3131
await reader.close()
3232

33+
async def test_read_metadata(self, driver, topic_with_messages_with_metadata, topic_consumer):
    """Check that both fixture messages are read back with their metadata items.

    The fixture supplies the metadata value once as ``str`` and once as
    ``bytes``; this test expects both messages to come back with a ``bytes``
    value.
    """
    expected_metadata_items = {"key": b"value"}

    # Use the reader as a context manager so it is closed even when an
    # assertion below fails.
    async with driver.topic_client.reader(topic_with_messages_with_metadata, topic_consumer) as reader:
        for _ in range(2):
            await reader.wait_message()
            msg = await reader.receive_message()

            assert msg is not None
            assert msg.metadata_items
            assert msg.metadata_items == expected_metadata_items
47+
3348
async def test_read_and_commit_with_close_reader(self, driver, topic_with_messages, topic_consumer):
3449
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
3550
message = await reader.receive_message()
@@ -135,6 +150,20 @@ def test_read_message(self, driver_sync, topic_with_messages, topic_consumer):
135150

136151
reader.close()
137152

153+
def test_read_metadata(self, driver_sync, topic_with_messages_with_metadata, topic_consumer):
    """Check that both fixture messages are read back with their metadata items.

    The fixture supplies the metadata value once as ``str`` and once as
    ``bytes``; this test expects both messages to come back with a ``bytes``
    value.
    """
    expected_metadata_items = {"key": b"value"}

    # Use the reader as a context manager so it is closed even when an
    # assertion below fails.
    with driver_sync.topic_client.reader(topic_with_messages_with_metadata, topic_consumer) as reader:
        for _ in range(2):
            msg = reader.receive_message()

            assert msg is not None
            assert msg.metadata_items
            assert msg.metadata_items == expected_metadata_items
166+
138167
def test_read_and_commit_with_close_reader(self, driver_sync, topic_with_messages, topic_consumer):
139168
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
140169
message = reader.receive_message()

tests/topics/test_topic_writer.py

+10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
1515
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
1616
await writer.close()
1717

18+
async def test_send_message_with_metadata(self, driver: ydb.aio.Driver, topic_path):
    """Smoke test: writing a message that carries metadata items must not raise."""
    # The context manager closes the writer even if the write itself fails.
    async with driver.topic_client.writer(topic_path, producer_id="test") as writer:
        await writer.write(ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}))
22+
1823
async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
1924
async with driver.topic_client.writer(
2025
topic_path,
@@ -136,6 +141,11 @@ def test_send_message(self, driver_sync: ydb.Driver, topic_path):
136141
writer.write(ydb.TopicWriterMessage(data="123".encode()))
137142
writer.close()
138143

144+
def test_send_message_with_metadata(self, driver_sync: ydb.Driver, topic_path):
    """Smoke test: writing a message that carries metadata items must not raise."""
    # The context manager closes the writer even if the write itself fails.
    with driver_sync.topic_client.writer(topic_path, producer_id="test") as writer:
        writer.write(ydb.TopicWriterMessage(data="123".encode(), metadata_items={"key": "value"}))
148+
139149
def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
140150
with driver_sync.topic_client.writer(
141151
topic_path,

ydb/_grpc/grpcwrapper/ydb_topic.py

+8
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ class MessageData(IToProto):
208208
data: bytes
209209
uncompressed_size: int
210210
partitioning: "StreamWriteMessage.PartitioningType"
211+
metadata_items: Dict[str, bytes]
211212

212213
def to_proto(
213214
self,
@@ -218,6 +219,10 @@ def to_proto(
218219
proto.data = self.data
219220
proto.uncompressed_size = self.uncompressed_size
220221

222+
for key, value in self.metadata_items.items():
223+
item = ydb_topic_pb2.MetadataItem(key=key, value=value)
224+
proto.metadata_items.append(item)
225+
221226
if self.partitioning is None:
222227
pass
223228
elif isinstance(self.partitioning, StreamWriteMessage.PartitioningPartitionID):
@@ -489,16 +494,19 @@ class MessageData(IFromProto):
489494
data: bytes
490495
uncompresed_size: int
491496
message_group_id: str
497+
metadata_items: Dict[str, bytes]
492498

493499
@staticmethod
494500
def from_proto(
495501
msg: ydb_topic_pb2.StreamReadMessage.ReadResponse.MessageData,
496502
) -> "StreamReadMessage.ReadResponse.MessageData":
503+
metadata_items = {meta.key: meta.value for meta in msg.metadata_items}
497504
return StreamReadMessage.ReadResponse.MessageData(
498505
offset=msg.offset,
499506
seq_no=msg.seq_no,
500507
created_at=msg.created_at.ToDatetime(),
501508
data=msg.data,
509+
metadata_items=metadata_items,
502510
uncompresed_size=msg.uncompressed_size,
503511
message_group_id=msg.message_group_id,
504512
)

ydb/_topic_reader/datatypes.py

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class PublicMessage(ICommittable, ISessionAlive):
4040
written_at: datetime.datetime
4141
producer_id: str
4242
data: Union[bytes, Any] # set as original decompressed bytes or deserialized object if deserializer set in reader
43+
metadata_items: Dict[str, bytes]
4344
_partition_session: PartitionSession
4445
_commit_start_offset: int
4546
_commit_end_offset: int

ydb/_topic_reader/topic_reader_asyncio.py

+1
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
627627
written_at=server_batch.written_at,
628628
producer_id=server_batch.producer_id,
629629
data=message_data.data,
630+
metadata_items=message_data.metadata_items,
630631
_partition_session=partition_session,
631632
_commit_start_offset=partition_session._next_message_start_commit_offset,
632633
_commit_end_offset=message_data.offset + 1,

ydb/_topic_reader/topic_reader_asyncio_test.py

+20
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def stub_message(id: int):
7474
written_at=datetime.datetime(2023, 3, 18, 14, 15),
7575
producer_id="",
7676
data=bytes(),
77+
metadata_items={},
7778
_partition_session=stub_partition_session(),
7879
_commit_start_offset=0,
7980
_commit_end_offset=1,
@@ -207,6 +208,7 @@ def create_message(
207208
written_at=datetime.datetime(2023, 2, 3, 14, 16),
208209
producer_id="test-producer-id",
209210
data=bytes(),
211+
metadata_items={},
210212
_partition_session=partition_session,
211213
_commit_start_offset=partition_session._next_message_start_commit_offset + offset_delta - 1,
212214
_commit_end_offset=partition_session._next_message_start_commit_offset + offset_delta,
@@ -250,6 +252,7 @@ def batch_size():
250252
seq_no=message.seqno,
251253
created_at=message.created_at,
252254
data=message.data,
255+
metadata_items={},
253256
uncompresed_size=len(message.data),
254257
message_group_id=message.message_group_id,
255258
)
@@ -445,6 +448,7 @@ async def test_commit_ranges_for_received_messages(
445448
written_at=datetime.datetime(2023, 3, 14, 15, 42),
446449
producer_id="asd",
447450
data=rb"123",
451+
metadata_items={},
448452
_partition_session=None,
449453
_commit_start_offset=5,
450454
_commit_end_offset=15,
@@ -468,6 +472,7 @@ async def test_commit_ranges_for_received_messages(
468472
written_at=datetime.datetime(2023, 3, 14, 15, 42),
469473
producer_id="asd",
470474
data=gzip.compress(rb"123"),
475+
metadata_items={},
471476
_partition_session=None,
472477
_commit_start_offset=5,
473478
_commit_end_offset=15,
@@ -490,6 +495,7 @@ async def test_commit_ranges_for_received_messages(
490495
offset=1,
491496
written_at=datetime.datetime(2023, 3, 14, 15, 42),
492497
producer_id="asd",
498+
metadata_items={},
493499
data=rb"123",
494500
_partition_session=None,
495501
_commit_start_offset=5,
@@ -504,6 +510,7 @@ async def test_commit_ranges_for_received_messages(
504510
written_at=datetime.datetime(2023, 3, 14, 15, 42),
505511
producer_id="asd",
506512
data=rb"456",
513+
metadata_items={},
507514
_partition_session=None,
508515
_commit_start_offset=5,
509516
_commit_end_offset=15,
@@ -527,6 +534,7 @@ async def test_commit_ranges_for_received_messages(
527534
written_at=datetime.datetime(2023, 3, 14, 15, 42),
528535
producer_id="asd",
529536
data=gzip.compress(rb"123"),
537+
metadata_items={},
530538
_partition_session=None,
531539
_commit_start_offset=5,
532540
_commit_end_offset=15,
@@ -540,6 +548,7 @@ async def test_commit_ranges_for_received_messages(
540548
written_at=datetime.datetime(2023, 3, 14, 15, 42),
541549
producer_id="asd",
542550
data=gzip.compress(rb"456"),
551+
metadata_items={},
543552
_partition_session=None,
544553
_commit_start_offset=5,
545554
_commit_end_offset=15,
@@ -766,6 +775,7 @@ async def test_free_buffer_after_partition_stop(self, stream, stream_reader, par
766775
seq_no=123,
767776
created_at=t,
768777
data=bytes(),
778+
metadata_items={},
769779
uncompresed_size=message_size,
770780
message_group_id="test-message-group",
771781
)
@@ -846,6 +856,7 @@ def reader_batch_count():
846856
created_at=created_at,
847857
data=data,
848858
uncompresed_size=len(data),
859+
metadata_items={},
849860
message_group_id=message_group_id,
850861
)
851862
],
@@ -877,6 +888,7 @@ def reader_batch_count():
877888
written_at=written_at,
878889
producer_id=producer_id,
879890
data=data,
891+
metadata_items={},
880892
_partition_session=partition_session,
881893
_commit_start_offset=expected_message_offset,
882894
_commit_end_offset=expected_message_offset + 1,
@@ -923,6 +935,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
923935
seq_no=3,
924936
created_at=created_at,
925937
data=data,
938+
metadata_items={},
926939
uncompresed_size=len(data),
927940
message_group_id=message_group_id,
928941
)
@@ -944,6 +957,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
944957
seq_no=2,
945958
created_at=created_at2,
946959
data=data,
960+
metadata_items={},
947961
uncompresed_size=len(data),
948962
message_group_id=message_group_id,
949963
)
@@ -960,6 +974,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
960974
seq_no=3,
961975
created_at=created_at3,
962976
data=data2,
977+
metadata_items={},
963978
uncompresed_size=len(data2),
964979
message_group_id=message_group_id,
965980
),
@@ -968,6 +983,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
968983
seq_no=5,
969984
created_at=created_at4,
970985
data=data,
986+
metadata_items={},
971987
uncompresed_size=len(data),
972988
message_group_id=message_group_id2,
973989
),
@@ -998,6 +1014,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
9981014
written_at=written_at,
9991015
producer_id=producer_id,
10001016
data=data,
1017+
metadata_items={},
10011018
_partition_session=partition_session,
10021019
_commit_start_offset=partition1_mess1_expected_offset,
10031020
_commit_end_offset=partition1_mess1_expected_offset + 1,
@@ -1018,6 +1035,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10181035
written_at=written_at2,
10191036
producer_id=producer_id,
10201037
data=data,
1038+
metadata_items={},
10211039
_partition_session=second_partition_session,
10221040
_commit_start_offset=partition2_mess1_expected_offset,
10231041
_commit_end_offset=partition2_mess1_expected_offset + 1,
@@ -1038,6 +1056,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10381056
written_at=written_at2,
10391057
producer_id=producer_id2,
10401058
data=data2,
1059+
metadata_items={},
10411060
_partition_session=second_partition_session,
10421061
_commit_start_offset=partition2_mess2_expected_offset,
10431062
_commit_end_offset=partition2_mess2_expected_offset + 1,
@@ -1051,6 +1070,7 @@ async def test_read_batches(self, stream_reader, partition_session, second_parti
10511070
written_at=written_at2,
10521071
producer_id=producer_id,
10531072
data=data,
1073+
metadata_items={},
10541074
_partition_session=second_partition_session,
10551075
_commit_start_offset=partition2_mess3_expected_offset,
10561076
_commit_end_offset=partition2_mess3_expected_offset + 1,

0 commit comments

Comments
 (0)