Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Idle Watermark Implementation Inside Async Data Movement for Source and ISB #2385

Merged
merged 25 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select a commit. Hold shift + click to select a range.
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/shared/idlehandler/source_idlehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (iw *SourceIdleHandler) PublishSourceIdleWatermark(partitions []int32) {
// in this case, we can publish the idle watermark as the last published idle watermark + the increment by value.
nextIdleWM = iw.lastPublishedIdleWm.Add(iw.config.IdleSource.GetIncrementBy())
} else {
// if its not -1, then we can publish the idle watermark as the computed watermark + the increment by value.
// if it's not -1, then we can publish the idle watermark as the computed watermark + the increment by value.
nextIdleWM = computedWm.Add(iw.config.IdleSource.GetIncrementBy())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error {
for toVertexName, toVertexBuffers := range df.toBuffers {
for index := range toVertexBuffers {
// publish idle watermark to all the source partitions owned by this reader.
// it is 1:1 for many (HTTP, tickgen, etc.) but for e.g., for Kafka it is 1:N and the list of partitions in the N could keep changing.
// it is 1:1 for many (HTTP, tick-gen, etc.) but for e.g., for Kafka it is 1:N and the list of partitions in the N could keep changing.
for _, sp := range df.reader.Partitions(df.ctx) {
if vertexPublishers, ok := df.toVertexWMPublishers[toVertexName]; ok {
var publisher, ok = vertexPublishers[sp]
Expand Down
79 changes: 79 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions rust/extns/numaflow-pulsar/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,20 +238,23 @@ pub struct PulsarSource {
/// timeout for each batch read request
timeout: Duration,
actor_tx: mpsc::Sender<ConsumerActorMessage>,
vertex_replica: u16,
}

impl PulsarSource {
pub async fn new(
config: PulsarSourceConfig,
batch_size: usize,
timeout: Duration,
vertex_replica: u16,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(10);
ConsumerReaderActor::start(config, rx).await?;
Ok(Self {
actor_tx: tx,
batch_size,
timeout,
vertex_replica,
})
}
}
Expand Down Expand Up @@ -292,7 +295,7 @@ impl PulsarSource {
None
}

pub fn partitions(&self) -> Vec<u16> {
unimplemented!()
/// Returns the partitions served by this source. Each replica reports
/// exactly one partition — its own replica index.
pub fn partitions_vec(&self) -> Vec<u16> {
    Vec::from([self.vertex_replica])
}
}
Loading
Loading