Skip to content

feat: add async source transformer #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft

Conversation

kohlisid
Copy link
Contributor

@kohlisid kohlisid commented Jun 17, 2025

fixes #70

Copy link

codecov bot commented Jun 18, 2025

Codecov Report

Attention: Patch coverage is 96.34146% with 3 lines in your changes missing coverage. Please review.

Project coverage is 94.26%. Comparing base (0d87b8f) to head (77953fa).

Files with missing lines Patch % Lines
...flow/sourcetransformer/servicer/_async_servicer.py 96.61% 0 Missing and 2 partials ⚠️
pynumaflow/sourcetransformer/async_server.py 94.73% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #230      +/-   ##
==========================================
+ Coverage   94.19%   94.26%   +0.07%     
==========================================
  Files          58       60       +2     
  Lines        2359     2441      +82     
  Branches      119      124       +5     
==========================================
+ Hits         2222     2301      +79     
- Misses        100      101       +1     
- Partials       37       39       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Sidhant Kohli <[email protected]>
@kohlisid kohlisid changed the title [WIP] feat: add async source transformer feat: add async source transformer Jun 18, 2025
@kohlisid
Copy link
Contributor Author

kohlisid commented Jun 18, 2025

Testing async sleep

async def my_handler(keys: list[str], datum: Datum) -> Messages:
    val = datum.value
    event_time = datum.event_time
    messages = Messages()
    current_datetime = datetime.now()
    # Get the current date and time
    current_time = current_datetime.time()
    print(f"BS: {current_time}")
    await asyncio.sleep(5)
    current_datetime = datetime.now()
    current_time = current_datetime.time()
    print(f"AS: {current_time}")
    messages.append(Message(value=val, event_time=event_time))

    return messages

readBatchSize: 2

coder@empty-44a6c781:/workspace/empty$ kubectl logs simple-pipeline-in-0-hlk99 -f
+ python example.py
BS: 22:04:50.015658
BS: 22:04:50.015766
AS: 22:04:55.020848
AS: 22:04:55.020998
BS: 22:04:55.023170
BS: 22:04:55.023238
AS: 22:05:00.026655
AS: 22:05:00.026788
BS: 22:05:00.028047
BS: 22:05:00.028122
AS: 22:05:05.031671
AS: 22:05:05.031804

kohlisid added 2 commits June 18, 2025 15:59
Signed-off-by: Sidhant Kohli <[email protected]>
Signed-off-by: Sidhant Kohli <[email protected]>
@kohlisid
Copy link
Contributor Author

read batch size : 10

r$ kubectl logs simple-pipeline-in-0-nngd6 -f
+ python example.py

BS: 23:09:13.998727
BS: 23:09:13.998855
BS: 23:09:13.998928
BS: 23:09:13.998989
BS: 23:09:13.999050
BS: 23:09:13.999114
BS: 23:09:13.999169
BS: 23:09:13.999223
BS: 23:09:13.999278
BS: 23:09:13.999327

AS: 23:09:19.004380
AS: 23:09:19.004530
AS: 23:09:19.004565
AS: 23:09:19.004589
AS: 23:09:19.004610
AS: 23:09:19.004631
AS: 23:09:19.004651
AS: 23:09:19.004666
AS: 23:09:19.004684
AS: 23:09:19.004703

BS: 23:09:19.008078
BS: 23:09:19.008166
BS: 23:09:19.008222
BS: 23:09:19.008273
BS: 23:09:19.008323
BS: 23:09:19.008373
BS: 23:09:19.008424
BS: 23:09:19.008475
BS: 23:09:19.008526
BS: 23:09:19.008571

AS: 23:09:24.012147
AS: 23:09:24.012275
AS: 23:09:24.012306
AS: 23:09:24.012329
AS: 23:09:24.012350
AS: 23:09:24.012371
AS: 23:09:24.012392
AS: 23:09:24.012411
AS: 23:09:24.012432
AS: 23:09:24.012452

@kohlisid
Copy link
Contributor Author

kohlisid commented Jun 18, 2025

User exception - caught on Numa

2025/06/18 23:16:31 failed c.grpcClt.SourceTransformFn stream.Recv: NonRetryable: UDF_EXECUTION_ERROR(transformer): Exception('USER PANIC - Uncaught')
{"level":"error","ts":"2025-06-18T23:16:31.758003221Z","logger":"numaflow","caller":"forward/data_forward.go:313","msg":"failed to apply source transformer","error":"gRPC client.SourceTransformFn failed, NonRetryable: UDF_EXECUTION_ERROR(transformer): Exception('USER PANIC - Uncaught')","stacktrace":"github.com/numaproj/numaflow/pkg/sources/forward.(*DataForward).forwardAChunk\n\t/home/runner/work/numaflow/numaflow/pkg/sources/forward/data_forward.go:313\ngithub.com/numaproj/numaflow/pkg/sources/forward.(*DataForward).Start.func1\n\t/home/runner/work/numaflow/numaflow/pkg/sources/forward/data_forward.go:149"}

@kohlisid kohlisid requested a review from cosmic-chichu June 18, 2025 23:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AsyncIO for Source-Transformer
1 participant