Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into protof
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jan 18, 2024
2 parents 392cb7d + 5424b35 commit c30dfc0
Show file tree
Hide file tree
Showing 125 changed files with 4,357 additions and 348 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ Source Transformer, Functions, Sinks or SideInputs in Golang.

- Implement [User Defined Sources](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sourcer)
- Implement [User Defined Source Transformers](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sourcetransformer)
- Implement [User Defined Functions](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/function)
- Implement [User Defined Sinks](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sink)
- Implement User Defined Functions
- [Map](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/mapper)
- [Reduce](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/reducer)
- Implement [User Defined Sinks](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sinker)
- Implement [User Defined SideInputs](https://pkg.go.dev/github.com/numaproj/numaflow-go/pkg/sideinput)

## Development
Expand All @@ -15,4 +17,4 @@ Source Transformer, Functions, Sinks or SideInputs in Golang.

`make test` - Run the tests.
`make proto`- Regenerate the protobuf files from the [proto files](https://github.com/numaproj/numaflow/tree/main/pkg/apis/proto) defined in [numaproj/numaflow](https://github.com/numaproj/numaflow) repository.
`make proto ORG=xxx PROJECT=xxx BRANCH=xxx` - Regenerate the protobuf files from specified github repository.
`make proto ORG=xxx PROJECT=xxx BRANCH=xxx` - Regenerate the protobuf files from specified github repository. Default values: `ORG=numaproj PROJECT=numaflow BRANCH=main`
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.8.1
go.uber.org/atomic v1.11.0
golang.org/x/net v0.9.0
golang.org/x/sync v0.1.0
google.golang.org/grpc v1.57.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down
81 changes: 81 additions & 0 deletions pkg/apis/proto/reduce/v1/reduce.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
syntax = "proto3";

option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";


package reduce.v1;

service Reduce {
// ReduceFn applies a reduce function to a request stream.
rpc ReduceFn(stream ReduceRequest) returns (stream ReduceResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

/**
* ReduceRequest represents a request element.
*/
message ReduceRequest {
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
bool EOF = 3;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
3 changes: 3 additions & 0 deletions pkg/apis/proto/sessionreduce/v1/mockgen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package v1

//go:generate mockgen -destination sessionreducemock/sessionreducemock.go -package sessionreducemock github.com/numaproj/numaflow-go/pkg/apis/proto/sessionreduce/v1 SessionReduceClient,SessionReduce_SessionReduceFnClient
84 changes: 84 additions & 0 deletions pkg/apis/proto/sessionreduce/v1/sessionreduce.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
syntax = "proto3";

option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/reducestream/v1";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";


package sessionreduce.v1;

service SessionReduce {
// SessionReduceFn applies a reduce function to a request stream.
rpc SessionReduceFn(stream SessionReduceRequest) returns (stream SessionReduceResponse);

// IsReady is the heartbeat endpoint for gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

// KeyedWindow represents a window with keys.
// since the client track the keys, we use keyed window.
message KeyedWindow {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
repeated string keys = 4;
}

/**
* SessionReduceRequest represents a request element.
*/
message SessionReduceRequest {
// WindowOperation represents a window operation.
// For Aligned window values can be one of OPEN, CLOSE, EXPAND, MERGE and APPEND.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
EXPAND = 2;
MERGE = 3;
APPEND = 4;
}

Event event = 1;
repeated KeyedWindow keyedWindows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
}

Payload payload = 1;
WindowOperation operation = 2;
}

/**
* SessionReduceResponse represents a response element.
*/
message SessionReduceResponse {
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}

Result result = 1;

// keyedWindow represents a window to which the result belongs.
KeyedWindow keyedWindow = 2;

// EOF represents the end of the response for a window.
bool EOF = 3;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
Loading

0 comments on commit c30dfc0

Please sign in to comment.