Skip to content

Commit c1561e2

Browse files
committed
Update Topic name handling with Quix platform
- Added a new field "Topic.quix_name" to store the non-prefixed topic name from Quix
- Generate stream_ids in a backwards-compatible way for…
1 parent d17db69 commit c1561e2

File tree

9 files changed

+97
-20
lines changed

9 files changed

+97
-20
lines changed

quixstreams/dataframe/dataframe.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,7 @@ def stream_id(self) -> str:
172172
173173
By default, a topic name or a combination of topic names are used as `stream_id`.
174174
"""
175-
176-
topics = sorted(self._topics, key=lambda t: t.name)
177-
return "--".join(t.name for t in topics)
175+
return self._topic_manager.stream_id_from_topics(*self.topics)
178176

179177
@property
180178
def topics(self) -> list[Topic]:

quixstreams/models/topics/manager.py

+9
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,15 @@ def derive_topic_config(cls, *topics: Topic) -> TopicConfig:
332332
},
333333
)
334334

335+
def stream_id_from_topics(self, *topics: Topic) -> str:
336+
"""
337+
Generate a stream_id by combining names of the provided topics.
338+
"""
339+
if not topics:
340+
raise ValueError("At least one Topic must be passed")
341+
342+
return "--".join(t.name for t in topics)
343+
335344
def _validate_topic_name(self, name: str) -> None:
336345
"""
337346
Validates the original topic name

quixstreams/models/topics/topic.py

+5
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def __init__(
100100
value_serializer: Optional[SerializerType] = None,
101101
key_serializer: Optional[SerializerType] = BytesSerializer(),
102102
timestamp_extractor: Optional[TimestampExtractor] = None,
103+
quix_name: str = "",
103104
):
104105
"""
105106
:param name: topic name
@@ -110,8 +111,11 @@ def __init__(
110111
:param key_serializer: a serializer type for keys
111112
:param timestamp_extractor: a callable that returns a timestamp in
112113
milliseconds from a deserialized message.
114+
:param quix_name: a name of the topic in the Quix Cloud.
115+
It is set only by `QuixTopicManager`.
113116
"""
114117
self.name = name
118+
self.quix_name = quix_name or name
115119
self._create_config = copy.deepcopy(create_config)
116120
self._broker_config: Optional[TopicConfig] = None
117121
self._value_deserializer = _get_deserializer(value_deserializer)
@@ -127,6 +131,7 @@ def __clone__(
127131
):
128132
return self.__class__(
129133
name=name,
134+
quix_name=self.quix_name,
130135
create_config=create_config or self._create_config,
131136
value_deserializer=self._value_deserializer,
132137
key_deserializer=self._key_deserializer,

quixstreams/platforms/quix/config.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,6 @@ def convert_topic_response(cls, api_response: dict) -> Topic:
187187
topic_config = api_response["configuration"]
188188
extra_config["retention.ms"] = topic_config["retentionInMinutes"] * 60 * 1000
189189
extra_config["retention.bytes"] = topic_config["retentionInBytes"]
190-
# A hack to pass extra info back from Quix cloud
191-
extra_config["__quix_topic_name__"] = api_response["name"]
192190

193191
# Map value returned by Quix API to Kafka Admin API format
194192
if topic_config.get("cleanupPolicy"):
@@ -202,7 +200,11 @@ def convert_topic_response(cls, api_response: dict) -> Topic:
202200
replication_factor=topic_config["replicationFactor"],
203201
extra_config=extra_config,
204202
)
205-
topic = Topic(name=api_response["id"], create_config=config)
203+
topic = Topic(
204+
name=api_response["id"],
205+
create_config=config,
206+
quix_name=api_response["name"],
207+
)
206208
topic.broker_config = config
207209
return topic
208210

quixstreams/platforms/quix/topic_manager.py

+26-11
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,21 @@ def __init__(
5959
auto_create_topics=auto_create_topics,
6060
)
6161
self._quix_config_builder = quix_config_builder
62-
self._topic_id_to_name: dict[str, str] = {}
62+
63+
def stream_id_from_topics(self, *topics: Topic) -> str:
64+
"""
65+
Generate a stream_id by combining names of the provided topics.
66+
"""
67+
if not topics:
68+
raise ValueError("At least one Topic must be passed")
69+
elif len(topics) == 1:
70+
# If only one topic is passed, return its full name
71+
# for backwards compatibility
72+
return topics[0].name
73+
74+
# Use the "quix_name" to generate stream_id.
75+
# In Quix Cloud, the "quix_name" can differ from the actual broker topic name
76+
return "--".join(t.quix_name for t in topics)
6377

6478
def _fetch_topic(self, topic: Topic) -> Topic:
6579
try:
@@ -92,10 +106,6 @@ def _finalize_topic(self, topic: Topic) -> Topic:
92106
broker_topic = self._fetch_topic(topic=topic)
93107
broker_config = broker_topic.broker_config
94108

95-
# A hack to pass extra info back from Quix cloud
96-
quix_topic_name = broker_config.extra_config.pop("__quix_topic_name__")
97-
topic_out = topic.__clone__(name=broker_topic.name)
98-
99109
# Set a broker config for the topic
100110
broker_config = TopicConfig(
101111
num_partitions=broker_config.num_partitions,
@@ -106,8 +116,8 @@ def _finalize_topic(self, topic: Topic) -> Topic:
106116
if k in self._extra_config_imports
107117
},
108118
)
119+
topic_out = topic.__clone__(name=broker_topic.name)
109120
topic_out.broker_config = broker_config
110-
self._topic_id_to_name[topic_out.name] = quix_topic_name
111121
self._quix_config_builder.wait_for_topic_ready_statuses([topic_out])
112122
return topic_out
113123

