Skip to content

Commit

Permalink
eot for sink
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Oct 14, 2024
1 parent b7c6e55 commit d393a61
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 125 deletions.
250 changes: 133 additions & 117 deletions pkg/apis/proto/sink/v1/sink.pb.go

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions pkg/apis/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@ message SinkRequest {
string id = 5;
map<string, string> headers = 6;
}
message Status {
bool eot = 1;
}
// Required field indicating the request.
Request request = 1;
// Required field indicating the status of the request.
// If eot is set to true, it indicates the end of transmission.
Status status = 2;
TransmissionStatus status = 2;
// optional field indicating the handshake message.
optional Handshake handshake = 3;
}
Expand All @@ -55,6 +52,13 @@ message ReadyResponse {
bool ready = 1;
}

/**
* TransmissionStatus is the status of the transmission.
*/
message TransmissionStatus {
bool eot = 1;
}

/*
* Status is the status of the response.
*/
Expand All @@ -78,4 +82,5 @@ message SinkResponse {
}
Result result = 1;
optional Handshake handshake = 2;
optional TransmissionStatus status = 3;
}
1 change: 1 addition & 0 deletions pkg/mapstreamer/examples/flatmap_stream/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapstreamer/examples/flatmap_stream/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
2 changes: 0 additions & 2 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes
err = fmt.Errorf("panic inside mapStream handler: %v", r)
return
}
// close the message channel after the handler is done processing the request
close(messageCh)
}()
streamReq := req.GetRequest()
hd := NewHandlerDatum(streamReq.GetValue(), streamReq.GetEventTime().AsTime(), streamReq.GetWatermark().AsTime(), streamReq.GetHeaders())
Expand Down
14 changes: 14 additions & 0 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,5 +197,19 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
return err
}
}

select {
case <-ctx.Done():
return ctx.Err()
default:
}
// send the end of transmission message
eotResponse := &sinkpb.SinkResponse{
Status: &sinkpb.TransmissionStatus{Eot: true},
}
if err := stream.Send(eotResponse); err != nil {
log.Printf("error sending end of transmission message: %v", err)
return err
}
return nil
}
10 changes: 8 additions & 2 deletions pkg/sinker/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestService_SinkFn(t *testing.T) {
},
{
Request: &sinkpb.SinkRequest_Request{},
Status: &sinkpb.SinkRequest_Status{Eot: true},
Status: &sinkpb.TransmissionStatus{Eot: true},
},
},
sh: SinkerFunc(func(ctx context.Context, rch <-chan Datum) Responses {
Expand Down Expand Up @@ -129,6 +129,9 @@ func TestService_SinkFn(t *testing.T) {
ErrMsg: "",
},
},
{
Status: &sinkpb.TransmissionStatus{Eot: true},
},
},
},
{
Expand Down Expand Up @@ -171,7 +174,7 @@ func TestService_SinkFn(t *testing.T) {
},
{
Request: &sinkpb.SinkRequest_Request{},
Status: &sinkpb.SinkRequest_Status{Eot: true},
Status: &sinkpb.TransmissionStatus{Eot: true},
},
},
sh: SinkerFunc(func(ctx context.Context, rch <-chan Datum) Responses {
Expand Down Expand Up @@ -210,6 +213,9 @@ func TestService_SinkFn(t *testing.T) {
ErrMsg: "unknown error",
},
},
{
Status: &sinkpb.TransmissionStatus{Eot: true},
},
},
},
}
Expand Down

0 comments on commit d393a61

Please sign in to comment.