Skip to content

Commit

Permalink
revert: metadata for udsink (#46)
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Hao <[email protected]>
  • Loading branch information
xdevxy authored Apr 10, 2023
1 parent d1c512a commit 2ecaa6c
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 198 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ require (
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

retract v0.4.1
147 changes: 35 additions & 112 deletions pkg/apis/proto/sink/v1/udsink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 1 addition & 9 deletions pkg/apis/proto/sink/v1/udsink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,7 @@ message Datum {
bytes value = 2;
EventTime event_time = 3;
Watermark watermark = 4;
Metadata metadata = 5;
}

/**
* Metadata of a datum element.
*/
message Metadata {
string id = 1;
uint64 num_delivered = 2;
string id = 5;
}

/**
Expand Down
10 changes: 2 additions & 8 deletions pkg/sink/clienttest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,16 @@ func TestSinkFn(t *testing.T) {
mockClient := sinkmock.NewMockUserDefinedSinkClient(ctrl)
testDatumList := []*sinkpb.Datum{
{
Id: "test_id_0",
Value: []byte(`sink_message_success`),
EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})},
Metadata: &sinkpb.Metadata{
Id: "test_id_0",
NumDelivered: 1,
},
},
{
Id: "test_id_1",
Value: []byte(`sink_message_err`),
EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})},
Metadata: &sinkpb.Metadata{
Id: "test_id_1",
NumDelivered: 1,
},
},
}
testResponseList := []*sinkpb.Response{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sink/examples/log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func handle(_ context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Respo
_ = d.EventTime()
_ = d.Watermark()
fmt.Println("User Defined Sink:", string(d.Value()))
id := d.Metadata().ID()
id := d.ID()
result = result.Append(sinksdk.ResponseOK(id))
}
return result
Expand Down
8 changes: 0 additions & 8 deletions pkg/sink/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,7 @@ type Datum interface {
Value() []byte
EventTime() time.Time
Watermark() time.Time
Metadata() DatumMetadata
}

// DatumMetadata contains metadata of a datum.
type DatumMetadata interface {
// ID returns the ID of the datum.
ID() string
// NumDelivered returns the number of times the datum has been delivered.
NumDelivered() uint64
}

// Client contains methods to call a gRPC client.
Expand Down
12 changes: 3 additions & 9 deletions pkg/sink/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Test_server_sink(t *testing.T) {
sinkHandler: sinksdk.SinkFunc(func(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
result := sinksdk.ResponsesBuilder()
for d := range datumStreamCh {
id := d.Metadata().ID()
id := d.ID()
if strings.Contains(string(d.Value()), "err") {
result = result.Append(sinksdk.ResponseFailure(id, "mock sink message error"))
} else {
Expand Down Expand Up @@ -63,22 +63,16 @@ func Test_server_sink(t *testing.T) {
}()
testDatumList := []*sinkpb.Datum{
{
Id: "test_id_0",
Value: []byte(`sink_message_success`),
EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})},
Metadata: &sinkpb.Metadata{
Id: "test_id_0",
NumDelivered: 1,
},
},
{
Id: "test_id_1",
Value: []byte(`sink_message_err`),
EventTime: &sinkpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
Watermark: &sinkpb.Watermark{Watermark: timestamppb.New(time.Time{})},
Metadata: &sinkpb.Metadata{
Id: "test_id_1",
NumDelivered: 1,
},
},
}
responseList, err := c.SinkFn(ctx, testDatumList)
Expand Down
Loading

0 comments on commit 2ecaa6c

Please sign in to comment.