@@ -148,8 +158,13 @@ def _internal_name(
148158
149159
:return: formatted topic name
150160
"""
151-
return super()._internal_name(
152-
topic_type,
153-
self._topic_id_to_name[topic_name] if topic_name else None,
154-
suffix,
155-
)
161+
162+
# Map the full topic name to the shorter "quix_name" and use
163+
# it for internal topics.
164+
# "quix_name" is not prefixed with the workspace id.
165+
if topic_name is not None:
166+
topic = self.non_changelog_topics.get(topic_name)
167+
if topic is not None:
168+
topic_name = topic.quix_name
169+
170+
return super()._internal_name(topic_type, topic_name, suffix)

tests/test_quixstreams/fixtures.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -600,9 +600,10 @@ def factory(
600600
key_deserializer: Optional[Union[Deserializer, str]] = None,
601601
value_deserializer: Optional[Union[Deserializer, str]] = None,
602602
timestamp_extractor: Optional[TimestampExtractor] = None,
603+
topic_manager: Optional[TopicManager] = None,
603604
) -> Topic:
604605
name = name or str(uuid.uuid4())
605-
topic_manager = topic_manager_factory()
606+
topic_manager = topic_manager or topic_manager_factory()
606607
topic_args = {
607608
"key_serializer": key_serializer,
608609
"value_serializer": value_serializer,

tests/test_quixstreams/test_dataframe/fixtures.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ def factory(
2626
registry: Optional[DataFrameRegistry] = None,
2727
) -> StreamingDataFrame:
2828
producer = producer if producer is not None else MagicMock(spec_set=RowProducer)
29-
topic_manager = topic_manager or MagicMock(spec=TopicManager)
29+
topic_manager = topic_manager or topic_manager_factory()
3030
state_manager = state_manager or MagicMock(spec=StateStoreManager)
31-
topic = topic or topic_manager_topic_factory("test")
31+
topic = topic or topic_manager_topic_factory(
32+
"test", topic_manager=topic_manager
33+
)
3234
consumer = MagicMock(spec_set=RowConsumer)
3335
pausing_manager = PausingManager(consumer=consumer, topic_manager=topic_manager)
3436
sink_manager = SinkManager()

tests/test_quixstreams/test_models/test_topics/test_manager.py

+14
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ def test_topic_with_config(self, topic_manager_factory):
4949
assert topic_manager.topics[topic_name] == topic
5050

5151
assert topic.name == topic_name
52+
assert topic.quix_name == topic_name
5253
assert topic.create_config.num_partitions == create_config.num_partitions
5354
assert (
5455
topic.create_config.replication_factor == create_config.replication_factor
@@ -321,3 +322,16 @@ def test_non_changelog_topics(self, topic_manager_factory):
321322
assert data_topic.name in topic_manager.non_changelog_topics
322323
assert repartition_topic.name in topic_manager.non_changelog_topics
323324
assert changelog_topic.name not in topic_manager.non_changelog_topics
325+
326+
def test_stream_id_from_topics_success(self, topic_manager_factory):
327+
topic_manager = topic_manager_factory()
328+
topic1 = topic_manager.topic("test1")
329+
topic2 = topic_manager.topic("test2")
330+
stream_id = topic_manager.stream_id_from_topics(topic1, topic2)
331+
332+
assert stream_id == "test1--test2"
333+
334+
def test_stream_id_from_topics_no_topics_fails(self, topic_manager_factory):
335+
topic_manager = topic_manager_factory()
336+
with pytest.raises(ValueError):
337+
topic_manager.stream_id_from_topics()

tests/test_quixstreams/test_platforms/test_quix/test_topic_manager.py

+31
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import pytest
2+
13
from quixstreams.models import TopicConfig
24

35

@@ -13,6 +15,7 @@ def test_quix_topic(self, quix_topic_manager_factory):
1315
topic_manager = quix_topic_manager_factory()
1416
topic = topic_manager.topic(topic_name, create_config=create_config)
1517
assert topic.name == expected_topic_id
18+
assert topic.quix_name == topic_name
1619
assert topic_manager.topics[topic.name] == topic
1720
assert topic.broker_config.num_partitions == create_config.num_partitions
1821
assert (
@@ -115,3 +118,31 @@ def test_quix_changelog_nested_internal_topic_naming(
115118
changelog.broker_config.replication_factor
116119
== repartition.broker_config.replication_factor
117120
)
121+
122+
def test_stream_id_from_topics_multiple_topics_success(
123+
self, quix_topic_manager_factory
124+
):
125+
topic_manager = quix_topic_manager_factory(workspace_id="workspace_id")
126+
topic1 = topic_manager.topic("test1")
127+
topic2 = topic_manager.topic("test2")
128+
stream_id = topic_manager.stream_id_from_topics(topic1, topic2)
129+
130+
assert stream_id == "test1--test2"
131+
132+
def test_stream_id_from_topics_single_topic_prefixed_with_workspace(
133+
self, quix_topic_manager_factory
134+
):
135+
"""
136+
Test that stream_id is prefixed with workspace_id if the single topic is passed
137+
for the backwards compatibility.
138+
"""
139+
topic_manager = quix_topic_manager_factory(workspace_id="workspace_id")
140+
topic1 = topic_manager.topic("test1")
141+
stream_id = topic_manager.stream_id_from_topics(topic1)
142+
143+
assert stream_id == "workspace_id-test1"
144+
145+
def test_stream_id_from_topics_no_topics_fails(self, quix_topic_manager_factory):
146+
topic_manager = quix_topic_manager_factory()
147+
with pytest.raises(ValueError):
148+
topic_manager.stream_id_from_topics()

0 commit comments

Comments
 (0)