diff --git a/go.mod b/go.mod index c028f373..e8188c3a 100644 --- a/go.mod +++ b/go.mod @@ -23,3 +23,5 @@ require ( google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +retract v0.4.1 diff --git a/pkg/apis/proto/sink/v1/udsink.pb.go b/pkg/apis/proto/sink/v1/udsink.pb.go index 06f3f009..8d56942b 100644 --- a/pkg/apis/proto/sink/v1/udsink.pb.go +++ b/pkg/apis/proto/sink/v1/udsink.pb.go @@ -130,7 +130,7 @@ type Datum struct { Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` EventTime *EventTime `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` Watermark *Watermark `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Metadata *Metadata `protobuf:"bytes,5,opt,name=metadata,proto3" json:"metadata,omitempty"` + Id string `protobuf:"bytes,5,opt,name=id,proto3" json:"id,omitempty"` } func (x *Datum) Reset() { @@ -193,70 +193,13 @@ func (x *Datum) GetWatermark() *Watermark { return nil } -func (x *Datum) GetMetadata() *Metadata { - if x != nil { - return x.Metadata - } - return nil -} - -// * -// Metadata of a datum element. -type Metadata struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - NumDelivered uint64 `protobuf:"varint,2,opt,name=num_delivered,json=numDelivered,proto3" json:"num_delivered,omitempty"` -} - -func (x *Metadata) Reset() { - *x = Metadata{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Metadata) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Metadata) ProtoMessage() {} - -func (x *Metadata) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Metadata.ProtoReflect.Descriptor instead. -func (*Metadata) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{3} -} - -func (x *Metadata) GetId() string { +func (x *Datum) GetId() string { if x != nil { return x.Id } return "" } -func (x *Metadata) GetNumDelivered() uint64 { - if x != nil { - return x.NumDelivered - } - return 0 -} - // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -270,7 +213,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -283,7 +226,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -296,7 +239,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{4} + return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{3} } func (x *ReadyResponse) GetReady() bool { @@ -324,7 +267,7 @@ type Response struct { func (x *Response) Reset() { *x = Response{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[5] + mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -337,7 +280,7 @@ func (x *Response) String() string { func (*Response) ProtoMessage() {} func (x *Response) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[5] + mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -350,7 +293,7 @@ func (x *Response) ProtoReflect() protoreflect.Message { // Deprecated: Use Response.ProtoReflect.Descriptor instead. func (*Response) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{5} + return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{4} } func (x *Response) GetId() string { @@ -388,7 +331,7 @@ type ResponseList struct { func (x *ResponseList) Reset() { *x = ResponseList{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[6] + mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -401,7 +344,7 @@ func (x *ResponseList) String() string { func (*ResponseList) ProtoMessage() {} func (x *ResponseList) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[6] + mi := &file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -414,7 +357,7 @@ func (x *ResponseList) ProtoReflect() protoreflect.Message { // Deprecated: Use ResponseList.ProtoReflect.Descriptor instead. func (*ResponseList) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{6} + return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP(), []int{5} } func (x *ResponseList) GetResponses() []*Response { @@ -442,7 +385,7 @@ var file_pkg_apis_proto_sink_v1_udsink_proto_rawDesc = []byte{ 0x6b, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0xc5, 0x01, 0x0a, 0x05, + 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0xa6, 0x01, 0x0a, 0x05, 0x44, 0x61, 0x74, 0x75, 0x6d, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, @@ -452,14 +395,8 @@ var file_pkg_apis_proto_sink_v1_udsink_proto_rawDesc = []byte{ 0x6d, 0x65, 0x12, 0x30, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, - 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x2d, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, - 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x22, 0x3f, 0x0a, 0x08, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, - 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, - 0x23, 0x0a, 0x0d, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x65, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0c, 0x6e, 0x75, 0x6d, 0x44, 0x65, 0x6c, 0x69, 0x76, - 0x65, 0x72, 0x65, 0x64, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, + 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x02, 0x69, 0x64, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0x4d, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, @@ -497,34 +434,32 @@ func file_pkg_apis_proto_sink_v1_udsink_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_sink_v1_udsink_proto_rawDescData } -var file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_pkg_apis_proto_sink_v1_udsink_proto_goTypes = []interface{}{ (*EventTime)(nil), // 0: sink.v1.EventTime (*Watermark)(nil), // 1: sink.v1.Watermark (*Datum)(nil), // 2: sink.v1.Datum - (*Metadata)(nil), // 3: sink.v1.Metadata - (*ReadyResponse)(nil), // 4: sink.v1.ReadyResponse - (*Response)(nil), // 5: sink.v1.Response - (*ResponseList)(nil), // 6: sink.v1.ResponseList - (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 8: google.protobuf.Empty + (*ReadyResponse)(nil), // 3: sink.v1.ReadyResponse + (*Response)(nil), // 4: sink.v1.Response + (*ResponseList)(nil), // 5: sink.v1.ResponseList + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 7: google.protobuf.Empty } var file_pkg_apis_proto_sink_v1_udsink_proto_depIdxs = []int32{ - 7, // 0: sink.v1.EventTime.event_time:type_name -> google.protobuf.Timestamp - 7, // 1: sink.v1.Watermark.watermark:type_name -> google.protobuf.Timestamp + 6, // 0: sink.v1.EventTime.event_time:type_name -> google.protobuf.Timestamp + 6, // 1: sink.v1.Watermark.watermark:type_name -> google.protobuf.Timestamp 0, // 2: sink.v1.Datum.event_time:type_name -> sink.v1.EventTime 1, // 3: sink.v1.Datum.watermark:type_name -> sink.v1.Watermark - 3, // 4: sink.v1.Datum.metadata:type_name -> sink.v1.Metadata - 5, // 5: sink.v1.ResponseList.responses:type_name -> sink.v1.Response - 2, // 6: sink.v1.UserDefinedSink.SinkFn:input_type -> sink.v1.Datum - 8, // 7: sink.v1.UserDefinedSink.IsReady:input_type -> google.protobuf.Empty - 6, // 8: sink.v1.UserDefinedSink.SinkFn:output_type -> sink.v1.ResponseList - 4, // 9: sink.v1.UserDefinedSink.IsReady:output_type -> sink.v1.ReadyResponse - 8, // [8:10] is the sub-list for method output_type - 6, // [6:8] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 4, // 4: sink.v1.ResponseList.responses:type_name -> sink.v1.Response + 2, // 5: sink.v1.UserDefinedSink.SinkFn:input_type -> sink.v1.Datum + 7, // 6: sink.v1.UserDefinedSink.IsReady:input_type -> google.protobuf.Empty + 5, // 7: sink.v1.UserDefinedSink.SinkFn:output_type -> sink.v1.ResponseList + 3, // 8: sink.v1.UserDefinedSink.IsReady:output_type -> sink.v1.ReadyResponse + 7, // [7:9] is the sub-list for method output_type + 5, // [5:7] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name } func init() { file_pkg_apis_proto_sink_v1_udsink_proto_init() } @@ -570,18 +505,6 @@ func file_pkg_apis_proto_sink_v1_udsink_proto_init() { } } file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Metadata); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReadyResponse); i { case 0: return &v.state @@ -593,7 +516,7 @@ func file_pkg_apis_proto_sink_v1_udsink_proto_init() { return nil } } - file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Response); i { case 0: return &v.state @@ -605,7 +528,7 @@ func file_pkg_apis_proto_sink_v1_udsink_proto_init() { return nil } } - file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + file_pkg_apis_proto_sink_v1_udsink_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ResponseList); i { case 0: return &v.state @@ -624,7 +547,7 @@ func file_pkg_apis_proto_sink_v1_udsink_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_sink_v1_udsink_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/sink/v1/udsink.proto b/pkg/apis/proto/sink/v1/udsink.proto index 8ba72b9a..e0bcd054 100644 --- a/pkg/apis/proto/sink/v1/udsink.proto +++ b/pkg/apis/proto/sink/v1/udsink.proto @@ -34,15 +34,7 @@ message Datum { bytes value = 2; EventTime event_time = 3; Watermark watermark = 4; - Metadata metadata = 5; -} - -/** - * Metadata of a datum element. - */ -message Metadata { - string id = 1; - uint64 num_delivered = 2; + string id = 5; } /** diff --git a/pkg/sink/clienttest/client_test.go b/pkg/sink/clienttest/client_test.go index efae1384..0da79e97 100644 --- a/pkg/sink/clienttest/client_test.go +++ b/pkg/sink/clienttest/client_test.go @@ -86,22 +86,16 @@ func TestSinkFn(t *testing.T) { mockClient := sinkmock.NewMockUserDefinedSinkClient(ctrl) testDatumList := []*sinkpb.Datum{ { + Id: "test_id_0", Value: []byte(`sink_message_success`), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "test_id_0", - NumDelivered: 1, - }, }, { + Id: "test_id_1", Value: []byte(`sink_message_err`), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "test_id_1", - NumDelivered: 1, - }, }, } testResponseList := []*sinkpb.Response{ diff --git a/pkg/sink/examples/log/main.go b/pkg/sink/examples/log/main.go index 9e36954f..f8c2ae89 100644 --- a/pkg/sink/examples/log/main.go +++ b/pkg/sink/examples/log/main.go @@ -14,7 +14,7 @@ func handle(_ context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Respo _ = d.EventTime() _ = d.Watermark() fmt.Println("User Defined Sink:", string(d.Value())) - id := d.Metadata().ID() + id := d.ID() result = result.Append(sinksdk.ResponseOK(id)) } return result diff --git a/pkg/sink/interface.go b/pkg/sink/interface.go index b97d554c..ca488fa7 100644 --- a/pkg/sink/interface.go +++ b/pkg/sink/interface.go @@ -13,15 +13,7 @@ type Datum interface { Value() []byte EventTime() time.Time Watermark() time.Time - Metadata() DatumMetadata -} - -// DatumMetadata contains metadata of a datum. -type DatumMetadata interface { - // ID returns the ID of the datum. ID() string - // NumDelivered returns the number of times the datum has been delivered. - NumDelivered() uint64 } // Client contains methods to call a gRPC client. diff --git a/pkg/sink/server/server_test.go b/pkg/sink/server/server_test.go index abb3890d..24498e75 100644 --- a/pkg/sink/server/server_test.go +++ b/pkg/sink/server/server_test.go @@ -35,7 +35,7 @@ func Test_server_sink(t *testing.T) { sinkHandler: sinksdk.SinkFunc(func(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses { result := sinksdk.ResponsesBuilder() for d := range datumStreamCh { - id := d.Metadata().ID() + id := d.ID() if strings.Contains(string(d.Value()), "err") { result = result.Append(sinksdk.ResponseFailure(id, "mock sink message error")) } else { @@ -63,22 +63,16 @@ func Test_server_sink(t *testing.T) { }() testDatumList := []*sinkpb.Datum{ { + Id: "test_id_0", Value: []byte(`sink_message_success`), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "test_id_0", - NumDelivered: 1, - }, }, { + Id: "test_id_1", Value: []byte(`sink_message_err`), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "test_id_1", - NumDelivered: 1, - }, }, } responseList, err := c.SinkFn(ctx, testDatumList) diff --git a/pkg/sink/service.go b/pkg/sink/service.go index e4a0fe96..f0fc5692 100644 --- a/pkg/sink/service.go +++ b/pkg/sink/service.go @@ -12,17 +12,21 @@ import ( // handlerDatum implements the Datum interface and is used in the sink handlers. type handlerDatum struct { + id string keys []string value []byte eventTime time.Time watermark time.Time - metadata handlerDatumMetadata } func (h *handlerDatum) Keys() []string { return h.keys } +func (h *handlerDatum) ID() string { + return h.id +} + func (h *handlerDatum) Value() []byte { return h.value } @@ -35,26 +39,6 @@ func (h *handlerDatum) Watermark() time.Time { return h.watermark } -func (h *handlerDatum) Metadata() DatumMetadata { - return h.metadata -} - -// handlerDatumMetadata implements the DatumMetadata interface and is used in the sink handlers. -type handlerDatumMetadata struct { - id string - numDelivered uint64 -} - -// ID returns the ID of the datum. -func (h handlerDatumMetadata) ID() string { - return h.id -} - -// NumDelivered returns the number of times the datum has been delivered. -func (h handlerDatumMetadata) NumDelivered() uint64 { - return h.numDelivered -} - // Service implements the proto gen server interface and contains the sink operation handler. type Service struct { sinkpb.UnimplementedUserDefinedSinkServer @@ -101,13 +85,10 @@ func (fs *Service) SinkFn(stream sinkpb.UserDefinedSink_SinkFnServer) error { return err } var hd = &handlerDatum{ + id: d.GetId(), value: d.GetValue(), eventTime: d.GetEventTime().EventTime.AsTime(), watermark: d.GetWatermark().Watermark.AsTime(), - metadata: handlerDatumMetadata{ - id: d.GetMetadata().GetId(), - numDelivered: d.GetMetadata().GetNumDelivered(), - }, } datumStreamCh <- hd } diff --git a/pkg/sink/service_test.go b/pkg/sink/service_test.go index 9e2e4d95..582f8b34 100644 --- a/pkg/sink/service_test.go +++ b/pkg/sink/service_test.go @@ -51,40 +51,31 @@ func TestService_SinkFn(t *testing.T) { input: []*sinkpb.Datum{ { + Id: "one-processed", Keys: []string{"sink-test"}, Value: []byte(strconv.Itoa(10)), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Time{})}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "one-processed", - NumDelivered: 1, - }, }, { + Id: "two-processed", Keys: []string{"sink-test"}, Value: []byte(strconv.Itoa(20)), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Time{})}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "two-processed", - NumDelivered: 1, - }, }, { + Id: "three-processed", Keys: []string{"sink-test"}, Value: []byte(strconv.Itoa(30)), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Time{})}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "three-processed", - NumDelivered: 1, - }, }, }, sh: SinkFunc(func(ctx context.Context, rch <-chan Datum) Responses { result := ResponsesBuilder() for d := range rch { - id := d.Metadata().ID() + id := d.ID() result = result.Append(ResponseOK(id)) } return result @@ -112,40 +103,31 @@ func TestService_SinkFn(t *testing.T) { input: []*sinkpb.Datum{ { + Id: "one-processed", Keys: []string{"sink-test-1", "sink-test-2"}, Value: []byte(strconv.Itoa(10)), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Time{})}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "one-processed", - NumDelivered: 1, - }, }, { + Id: "two-processed", Keys: []string{"sink-test-1", "sink-test-2"}, Value: []byte(strconv.Itoa(20)), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Time{})}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "two-processed", - NumDelivered: 1, - }, }, { + Id: "three-processed", Keys: []string{"sink-test-1", "sink-test-2"}, Value: []byte(strconv.Itoa(30)), EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Time{})}, Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})}, - Metadata: &sinkpb.Metadata{ - Id: "three-processed", - NumDelivered: 1, - }, }, }, sh: SinkFunc(func(ctx context.Context, rch <-chan Datum) Responses { result := ResponsesBuilder() for d := range rch { - id := d.Metadata().ID() + id := d.ID() result = result.Append(ResponseFailure(id, "unknown error")) } return result