Channel and window close issue with Reduce Vertex/UDF #2363
Replies: 7 comments 12 replies
-
Are you incrementing the event time in the events at the source? The 60s is based on how time progresses at the source, not on wall-clock time at the Reduce vertex.
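To illustrate the point, here is a minimal toy model in Python of a fixed window that closes on event time. It is not Numaflow's implementation: the class `FixedWindowModel`, its method names, and the rule "watermark equals the largest event time seen" are assumptions made only for this sketch.

```python
from dataclasses import dataclass, field

WINDOW_SECONDS = 60  # window length used in this discussion


@dataclass
class FixedWindowModel:
    """Toy model: a window closes once the watermark reaches its end time."""

    watermark: int = -1
    open_windows: dict = field(default_factory=dict)  # window start -> payloads

    def on_event(self, event_time: int, payload: str) -> list:
        # Assign the event to the fixed window that contains its event time.
        start = (event_time // WINDOW_SECONDS) * WINDOW_SECONDS
        self.open_windows.setdefault(start, []).append(payload)
        # Assumption for this sketch: the watermark tracks the largest event time.
        self.watermark = max(self.watermark, event_time)
        return self._close_ready()

    def _close_ready(self) -> list:
        # Close every window whose end time is at or before the watermark.
        closed = []
        for start in sorted(self.open_windows):
            if start + WINDOW_SECONDS <= self.watermark:
                closed.append((start, self.open_windows.pop(start)))
        return closed


model = FixedWindowModel()
first = model.on_event(1, "a")     # nothing closes yet
second = model.on_event(2, "b")    # still nothing, watermark is only 2
third = model.on_event(125, "c")   # watermark jumps to 125, window [0, 60) closes
```

In this model the first window is emitted only when a later event moves the watermark past 60, no matter how much wall-clock time has passed in between.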
-
https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/windowing/windowing/
-
If you set keyed to true, you will have to set the keys in the previous map vertex.
-
You will have to turn on idle detection if your source is not emitting data continuously.
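A rough sketch of why idle detection helps, again as a toy model rather than Numaflow's actual behaviour or configuration: the function names and the parameters `idle_threshold` and `increment` are hypothetical and chosen only for this example.

```python
def idle_watermark(watermark: float, idle_for: float,
                   idle_threshold: float, increment: float) -> float:
    """Advance the watermark by `increment` if the source has been idle long enough."""
    if idle_for >= idle_threshold:
        return watermark + increment
    return watermark


def idle_checks_until_close(watermark: float, window_end: float,
                            idle_threshold: float, increment: float) -> int:
    """Count the idle checks needed before the watermark reaches the window end."""
    checks = 0
    while watermark < window_end:
        # Assumption: the source stays idle for exactly the threshold between checks.
        watermark = idle_watermark(watermark, idle_threshold, idle_threshold, increment)
        checks += 1
    return checks


# Last event seen at t=2s, window ends at 60s, watermark bumped 10s per idle check.
checks = idle_checks_until_close(watermark=2, window_end=60,
                                 idle_threshold=5, increment=10)
```

Without such a mechanism the watermark in this model would stay at 2 until the next event arrived, which matches the behaviour described in the original post.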
-
Can you help me with the following question, @vigith?
-
@yhl25 @vigith If I use keyed events and set keyed: true and partitions: 2 for this reduce vertex, does this guarantee ordering? Will all events with the same key be consumed by one partition?
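For context on the question, this is the general idea of hash partitioning by key, sketched in Python. It is only an illustration of the technique: the function `partition_for` and the use of CRC32 are assumptions for this example, and whether Numaflow routes keyed messages this way should be confirmed against its documentation.

```python
import zlib


def partition_for(key: str, partitions: int) -> int:
    """Map a key to a partition index with a deterministic hash."""
    return zlib.crc32(key.encode("utf-8")) % partitions


# The same key always maps to the same partition index in this scheme.
assignments = [partition_for("order-42", 2) for _ in range(3)]
```

With a deterministic hash, every event carrying the same key lands on the same partition, which is the property the question is asking about.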
-
@yhl25 @vigith Suppose I set the window length to 60s, processing of the events in the first window has not yet completed, and events from the second window (say 60-120s) hit the reduce function. Does this affect the events from the first window? Will I lose the unprocessed events from the first window, or is there any other side effect that I should take care of?
-
My use case description:
I am using Reduce with a fixed window and keyed set to false, with Kafka as both source and sink. The vertex needs to accumulate data over a given time window, which is set to 60s, and send it to an external service that performs operations on those events. (All config values mentioned are used for testing/PoC.) I will add both the UDF and Pipeline code below.
Issue Description:
When the first event hits the Reduce function, it stays in the for loop that iterates over the datum stream, and the loop does not terminate and process the data even after 60s have passed. Events in my use case do not arrive at a fixed or continuous rate: for example, 2 events may arrive in the first second and the third may arrive 2 minutes later. The event stays in the loop or channel until another event arrives just before a window closes. I also tried the same UDF with keyed set to true but got similar behaviour.
Expected Behaviour:
I expected the Reduce UDF to accumulate whatever events arrive within 60s and, once the window length has elapsed, to close the channel and terminate the loop so that the accumulated events can be processed, even if no events are continuously hitting Reduce. So even if no event arrives after the first few seconds, I expected it to terminate the loop, close the channel, and process the events accumulated in those first few seconds.
Actual Behaviour:
The events arriving in the first few seconds stay in the loop, and the loop does not terminate (the channel does not close) even after 60s have passed. If another event then arrives just before a later window length completes, the channel closes and the accumulated messages of all previous windows are processed.
Pipeline Code:
Script Code:
LOGS:
Logs showing the inconsistency when keyed is set to false:
Logs showing the inconsistency when keyed is set to true:
Note: All logs after "Reduce called" belong to different windows. Finally, because an event arrived at the Reduce UDF just before some window length completed, all grouped messages from the different windows got processed.
Question:
-> Is Reduce correct for my use case?
-> When I used the storage config, it stored and retried the unprocessed events but never removed them, even after a successful retry. Do we need to handle this explicitly? Config shown below: