Skip to content

Commit

Permalink
feat: implement batch map (#133)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
Co-authored-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Jul 11, 2024
1 parent ff520e4 commit 15e4521
Show file tree
Hide file tree
Showing 24 changed files with 1,820 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
"pkg/sinker/examples/fallback", "pkg/sideinput/examples/map_sideinput", "pkg/sideinput/examples/reduce_sideinput",
"pkg/sideinput/examples/sideinput_function", "pkg/sideinput/examples/simple_source_with_sideinput",
"pkg/sideinput/examples/sink_sideinput", "pkg/sinker/examples/redis-sink", "pkg/sideinput/examples/map_sideinput/udf",
"pkg/sideinput/examples/reduce_sideinput/udf"
"pkg/sideinput/examples/reduce_sideinput/udf", "pkg/batchmapper/examples/batchmap-flatmap"
]

steps:
Expand Down
456 changes: 456 additions & 0 deletions pkg/apis/proto/batchmap/v1/batchmap.pb.go

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions pkg/apis/proto/batchmap/v1/batchmap.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
syntax = "proto3";

option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package batchmap.v1;

service BatchMap {
// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);

// BatchMapFn is a bi-directional streaming rpc which applies a
// map function on each BatchMapRequest element of the stream and then streams
// back BatchMapResponse elements.
rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse);
}

/**
* BatchMapRequest represents a request element.
*/
message BatchMapRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// This ID is used uniquely identify a map request
string id = 6;
}

/**
* BatchMapResponse represents a response element.
*/
message BatchMapResponse {
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;
// This ID is used to refer the responses to the request it corresponds to.
string id = 2;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
183 changes: 183 additions & 0 deletions pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 15e4521

Please sign in to comment.