chore: shutdown when we see non retryable udf errors #2204

Merged on Nov 6, 2024 (6 commits)


Contributor

@yhl25 yhl25 commented Nov 5, 2024

Client side

Sink

2024/11/05 06:26:32 ServerInfo: &{uds go 1.3.1-z v0.8.1 map[]}
{"level":"info","ts":"2024-11-05T06:26:32.140803178Z","logger":"numaflow.Sink-processor","caller":"jetstream/kv_store.go:264","msg":"Successfully created watcher","pipeline":"simple-pipeline","vertex":"out","kvName":"default-simple-pipeline-cat-out_PROCESSORS","watcher":"default-simple-pipeline-cat-out_PROCESSORS"}
{"level":"info","ts":"2024-11-05T06:26:32.140827511Z","logger":"numaflow.Sink-processor","caller":"jetstream/kv_store.go:199","msg":"Watcher initialization and subscription got nil value","pipeline":"simple-pipeline","vertex":"out","kvName":"default-simple-pipeline-cat-out_PROCESSORS"}
{"level":"info","ts":"2024-11-05T06:26:32.140840261Z","logger":"numaflow.Sink-processor","caller":"fetch/processor_manager.go:212","msg":"Successfully added a new fromProcessor","pipeline":"simple-pipeline","vertex":"out","fromProcessor":"simple-pipeline-cat-0"}
{"level":"info","ts":"2024-11-05T06:26:32.14197322Z","logger":"numaflow.Sink-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"simple-pipeline","vertex":"out"}
{"level":"info","ts":"2024-11-05T06:26:32.14228647Z","logger":"numaflow.Sink-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"simple-pipeline","vertex":"out"}
{"level":"info","ts":"2024-11-05T06:26:32.142302886Z","logger":"numaflow.Sink-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"simple-pipeline","vertex":"out"}
{"level":"info","ts":"2024-11-05T06:26:32.142405845Z","logger":"numaflow.Sink-processor","caller":"sinks/sink.go:257","msg":"Start processing sink messages ","pipeline":"simple-pipeline","vertex":"out","isbsvc":"jetstream","fromPartition ":"default-simple-pipeline-out-0"}
{"level":"info","ts":"2024-11-05T06:26:32.142431345Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:120","msg":"Starting sink forwarder...","pipeline":"simple-pipeline","vertex":"out"}
{"level":"error","ts":"2024-11-05T06:26:32.147017303Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:271","msg":"failed to write to sink","pipeline":"simple-pipeline","vertex":"out","error":"gRPC client.SinkFn failed, failed to receive sink response: rpc error: code = Unknown desc = panic inside sink handler: Sink is not valid","stacktrace":"github.com/numaproj/numaflow/pkg/sinks/forward.(*DataForward).forwardAChunk\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/sinks/forward/forward.go:271\ngithub.com/numaproj/numaflow/pkg/sinks/forward.(*DataForward).Start.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/sinks/forward/forward.go:140"}
{"level":"error","ts":"2024-11-05T06:26:32.148659386Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:141","msg":"Failed to forward a chunk","pipeline":"simple-pipeline","vertex":"out","error":"gRPC client.SinkFn failed, failed to receive sink response: rpc error: code = Unknown desc = panic inside sink handler: Sink is not valid","stacktrace":"github.com/numaproj/numaflow/pkg/sinks/forward.(*DataForward).Start.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/sinks/forward/forward.go:141"}
{"level":"info","ts":"2024-11-05T06:26:32.148691011Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:154","msg":"Closed buffer reader","pipeline":"simple-pipeline","vertex":"out","bufferFrom":"default-simple-pipeline-out-0"}
{"level":"info","ts":"2024-11-05T06:26:32.148695345Z","logger":"numaflow.Sink-processor","caller":"forward/forward.go:161","msg":"Closed sink writer","pipeline":"simple-pipeline","vertex":"out","sink":"out"}
{"level":"error","ts":"2024-11-05T06:26:32.148701928Z","logger":"numaflow.Sink-processor","caller":"sinks/sink.go:269","msg":"Sink forwarder stopped with error","pipeline":"simple-pipeline","vertex":"out","fromPartition":"default-simple-pipeline-out-0","error":"gRPC client.SinkFn failed, failed to receive sink response: rpc error: code = Unknown desc = panic inside sink handler: Sink is not valid","stacktrace":"github.com/numaproj/numaflow/pkg/sinks.(*SinkProcessor).Start.func3\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/sinks/sink.go:269"}
{"level":"info","ts":"2024-11-05T06:26:32.148717803Z","logger":"numaflow.Sink-processor","caller":"sinks/sink.go:306","msg":"Exited...","pipeline":"simple-pipeline","vertex":"out"}

Map

{"level":"info","ts":"2024-11-05T07:07:29.203806724Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:293","msg":"Start processing udf messages","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","isbsvc":"jetstream","from":"default-simple-pipeline-cat-0","to":["default-simple-pipeline-out-0"]}
{"level":"info","ts":"2024-11-05T07:07:29.203829099Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:121","msg":"Starting forwarder...","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf"}
{"level":"info","ts":"2024-11-05T07:07:29.204130808Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"simple-pipeline","vertex":"cat"}
{"level":"info","ts":"2024-11-05T07:07:29.204152058Z","logger":"numaflow.MapUDF-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"simple-pipeline","vertex":"cat"}
2024/11/05 07:07:29 failed c.grpcClt.MapFn stream.Recv: NonRetryable: error processing requests: rpc error: code = Internal desc = panic inside map handler: Invalid map operation
{"level":"error","ts":"2024-11-05T07:07:29.214500974Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:629","msg":"mapUDF.Apply error","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: error processing requests: rpc error: code = Internal desc = panic inside map handler: Invalid map operation","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).applyUDF\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/forward/forward.go:629\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/forward/forward.go:273\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/forward/forward.go:141"}
{"level":"error","ts":"2024-11-05T07:07:29.214536683Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:275","msg":"failed to applyUDF","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: error processing requests: rpc error: code = Internal desc = panic inside map handler: Invalid map operation","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).forwardAChunk\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/forward/forward.go:275\ngithub.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/forward/forward.go:141"}
{"level":"error","ts":"2024-11-05T07:07:29.215329183Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:142","msg":"Failed to forward a chunk","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","error":"gRPC client.MapFn failed, NonRetryable: error processing requests: rpc error: code = Internal desc = panic inside map handler: Invalid map operation","stacktrace":"github.com/numaproj/numaflow/pkg/udf/forward.(*InterStepDataForward).Start.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/forward/forward.go:142"}
{"level":"info","ts":"2024-11-05T07:07:29.215351474Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:155","msg":"Closed buffer reader","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","bufferFrom":"default-simple-pipeline-cat-0"}
{"level":"info","ts":"2024-11-05T07:07:29.215356683Z","logger":"numaflow.MapUDF-processor","caller":"forward/forward.go:162","msg":"Closed partition writer","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","bufferTo":"default-simple-pipeline-out-0"}
{"level":"error","ts":"2024-11-05T07:07:29.215352849Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:305","msg":"Map forwarder stopped with error","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf","fromPartition":"default-simple-pipeline-cat-0","error":"gRPC client.MapFn failed, NonRetryable: error processing requests: rpc error: code = Internal desc = panic inside map handler: Invalid map operation","stacktrace":"github.com/numaproj/numaflow/pkg/udf.(*MapUDFProcessor).Start.func2\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/udf/map_udf.go:305"}
{"level":"info","ts":"2024-11-05T07:07:29.215479183Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"simple-pipeline","vertex":"cat","kvName":"default-simple-pipeline-in-cat_OT","watcher":"default-simple-pipeline-in-cat_OT"}
{"level":"info","ts":"2024-11-05T07:07:29.215466558Z","logger":"numaflow.MapUDF-processor","caller":"publish/publisher.go:286","msg":"Closing watermark publisher","pipeline":"simple-pipeline","vertex":"cat","entityID":"simple-pipeline-cat-0","otStore":"default-simple-pipeline-cat-out_OT","hbStore":"default-simple-pipeline-cat-out_PROCESSORS"}
{"level":"info","ts":"2024-11-05T07:07:29.215537099Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:166","msg":"stopping WatchAll","pipeline":"simple-pipeline","vertex":"cat","kvName":"default-simple-pipeline-in-cat_PROCESSORS","watcher":"default-simple-pipeline-in-cat_PROCESSORS"}
{"level":"error","ts":"2024-11-05T07:07:29.215631683Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:170","msg":"Failed to stop","pipeline":"simple-pipeline","vertex":"cat","kvName":"default-simple-pipeline-in-cat_PROCESSORS","watcher":"default-simple-pipeline-in-cat_PROCESSORS","error":"nats: invalid subscription","stacktrace":"github.com/numaproj/numaflow/pkg/shared/kvs/jetstream.(*jetStreamStore).Watch.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/shared/kvs/jetstream/kv_store.go:170"}
{"level":"info","ts":"2024-11-05T07:07:29.216648849Z","logger":"numaflow.MapUDF-processor","caller":"jetstream/kv_store.go:172","msg":"WatchAll successfully stopped","pipeline":"simple-pipeline","vertex":"cat","kvName":"default-simple-pipeline-in-cat_OT","watcher":"default-simple-pipeline-in-cat_OT"}
{"level":"info","ts":"2024-11-05T07:07:29.218063308Z","logger":"numaflow.MapUDF-processor","caller":"udf/map_udf.go:363","msg":"All udf data processors exited...","pipeline":"simple-pipeline","vertex":"cat","protocol":"uds-grpc-map-udf"}

Source

{"level":"info","ts":"2024-11-05T15:08:54.948115542Z","logger":"numaflow.Source-processor","caller":"jetstream/kv_store.go:199","msg":"Watcher initialization and subscription got nil value","pipeline":"simple-pipeline","vertex":"in","kvName":"default-simple-pipeline-in_SOURCE_PROCESSORS"}
2024/11/05 15:08:54 ServerInfo: &{uds go 1.3.1-z v0.8.1 map[]}
{"level":"info","ts":"2024-11-05T15:08:54.951542376Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:236","msg":"Generating self-signed certificate","pipeline":"simple-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-05T15:08:54.951920501Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:276","msg":"Not enabling pprof debug endpoints","pipeline":"simple-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-05T15:08:54.951939667Z","logger":"numaflow.Source-processor","caller":"sources/source.go:305","msg":"Start processing source messages","pipeline":"simple-pipeline","vertex":"in","isbs":"jetstream","to":["default-simple-pipeline-cat-0"]}
{"level":"info","ts":"2024-11-05T15:08:54.951967876Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:289","msg":"Starting metrics HTTPS server","pipeline":"simple-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-05T15:08:54.951977917Z","logger":"numaflow","caller":"forward/data_forward.go:129","msg":"Starting forwarder..."}
{"level":"warn","ts":"2024-11-05T15:08:54.952330001Z","logger":"numaflow","caller":"forward/data_forward.go:203","msg":"failed to read from source","error":"failed to read messages from udsource: failed to receive read response: rpc error: code = Unknown desc = panic inside source handler: Source not valid"}
{"level":"info","ts":"2024-11-05T15:08:54.952369376Z","logger":"numaflow.Source-processor","caller":"udsource/user_defined_source.go:120","msg":"Shutting down user-defined source...","pipeline":"simple-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-05T15:08:54.952395292Z","logger":"numaflow","caller":"forward/data_forward.go:162","msg":"Closed source reader","sourceFrom":"in"}
{"level":"info","ts":"2024-11-05T15:08:54.952404959Z","logger":"numaflow","caller":"forward/data_forward.go:172","msg":"Closed partition writer","bufferTo":"default-simple-pipeline-cat-0"}
{"level":"error","ts":"2024-11-05T15:08:54.952411709Z","logger":"numaflow.Source-processor","caller":"sources/source.go:317","msg":"Source forwarder stopped with error","pipeline":"simple-pipeline","vertex":"in","error":"failed to read messages from udsource: failed to receive read response: rpc error: code = Unknown desc = panic inside source handler: Source not valid","stacktrace":"github.com/numaproj/numaflow/pkg/sources.(*SourceProcessor).Start\n\t/Users/yhl01/Documents/numaproj/numaflow/pkg/sources/source.go:317\ngithub.com/numaproj/numaflow/cmd/commands.NewProcessorCommand.func1\n\t/Users/yhl01/Documents/numaproj/numaflow/cmd/commands/processor.go:86\ngithub.com/spf13/cobra.(*Command).execute\n\t/Users/yhl01/go/pkg/mod/github.com/spf13/[email protected]/command.go:985\ngithub.com/spf13/cobra.(*Command).ExecuteC\n\t/Users/yhl01/go/pkg/mod/github.com/spf13/[email protected]/command.go:1117\ngithub.com/spf13/cobra.(*Command).Execute\n\t/Users/yhl01/go/pkg/mod/github.com/spf13/[email protected]/command.go:1041\ngithub.com/numaproj/numaflow/cmd/commands.Execute\n\t/Users/yhl01/Documents/numaproj/numaflow/cmd/commands/root.go:32\nmain.main\n\t/Users/yhl01/Documents/numaproj/numaflow/cmd/main.go:24\nruntime.main\n\t/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/proc.go:272"}
{"level":"info","ts":"2024-11-05T15:08:54.952440667Z","logger":"numaflow.Source-processor","caller":"sources/source.go:335","msg":"Exited...","pipeline":"simple-pipeline","vertex":"in"}
{"level":"info","ts":"2024-11-05T15:08:54.952462001Z","logger":"numaflow.Source-processor","caller":"metrics/metrics_server.go:293","msg":"Metrics server shutdown","pipeline":"simple-pipeline","vertex":"in"}
{"level":"warn","ts":"2024-11-05T15:08:54.952471292Z","logger":"numaflow.Source-processor","caller":"sources/source.go:221","msg":"Failed to close gRPC client conn","pipeline":"simple-pipeline","vertex":"in","error":"rpc error: code = Canceled desc = grpc: the client connection is closing"}

Server side

Sink

User Defined Sink: {"Data":{"value":1730780477445230311},"Createdts":1730780477445230311}
2024/11/05 06:26:51 panic inside sink handler: Sink is not valid goroutine 35 [running]:
runtime/debug.Stack()
	/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/debug/stack.go:26 +0x64
github.com/numaproj/numaflow-go/pkg/sinker.(*Service).processData.func1()
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sinker/service.go:181 +0x40
panic({0x419040?, 0x5846d8?})
	/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:785 +0x124
main.(*logSink).Sink(0x0?, {0x0?, 0x0?}, 0x0?)
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sinker/examples/log/main.go:21 +0xec
github.com/numaproj/numaflow-go/pkg/sinker.(*Service).processData(0x0?, {0x58a980, 0x40001a2550}, {0x58da18, 0x40001920e0}, 0x0?)
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sinker/service.go:185 +0x74
github.com/numaproj/numaflow-go/pkg/sinker.(*Service).SinkFn.func2()
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sinker/service.go:93 +0x34
golang.org/x/sync/errgroup.(*Group).Go.func1()
	/Users/yhl01/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:78 +0x54
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 33
	/Users/yhl01/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:75 +0x98
2024/11/05 06:26:51 Stopping the SinkFn with err, panic inside sink handler: Sink is not valid
2024/11/05 06:26:51 shutdown signal received
2024/11/05 06:26:51 gracefully stopping grpc server
2024/11/05 06:26:51 grpc server stopped

Map

2024/11/05 07:08:49 panic inside map handler: Invalid map operation goroutine 129 [running]:
runtime/debug.Stack()
	/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/debug/stack.go:26 +0x64
github.com/numaproj/numaflow-go/pkg/mapper.(*Service).handleRequest.func1()
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/mapper/service.go:137 +0x40
panic({0x418ea0?, 0x5844b0?})
	/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:785 +0x124
main.(*Forward).Map(0x0?, {0x0?, 0x0?}, {0x0?, 0x0?, 0x0?}, {0x58a858, 0x40002f2960})
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/mapper/examples/forward_message/main.go:21 +0x50
github.com/numaproj/numaflow-go/pkg/mapper.(*Service).handleRequest(0x40000ac750, {0x58a740, 0x400007cb90}, 0x40002f24b0, 0x4000228770)
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/mapper/service.go:144 +0x1fc
github.com/numaproj/numaflow-go/pkg/mapper.(*Service).MapFn.func2()
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/mapper/service.go:94 +0x30
golang.org/x/sync/errgroup.(*Group).Go.func1()
	/Users/yhl01/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:78 +0x54
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 9
	/Users/yhl01/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:75 +0x98
2024/11/05 07:08:49 Stopping the MapFn with err, rpc error: code = Internal desc = panic inside map handler: Invalid map operation
2024/11/05 07:08:49 shutdown signal received
2024/11/05 07:08:49 gracefully stopping grpc server
2024/11/05 07:08:49 grpc server stopped

Source

2024/11/05 15:09:26 panic inside source handler: Source not valid goroutine 11 [running]:
runtime/debug.Stack()
	/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/debug/stack.go:26 +0x64
github.com/numaproj/numaflow-go/pkg/sourcer.(*Service).receiveReadRequests.func1.1()
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sourcer/service.go:122 +0x48
panic({0x4194c0?, 0x5874f0?})
	/Users/yhl01/go/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:785 +0x124
simple_source/impl.(*SimpleSource).Read(0x0?, {0x0?, 0x0?}, {0x58b698?, 0x400009b0f0?}, 0x0?)
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sourcer/examples/simple_source/impl/simple_source.go:48 +0x6c
github.com/numaproj/numaflow-go/pkg/sourcer.(*Service).receiveReadRequests.func1()
	/Users/yhl01/Documents/numaproj/numaflow-go/pkg/sourcer/service.go:132 +0xf4
golang.org/x/sync/errgroup.(*Group).Go.func1()
	/Users/yhl01/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:78 +0x54
created by golang.org/x/sync/errgroup.(*Group).Go in goroutine 9
	/Users/yhl01/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:75 +0x98
2024/11/05 15:09:26 error processing requests: panic inside source handler: Source not valid
2024/11/05 15:09:26 shutdown signal received
2024/11/05 15:09:26 gracefully stopping grpc server
2024/11/05 15:09:26 error receiving from ack stream: context canceled
2024/11/05 15:09:26 grpc server stopped

numaproj/numaflow-go#165 should be merged first.

@yhl25 yhl25 requested review from whynowy and vigith as code owners November 5, 2024 06:32
@yhl25 yhl25 marked this pull request as draft November 5, 2024 06:32
@yhl25 yhl25 requested a review from KeranYang November 5, 2024 06:32
codecov bot commented Nov 5, 2024

Codecov Report

Attention: Patch coverage is 30.99415% with 118 lines in your changes missing coverage. Please review.

Project coverage is 64.03%. Comparing base (9c1d3ce) to head (017cde3).
Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| pkg/sources/udsource/user_defined_source.go | 0.00% | 23 Missing ⚠️ |
| pkg/sources/errors/errors.go | 0.00% | 18 Missing ⚠️ |
| pkg/sources/forward/data_forward.go | 51.42% | 17 Missing ⚠️ |
| pkg/sources/source.go | 0.00% | 15 Missing ⚠️ |
| pkg/sinks/sink.go | 0.00% | 12 Missing ⚠️ |
| pkg/udf/map_udf.go | 0.00% | 12 Missing ⚠️ |
| pkg/sinks/forward/forward.go | 58.33% | 6 Missing and 4 partials ⚠️ |
| pkg/udf/forward/forward.go | 72.41% | 7 Missing and 1 partial ⚠️ |
| pkg/sinks/udsink/udsink_grpc.go | 0.00% | 3 Missing ⚠️ |

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2204      +/-   ##
==========================================
+ Coverage   63.93%   64.03%   +0.09%     
==========================================
  Files         334      335       +1     
  Lines       40676    40719      +43     
==========================================
+ Hits        26006    26073      +67     
+ Misses      13609    13595      -14     
+ Partials     1061     1051      -10     


@whynowy whynowy requested a review from kohlisid November 5, 2024 07:06
@@ -266,7 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
 	if err != nil {
 		df.opts.logger.Errorw("failed to write to sink", zap.Error(err))
 		df.fromBufferPartition.NoAck(ctx, readOffsets)
-		return
+		return err
Member

In this case, do we want to restart the container?

Contributor Author

This is a non-retryable error, so we will have to restart.

// TODO(Retry-Sink): Check for ctx done separately? That should be covered in shutdown
if ok, _ := df.IsShuttingDown(); err != nil && ok {

if err != nil {
Member

Is there any logic change for the operation (handlePostRetryFailures) below?

Contributor Author

No change is required for that function.

 log := logging.FromContext(df.ctx)
-stopped := make(chan struct{})
+stopped := make(chan error)
Member

Should we introduce another channel dedicated to errors?

Contributor Author

We had an offline discussion and decided to go with a blocking implementation of Start that returns an error, similar to an HTTP server. We will make this change after 1.4.

log.Infow("Exited for partition...", zap.String("fromPartition", fromBufferPartitionName))
log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName))
case err := <-stopped: // critical error case
if err != nil {
Member

Will this trigger a successful container restart? Do we need to trigger a crash?

Contributor Author

It will trigger a successful container restart.

Signed-off-by: Yashash H L <[email protected]>
@KeranYang
Member

@yhl25 please merge numaflow-go PR first to fix the e2es before merging this one.

@yhl25 yhl25 marked this pull request as ready for review November 5, 2024 17:32
@vigith
Member

vigith commented Nov 6, 2024

I reran the entire suite and it is failing with:

Watching POD: flatmap-stream-java-udsink-0-bu3ey
    map_test.go:117: Expected vertex ["java-udsink"] pod log to contain ["hello"] but didn't.
    panic.go:629: Deleting Pipeline flatmap-stream

I am not sure why the previous run was successful. @KeranYang, do you think the Java SDK PR will solve this?

@KeranYang KeranYang merged commit 9140f79 into main Nov 6, 2024
28 checks passed
@KeranYang KeranYang deleted the shutdown branch November 6, 2024 20:02
SaniyaKalamkar pushed a commit to SaniyaKalamkar/numaflow that referenced this pull request Jan 19, 2025