Implement StreamingDataFrame.concat() #802

Open
wants to merge 26 commits into
base: main

Conversation

daniil-quix
Collaborator

@daniil-quix daniil-quix commented Mar 26, 2025

This PR adds a new feature: StreamingDataFrame.concat().
With .concat(), you can combine two different SDFs into a new one and process data from multiple streams as if it were a single stream.
The SDFs may belong to different topics or be branches of the same SDF.

To support this, the registration of State stores and repartition topics was substantially reworked (see "What's changed" below).

How to use it

Use case 1: Combining two topics.

Requirements for using State stores when combining different topics:

  • Topics must have the same number of partitions.
    Otherwise, the code will raise an error.

  • The message keys should be distributed using the same algorithm (the same partitioner) in all topics, so that stateful operations and repartitioning see a given key on the same partition number.

Note: These requirements apply only when the application uses State, either via stateful=True functions or via windows.
If the app doesn't use State, it's fine to concat topics with different numbers of partitions.
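The two requirements above can be illustrated with a plain-Python sketch. It does not use Quix Streams: `partition_for` is a toy partitioner (byte sum modulo the partition count, not the hash a real Kafka client uses) and `check_copartitioned` is a hypothetical helper, both made up for illustration.

```python
from typing import Dict


def partition_for(key: bytes, num_partitions: int) -> int:
    """Toy partitioner (assumption): byte sum modulo the partition count."""
    return sum(key) % num_partitions


def check_copartitioned(partition_counts: Dict[str, int]) -> None:
    """Hypothetical check: raise if topics have different partition counts."""
    if len(set(partition_counts.values())) > 1:
        raise ValueError(f"Topics are not co-partitioned: {partition_counts}")


keys = [b"a", b"b", b"c", b"d"]

# Same partitioner, same partition count: every key lands on the same
# partition number in both topics, so one state partition sees all of its data.
same_layout = all(partition_for(k, 4) == partition_for(k, 4) for k in keys)

# Same partitioner, different partition counts: some keys land on different
# partition numbers, so state for one key would be split across partitions.
mismatched = [k for k in keys if partition_for(k, 4) != partition_for(k, 6)]

check_copartitioned({"topic-a": 4, "topic-b": 4})  # passes silently
```

With 4 and 6 partitions, `mismatched` is non-empty, which is why a partition count check alone is necessary but the partitioner also has to match.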

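from quixstreams import Application, State

# Placeholder broker address; replace it with your own
app = Application(broker_address="localhost:9092")
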
topic_a = app.topic("topic-a")
topic_b = app.topic("topic-b")

sdf_a = app.dataframe(topic_a)
sdf_b = app.dataframe(topic_b)

# Combine "sdf_a" and "sdf_b" into a new dataframe "sdf_c"
# "sdf_c" will process messages from both topics and can use a shared state 
sdf_c = sdf_a.concat(sdf_b)

# "sdf_c" will register a new State store, independent of "sdf_a" and "sdf_b"
sdf_c.update(..., stateful=True)

# The repartition topics used by "group_by" are also independent of original "sdf_a" and "sdf_b"
sdf_c.group_by(...)

Use case 2: Combining branches of the same original StreamingDataFrame and topic.

Note: If the branches are not mutually exclusive (for example, when every message goes to every branch), .concat() emits a message once for each branch it passes through. In the worst case, the output contains X times more values, where X is the number of branches.
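The effect of overlapping branches can be modelled in plain Python. This is only a sketch of the counting behaviour, not Quix Streams code; `run_branches` and the predicates are hypothetical names.

```python
from typing import Callable, List, Sequence


def run_branches(
    messages: Sequence[int], predicates: Sequence[Callable[[int], bool]]
) -> List[int]:
    """Send each message through every branch filter and collect the
    values that would reach the concatenated output."""
    output = []
    for message in messages:
        for predicate in predicates:
            if predicate(message):
                output.append(message)
    return output


messages = [1, 2, 3, 4]

# Mutually exclusive branches: each message passes exactly one filter.
exclusive = run_branches(messages, [lambda v: v % 2 == 0, lambda v: v % 2 == 1])

# Overlapping branches: every message passes both filters and is emitted twice.
overlapping = run_branches(messages, [lambda v: True, lambda v: True])
```

Here `exclusive` has as many values as the input, while `overlapping` has twice as many because there are two branches.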

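from quixstreams import Application, State

# Placeholder broker address; replace it with your own
app = Application(broker_address="localhost:9092")

topic_a = app.topic("topic-a")

sdf_a = app.dataframe(topic_a)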

# Create branches with some processing code
branch_1 = sdf_a.filter(...).apply(...)

branch_2 = sdf_a.filter(...).apply(...)

# Combine branches back into the same dataframe
sdf_c = branch_1.concat(branch_2)

# "sdf_c" will use the same State stores as the branches
sdf_c.update(..., stateful=True)

# The repartition topics are also registered in the same namespace as the original topic
sdf_c.group_by(...)

What's changed

Introduced StreamingDataFrame.stream_id

stream_id breaks the 1:1 link between input topics and their State stores and repartition topics.

stream_id is still generated from topic names (and an SDF can now belong to multiple topics), but it is agnostic of the topics themselves and identifies only the data stream.
In future releases, we may allow customizing it.

There's a mapping between stream_ids and actual topics, which is used to assign proper state stores on rebalancing and to record the processed offsets to the changelog topics on checkpoint commits.
The mapping is maintained by DataFrameRegistry.
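As a rough illustration of such a mapping, here is a toy registry in plain Python. It is not the actual DataFrameRegistry: the class, its methods, and the stream IDs are hypothetical and only show the lookup direction needed on rebalancing (from an assigned topic to the stream_ids whose stores must be assigned).

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set


class ToyStreamRegistry:
    """Hypothetical stand-in for a stream_id <-> topics mapping."""

    def __init__(self) -> None:
        self._topics_by_stream: Dict[str, Set[str]] = defaultdict(set)

    def register(self, stream_id: str, topic_names: Iterable[str]) -> None:
        """Record that a stream reads from the given topics."""
        self._topics_by_stream[stream_id].update(topic_names)

    def stream_ids_for_topic(self, topic_name: str) -> List[str]:
        """Return all stream IDs that include the given topic."""
        return sorted(
            stream_id
            for stream_id, topics in self._topics_by_stream.items()
            if topic_name in topics
        )


registry = ToyStreamRegistry()
registry.register("stream-a", ["topic-a"])
registry.register("stream-ab", ["topic-a", "topic-b"])

# A partition of "topic-a" is assigned: both streams need their stores.
streams_for_a = registry.stream_ids_for_topic("topic-a")
# A partition of "topic-b" is assigned: only the combined stream is affected.
streams_for_b = registry.stream_ids_for_topic("topic-b")
```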

Added Topic.quix_name

Added Topic.quix_name to simplify the mapping between topic IDs (prefixed with workspace_id) and the actual topic names.
On other platforms, quix_name == name.

Added StreamingDataFrame.concat()

StreamingDataFrame.concat() combines two different SDFs into one.
The new SDF will also have a new stream_id if different topics are combined.
If branches of the same SDF are combined, the stream_id remains the same.
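The stream_id rule can be sketched with a small model. `ToySDF` is hypothetical, and the way the ID is derived here (sorted topic names joined with "--") is an assumption made for illustration, not necessarily the format the library uses.

```python
from dataclasses import dataclass
from typing import FrozenSet


@dataclass(frozen=True)
class ToySDF:
    """Hypothetical model of an SDF that only tracks its source topics."""

    topics: FrozenSet[str]

    @property
    def stream_id(self) -> str:
        # Assumption: the ID is built from the sorted topic names.
        return "--".join(sorted(self.topics))

    def concat(self, other: "ToySDF") -> "ToySDF":
        # The combined SDF reads from the union of both topic sets.
        return ToySDF(self.topics | other.topics)


sdf_a = ToySDF(frozenset({"topic-a"}))
sdf_b = ToySDF(frozenset({"topic-b"}))

# Branches of the same SDF share its topics, so the ID does not change.
branch_1 = ToySDF(sdf_a.topics)
branch_2 = ToySDF(sdf_a.topics)
same_id = branch_1.concat(branch_2).stream_id

# Different topics produce a new ID, distinct from both inputs.
new_id = sdf_a.concat(sdf_b).stream_id
```

In this model, a new ID means a separate namespace for State stores and repartition topics, while an unchanged ID means the concatenated branches keep using the existing ones.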

Docs

The docs will be added later to this PR.

@daniil-quix daniil-quix force-pushed the feature/merge-state branch from c1561e2 to 7d842bb Compare March 26, 2025 16:53
@daniil-quix
Collaborator Author

Closes #487

- Added stream_id as a new identifier of the SDF data sources
- Updated the registration of changelog and repartition topics
- Updated State to use stream_id instead of topics
- Fixed the current tests
- Add basic tests
- Validate the co-partitioning on state registration
- Added new field "Topic.quix_name" to store the non-prefixed topic name from Quix
- Generate stream_ids in backwards-compatible way for
- Test recovery with multiple input topics
- Test state with concatenated SDFs
@daniil-quix daniil-quix force-pushed the feature/merge-state branch from b498a17 to 119a3dd Compare March 27, 2025 18:56
Contributor

@gwaramadze gwaramadze left a comment

Round 1

Contributor

@gwaramadze gwaramadze left a comment

Round 2

Contributor

@gwaramadze gwaramadze left a comment

Round 3

Projects
Status: In Review
2 participants