Skip to content

Commit

Permalink
feat: Idle Watermark Implementation Inside Async Data Movement for So…
Browse files Browse the repository at this point in the history
…urce and ISB (#2385)

Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Feb 11, 2025
1 parent bf0f9db commit 8d51c66
Show file tree
Hide file tree
Showing 44 changed files with 3,024 additions and 469 deletions.
2 changes: 1 addition & 1 deletion pkg/shared/idlehandler/source_idlehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (iw *SourceIdleHandler) PublishSourceIdleWatermark(partitions []int32) {
// in this case, we can publish the idle watermark as the last published idle watermark + the increment by value.
nextIdleWM = iw.lastPublishedIdleWm.Add(iw.config.IdleSource.GetIncrementBy())
} else {
// if its not -1, then we can publish the idle watermark as the computed watermark + the increment by value.
// if it's not -1, then we can publish the idle watermark as the computed watermark + the increment by value.
nextIdleWM = computedWm.Add(iw.config.IdleSource.GetIncrementBy())
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) error {
for toVertexName, toVertexBuffers := range df.toBuffers {
for index := range toVertexBuffers {
// publish idle watermark to all the source partitions owned by this reader.
// it is 1:1 for many (HTTP, tickgen, etc.) but for e.g., for Kafka it is 1:N and the list of partitions in the N could keep changing.
// it is 1:1 for many (HTTP, tick-gen, etc.) but for e.g., for Kafka it is 1:N and the list of partitions in the N could keep changing.
for _, sp := range df.reader.Partitions(df.ctx) {
if vertexPublishers, ok := df.toVertexWMPublishers[toVertexName]; ok {
var publisher, ok = vertexPublishers[sp]
Expand Down
79 changes: 79 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions rust/extns/numaflow-pulsar/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,20 +238,23 @@ pub struct PulsarSource {
/// timeout for each batch read request
timeout: Duration,
actor_tx: mpsc::Sender<ConsumerActorMessage>,
vertex_replica: u16,
}

impl PulsarSource {
pub async fn new(
config: PulsarSourceConfig,
batch_size: usize,
timeout: Duration,
vertex_replica: u16,
) -> Result<Self> {
let (tx, rx) = mpsc::channel(10);
ConsumerReaderActor::start(config, rx).await?;
Ok(Self {
actor_tx: tx,
batch_size,
timeout,
vertex_replica,
})
}
}
Expand Down Expand Up @@ -292,7 +295,7 @@ impl PulsarSource {
None
}

pub fn partitions(&self) -> Vec<u16> {
unimplemented!()
pub fn partitions_vec(&self) -> Vec<u16> {
vec![self.vertex_replica]
}
}
Loading

0 comments on commit 8d51c66

Please sign in to comment.