diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index bef77e4d..752e20c1 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index 3986e644..0ee687b7 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce.pb.go b/pkg/apis/proto/reduce/v1/reduce.pb.go index b089f2d3..2563e299 100644 --- a/pkg/apis/proto/reduce/v1/reduce.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go index 3edcc1f0..21d2a3d0 100644 --- a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/serving/v1/store.pb.go b/pkg/apis/proto/serving/v1/store.pb.go index eb5a0c89..fcfe0431 100644 --- a/pkg/apis/proto/serving/v1/store.pb.go +++ b/pkg/apis/proto/serving/v1/store.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/serving/v1/store.proto package v1 @@ -43,9 +43,8 @@ type Payload struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated - // uuid. - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // Origin is the Vertex that generated this result. + Origin string `protobuf:"bytes,1,opt,name=origin,proto3" json:"origin,omitempty"` // Value is the result of the computation. Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` } @@ -82,9 +81,9 @@ func (*Payload) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP(), []int{0} } -func (x *Payload) GetId() string { +func (x *Payload) GetOrigin() string { if x != nil { - return x.Id + return x.Origin } return "" } @@ -102,8 +101,9 @@ type PutRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Origin is the Vertex that generated this result. - Origin string `protobuf:"bytes,1,opt,name=origin,proto3" json:"origin,omitempty"` + // ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated + // uuid. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Payloads are one or more results generated (could be more than one due to flat-map). Payloads []*Payload `protobuf:"bytes,2,rep,name=payloads,proto3" json:"payloads,omitempty"` } @@ -140,9 +140,9 @@ func (*PutRequest) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP(), []int{1} } -func (x *PutRequest) GetOrigin() string { +func (x *PutRequest) GetId() string { if x != nil { - return x.Origin + return x.Id } return "" } @@ -252,20 +252,19 @@ func (x *GetRequest) GetId() string { return "" } -// OriginalPayload is one of the result generated by the Compute Graph of Numaflow. -type OriginalPayload struct { +// GetResponse is the result stored in the Store. +type GetResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Origin is the Vertex that generated this result. - Origin string `protobuf:"bytes,1,opt,name=origin,proto3" json:"origin,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Payloads are one or more results generated (could be more than one due to flat-map). Payloads []*Payload `protobuf:"bytes,2,rep,name=payloads,proto3" json:"payloads,omitempty"` } -func (x *OriginalPayload) Reset() { - *x = OriginalPayload{} +func (x *GetResponse) Reset() { + *x = GetResponse{} if protoimpl.UnsafeEnabled { mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -273,13 +272,13 @@ func (x *OriginalPayload) Reset() { } } -func (x *OriginalPayload) String() string { +func (x *GetResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*OriginalPayload) ProtoMessage() {} +func (*GetResponse) ProtoMessage() {} -func (x *OriginalPayload) ProtoReflect() protoreflect.Message { +func (x *GetResponse) ProtoReflect() protoreflect.Message { mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -291,68 +290,19 @@ func (x *OriginalPayload) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use OriginalPayload.ProtoReflect.Descriptor instead. -func (*OriginalPayload) Descriptor() ([]byte, []int) { +// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. +func (*GetResponse) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP(), []int{4} } -func (x *OriginalPayload) GetOrigin() string { +func (x *GetResponse) GetId() string { if x != nil { - return x.Origin + return x.Id } return "" } -func (x *OriginalPayload) GetPayloads() []*Payload { - if x != nil { - return x.Payloads - } - return nil -} - -// GetResponse is the result stored in the Store. -type GetResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // OriginalPayload are one or more results generated (could be more than one due to flat-map). - Payloads []*OriginalPayload `protobuf:"bytes,1,rep,name=payloads,proto3" json:"payloads,omitempty"` -} - -func (x *GetResponse) Reset() { - *x = GetResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *GetResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*GetResponse) ProtoMessage() {} - -func (x *GetResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. -func (*GetResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP(), []int{5} -} - -func (x *GetResponse) GetPayloads() []*OriginalPayload { +func (x *GetResponse) GetPayloads() []*Payload { if x != nil { return x.Payloads } @@ -372,7 +322,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[6] + mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -385,7 +335,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[6] + mi := &file_pkg_apis_proto_serving_v1_store_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -398,7 +348,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP(), []int{6} + return file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP(), []int{5} } func (x *ReadyResponse) GetReady() bool { @@ -418,12 +368,12 @@ var file_pkg_apis_proto_serving_v1_store_proto_rawDesc = []byte{ 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0x2f, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x22, 0x55, 0x0a, 0x0a, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x12, 0x2f, 0x0a, 0x08, 0x70, 0x61, 0x79, 0x6c, + 0x6f, 0x22, 0x37, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x16, 0x0a, 0x06, + 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x72, + 0x69, 0x67, 0x69, 0x6e, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x4d, 0x0a, 0x0a, 0x50, 0x75, + 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2f, 0x0a, 0x08, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x08, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x22, 0x27, 0x0a, 0x0b, 0x50, 0x75, 0x74, @@ -431,35 +381,30 @@ var file_pkg_apis_proto_serving_v1_store_proto_rawDesc = []byte{ 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x1c, 0x0a, 0x0a, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x22, 0x5a, 0x0a, 0x0f, 0x4f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, - 0x6f, 0x61, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x12, 0x2f, 0x0a, 0x08, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x13, 0x2e, - 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, - 0x61, 0x64, 0x52, 0x08, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x22, 0x46, 0x0a, 0x0b, - 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x37, 0x0a, 0x08, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1b, 0x2e, - 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x72, 0x69, 0x67, 0x69, - 0x6e, 0x61, 0x6c, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x08, 0x70, 0x61, 0x79, 0x6c, - 0x6f, 0x61, 0x64, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xbc, 0x01, 0x0a, 0x0c, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x36, 0x0a, 0x03, - 0x50, 0x75, 0x74, 0x12, 0x16, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, - 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, - 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x07, - 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, - 0x19, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, - 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, - 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, - 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x4e, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, + 0x2f, 0x0a, 0x08, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x13, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x08, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, + 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xbc, 0x01, 0x0a, 0x0c, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x6e, 0x67, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, + 0x16, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, + 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x16, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, + 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x17, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x19, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x6e, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3b, 0x5a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, + 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, + 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x6e, 0x67, + 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -474,32 +419,30 @@ func file_pkg_apis_proto_serving_v1_store_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_serving_v1_store_proto_rawDescData } -var file_pkg_apis_proto_serving_v1_store_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_pkg_apis_proto_serving_v1_store_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_pkg_apis_proto_serving_v1_store_proto_goTypes = []any{ - (*Payload)(nil), // 0: serving.v1.Payload - (*PutRequest)(nil), // 1: serving.v1.PutRequest - (*PutResponse)(nil), // 2: serving.v1.PutResponse - (*GetRequest)(nil), // 3: serving.v1.GetRequest - (*OriginalPayload)(nil), // 4: serving.v1.OriginalPayload - (*GetResponse)(nil), // 5: serving.v1.GetResponse - (*ReadyResponse)(nil), // 6: serving.v1.ReadyResponse - (*emptypb.Empty)(nil), // 7: google.protobuf.Empty + (*Payload)(nil), // 0: serving.v1.Payload + (*PutRequest)(nil), // 1: serving.v1.PutRequest + (*PutResponse)(nil), // 2: serving.v1.PutResponse + (*GetRequest)(nil), // 3: serving.v1.GetRequest + (*GetResponse)(nil), // 4: serving.v1.GetResponse + (*ReadyResponse)(nil), // 5: serving.v1.ReadyResponse + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty } var file_pkg_apis_proto_serving_v1_store_proto_depIdxs = []int32{ 0, // 0: serving.v1.PutRequest.payloads:type_name -> serving.v1.Payload - 0, // 1: serving.v1.OriginalPayload.payloads:type_name -> serving.v1.Payload - 4, // 2: serving.v1.GetResponse.payloads:type_name -> serving.v1.OriginalPayload - 1, // 3: serving.v1.ServingStore.Put:input_type -> serving.v1.PutRequest - 3, // 4: serving.v1.ServingStore.Get:input_type -> serving.v1.GetRequest - 7, // 5: serving.v1.ServingStore.IsReady:input_type -> google.protobuf.Empty - 2, // 6: serving.v1.ServingStore.Put:output_type -> serving.v1.PutResponse - 5, // 7: serving.v1.ServingStore.Get:output_type -> serving.v1.GetResponse - 6, // 8: serving.v1.ServingStore.IsReady:output_type -> serving.v1.ReadyResponse - 6, // [6:9] is the sub-list for method output_type - 3, // [3:6] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 0, // 1: serving.v1.GetResponse.payloads:type_name -> serving.v1.Payload + 1, // 2: serving.v1.ServingStore.Put:input_type -> serving.v1.PutRequest + 3, // 3: serving.v1.ServingStore.Get:input_type -> serving.v1.GetRequest + 6, // 4: serving.v1.ServingStore.IsReady:input_type -> google.protobuf.Empty + 2, // 5: serving.v1.ServingStore.Put:output_type -> serving.v1.PutResponse + 4, // 6: serving.v1.ServingStore.Get:output_type -> serving.v1.GetResponse + 5, // 7: serving.v1.ServingStore.IsReady:output_type -> serving.v1.ReadyResponse + 5, // [5:8] is the sub-list for method output_type + 2, // [2:5] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_pkg_apis_proto_serving_v1_store_proto_init() } @@ -557,18 +500,6 @@ func file_pkg_apis_proto_serving_v1_store_proto_init() { } } file_pkg_apis_proto_serving_v1_store_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*OriginalPayload); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_pkg_apis_proto_serving_v1_store_proto_msgTypes[5].Exporter = func(v any, i int) any { switch v := v.(*GetResponse); i { case 0: return &v.state @@ -580,7 +511,7 @@ func file_pkg_apis_proto_serving_v1_store_proto_init() { return nil } } - file_pkg_apis_proto_serving_v1_store_proto_msgTypes[6].Exporter = func(v any, i int) any { + file_pkg_apis_proto_serving_v1_store_proto_msgTypes[5].Exporter = func(v any, i int) any { switch v := v.(*ReadyResponse); i { case 0: return &v.state @@ -599,7 +530,7 @@ func file_pkg_apis_proto_serving_v1_store_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_serving_v1_store_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/serving/v1/store.proto b/pkg/apis/proto/serving/v1/store.proto index 3a303ca8..f64a5041 100644 --- a/pkg/apis/proto/serving/v1/store.proto +++ b/pkg/apis/proto/serving/v1/store.proto @@ -37,17 +37,17 @@ service ServingStore { // Payload that represent the output that is to be written into to the store. message Payload { - // ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated - // uuid. - string id = 1; + // Origin is the Vertex that generated this result. + string origin = 1; // Value is the result of the computation. bytes value = 2; } // PutRequest is the request sent to the Store. message PutRequest { - // Origin is the Vertex that generated this result. - string origin = 1; + // ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated + // uuid. + string id = 1; // Payloads are one or more results generated (could be more than one due to flat-map). repeated Payload payloads = 2; } @@ -64,18 +64,11 @@ message GetRequest { string id = 1; } -// OriginalPayload is one of the result generated by the Compute Graph of Numaflow. -message OriginalPayload { - // Origin is the Vertex that generated this result. - string origin = 1; - // Payloads are one or more results generated (could be more than one due to flat-map). - repeated Payload payloads = 2; -} - // GetResponse is the result stored in the Store. message GetResponse { - // OriginalPayload are one or more results generated (could be more than one due to flat-map). - repeated OriginalPayload payloads = 1; + string id = 1; + // Payloads are one or more results generated (could be more than one due to flat-map). + repeated Payload payloads = 2; } /** diff --git a/pkg/apis/proto/serving/v1/store_grpc.pb.go b/pkg/apis/proto/serving/v1/store_grpc.pb.go index bd5804cd..b5a353e0 100644 --- a/pkg/apis/proto/serving/v1/store_grpc.pb.go +++ b/pkg/apis/proto/serving/v1/store_grpc.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/serving/v1/store.proto package v1 diff --git a/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go b/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go index 8db8d639..57b171b1 100644 --- a/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go +++ b/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/sessionreduce/v1/sessionreduce.proto package v1 diff --git a/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go b/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go index bc287f78..95b3d4e9 100644 --- a/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go +++ b/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/sessionreduce/v1/sessionreduce.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput.pb.go b/pkg/apis/proto/sideinput/v1/sideinput.pb.go index 8f860c5f..b58bab15 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go index 0449c726..c80e57ed 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index 86b62584..ea2b3bb8 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink_grpc.pb.go b/pkg/apis/proto/sink/v1/sink_grpc.pb.go index 142ed414..7a975673 100644 --- a/pkg/apis/proto/sink/v1/sink_grpc.pb.go +++ b/pkg/apis/proto/sink/v1/sink_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index f5cd3e2e..c69b85fe 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/source/v1/source.proto package v1 diff --git a/pkg/apis/proto/source/v1/source_grpc.pb.go b/pkg/apis/proto/source/v1/source_grpc.pb.go index 7b6fde20..69818db4 100644 --- a/pkg/apis/proto/source/v1/source_grpc.pb.go +++ b/pkg/apis/proto/source/v1/source_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/source/v1/source.proto package v1 diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index 6717c751..e851979e 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.34.2 -// protoc v5.29.1 +// protoc v4.25.1 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 diff --git a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go index 653d9baf..52995c34 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.4.0 -// - protoc v5.29.1 +// - protoc v4.25.1 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 diff --git a/pkg/batchmapper/examples/batchmap_flatmap/main.go b/pkg/batchmapper/examples/batchmap_flatmap/main.go index 9d232055..a5b7a47d 100644 --- a/pkg/batchmapper/examples/batchmap_flatmap/main.go +++ b/pkg/batchmapper/examples/batchmap_flatmap/main.go @@ -14,7 +14,7 @@ func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper. msg := d.Value() _ = d.EventTime() // Event time is available _ = d.Watermark() // Watermark is available - batchResponse := batchmapper.NewBatchResponse(d.Id()) + batchResponse := batchmapper.NewBatchResponse(d.ID()) strs := strings.Split(string(msg), ",") for _, s := range strs { batchResponse = batchResponse.Append(batchmapper.NewMessage([]byte(s))) diff --git a/pkg/batchmapper/interface.go b/pkg/batchmapper/interface.go index a667f232..f1817668 100644 --- a/pkg/batchmapper/interface.go +++ b/pkg/batchmapper/interface.go @@ -16,7 +16,7 @@ type Datum interface { // Headers returns the headers of the message. Headers() map[string]string // Id returns the unique ID set for the given message - Id() string + ID() string // Keys returns the keys associated with a given datum Keys() []string } diff --git a/pkg/batchmapper/message.go b/pkg/batchmapper/message.go index 8d7b74ee..da379459 100644 --- a/pkg/batchmapper/message.go +++ b/pkg/batchmapper/message.go @@ -61,7 +61,7 @@ type batchResponse struct { } // Id returns request ID for the given list of responses -func (m batchResponse) Id() string { +func (m batchResponse) ID() string { return m.id } diff --git a/pkg/batchmapper/server_test.go b/pkg/batchmapper/server_test.go index e51ec42c..ff10adbd 100644 --- a/pkg/batchmapper/server_test.go +++ b/pkg/batchmapper/server_test.go @@ -23,7 +23,7 @@ func TestBatchMapServer_Start(t *testing.T) { var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { batchResponses := BatchResponsesBuilder() for d := range datums { - results := NewBatchResponse(d.Id()) + results := NewBatchResponse(d.ID()) results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) batchResponses.Append(results) } diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index dbdf2a3b..2e68fc5c 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -186,7 +186,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer } singleRequestResp := &mappb.MapResponse{ Results: elements, - Id: batchResp.Id(), + Id: batchResp.ID(), } select { case <-ctx.Done(): diff --git a/pkg/batchmapper/service_test.go b/pkg/batchmapper/service_test.go index 5184882c..42b5cc0a 100644 --- a/pkg/batchmapper/service_test.go +++ b/pkg/batchmapper/service_test.go @@ -101,7 +101,7 @@ func TestService_BatchMapFn(t *testing.T) { handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { batchResponses := BatchResponsesBuilder() for d := range datums { - results := NewBatchResponse(d.Id()) + results := NewBatchResponse(d.ID()) results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) batchResponses = batchResponses.Append(results) } @@ -175,7 +175,7 @@ func TestService_BatchMapFn(t *testing.T) { handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { batchResponses := BatchResponsesBuilder() for d := range datums { - results := NewBatchResponse(d.Id()) + results := NewBatchResponse(d.ID()) results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) batchResponses = batchResponses.Append(results) } diff --git a/pkg/batchmapper/types.go b/pkg/batchmapper/types.go index 1facd290..2fc93eac 100644 --- a/pkg/batchmapper/types.go +++ b/pkg/batchmapper/types.go @@ -39,7 +39,7 @@ func (h *handlerDatum) Headers() map[string]string { return h.headers } -func (h *handlerDatum) Id() string { +func (h *handlerDatum) ID() string { return h.id } diff --git a/pkg/servingstore/examples/memory_store/Dockerfile b/pkg/servingstore/examples/memory_store/Dockerfile new file mode 100644 index 00000000..1f6d62e7 --- /dev/null +++ b/pkg/servingstore/examples/memory_store/Dockerfile @@ -0,0 +1,20 @@ +#################################################################################################### +# base +#################################################################################################### +FROM alpine:3.20 AS base +ARG TARGETARCH +RUN apk update && apk upgrade && \ + apk add ca-certificates && \ + apk --no-cache add tzdata + +COPY dist/serving-inmem-store-${TARGETARCH} /bin/serving-inmem-store +RUN chmod +x /bin/serving-inmem-store + +#################################################################################################### +# flatmap +#################################################################################################### +FROM scratch AS memory_store +COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo +COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=base /bin/serving-inmem-store /bin/serving-inmem-store +ENTRYPOINT [ "/bin/serving-inmem-store" ] diff --git a/pkg/servingstore/examples/memory_store/Makefile b/pkg/servingstore/examples/memory_store/Makefile new file mode 100644 index 00000000..22465057 --- /dev/null +++ b/pkg/servingstore/examples/memory_store/Makefile @@ -0,0 +1,22 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/serving-inmem-store:${TAG} +ARCHITECTURES = amd64 arm64 + +.PHONY: build +build: + for arch in $(ARCHITECTURES); do \ + CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serving-inmem-store-$${arch} main.go; \ + done + +.PHONY: image-push +image-push: build + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target memory_store . --push + +.PHONY: image +image: build + docker build -t ${IMAGE_REGISTRY} --target memory_store . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi + +clean: + -rm -rf ./dist diff --git a/pkg/servingstore/examples/memory_store/README.md b/pkg/servingstore/examples/memory_store/README.md new file mode 100644 index 00000000..b5702dec --- /dev/null +++ b/pkg/servingstore/examples/memory_store/README.md @@ -0,0 +1,3 @@ +# In-Memory Store + +This example demonstrates how to use the in-memory store to store and retrieve data. diff --git a/pkg/servingstore/examples/memory_store/go.mod b/pkg/servingstore/examples/memory_store/go.mod new file mode 100644 index 00000000..e5e15f3a --- /dev/null +++ b/pkg/servingstore/examples/memory_store/go.mod @@ -0,0 +1,18 @@ +module memory_store + +go 1.22 + +toolchain go1.23.1 + +replace github.com/numaproj/numaflow-go => ../../../.. + +require github.com/numaproj/numaflow-go v0.9.0 + +require ( + golang.org/x/net v0.29.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/grpc v1.66.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/pkg/servingstore/examples/memory_store/go.sum b/pkg/servingstore/examples/memory_store/go.sum new file mode 100644 index 00000000..09e06a2c --- /dev/null +++ b/pkg/servingstore/examples/memory_store/go.sum @@ -0,0 +1,22 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/servingstore/examples/memory_store/main.go b/pkg/servingstore/examples/memory_store/main.go new file mode 100644 index 00000000..55d3ab17 --- /dev/null +++ b/pkg/servingstore/examples/memory_store/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "context" + "log" + + "github.com/numaproj/numaflow-go/pkg/servingstore" +) + +type InMemoryStore struct { + store map[string][]servingstore.Payload +} + +func (i *InMemoryStore) Put(ctx context.Context, putDatum servingstore.PutDatum) { + id := putDatum.ID() + log.Printf("Received Put request for %s", id) + if _, ok := i.store[id]; !ok { + i.store[id] = make([]servingstore.Payload, 0) + } + for _, payload := range putDatum.Payloads() { + i.store[id] = append(i.store[id], servingstore.NewPayload(payload.Origin(), payload.Value())) + } +} + +func (i *InMemoryStore) Get(ctx context.Context, getDatum servingstore.GetDatum) servingstore.StoredResult { + id := getDatum.ID() + log.Printf("Received Get request for %s", id) + if data, ok := i.store[id]; ok { + return servingstore.NewStoredResult(id, data) + } else { + return servingstore.NewStoredResult(id, nil) + } +} + +func NewInMemoryStore() *InMemoryStore { + return &InMemoryStore{ + store: make(map[string][]servingstore.Payload), + } +} + +func main() { + err := servingstore.NewServer(NewInMemoryStore()).Start(context.Background()) + if err != nil { + log.Panic("Failed to serving store function server: ", err) + } +} diff --git a/pkg/servingstore/examples/memory_store/memory_store b/pkg/servingstore/examples/memory_store/memory_store new file mode 100755 index 00000000..421bac9c Binary files /dev/null and b/pkg/servingstore/examples/memory_store/memory_store differ diff --git a/pkg/servingstore/interface.go b/pkg/servingstore/interface.go index 2e45bfd2..1cfae537 100644 --- a/pkg/servingstore/interface.go +++ b/pkg/servingstore/interface.go @@ -10,34 +10,34 @@ type ServingStorer interface { Put(ctx context.Context, put PutDatum) // Get is to retrieve data from the Serving Store. - Get(ctx context.Context, get GetDatum) StoredResults + Get(ctx context.Context, get GetDatum) StoredResult } // PutDatum interface exposes methods to retrieve data from the Put rpc. type PutDatum interface { - Origin() string - Payloads() [][]byte + ID() string + Payloads() []Payload } // PutRequest contains the details to store the payload to the Store. type PutRequest struct { - origin string - payloads [][]byte + id string + payloads []Payload } -// Origin returns the origin name. -func (p *PutRequest) Origin() string { - return p.origin +// ID returns the id of the original request. +func (p *PutRequest) ID() string { + return p.id } // Payloads returns the payloads to be stored. -func (p *PutRequest) Payloads() [][]byte { +func (p *PutRequest) Payloads() []Payload { return p.payloads } // GetDatum is the interface to expose methods to retrieve from the Get rpc. type GetDatum interface { - Id() string + ID() string } // GetRequest has details on the Get rpc. @@ -45,7 +45,7 @@ type GetRequest struct { id string } -// Id is the unique ID original request which is used get the data stored in the Store. -func (g *GetRequest) Id() string { +// ID is the unique ID original request which is used get the data stored in the Store. +func (g *GetRequest) ID() string { return g.id } diff --git a/pkg/servingstore/message.go b/pkg/servingstore/message.go index 7a345193..99117865 100644 --- a/pkg/servingstore/message.go +++ b/pkg/servingstore/message.go @@ -2,40 +2,32 @@ package servingstore // StoredResult is the data stored in the store per origin. type StoredResult struct { - origin string + id string payloads []Payload } // NewStoredResult creates a new StoreResult from the provided origin and payloads. -func NewStoredResult(origin string, payloads []Payload) StoredResult { - return StoredResult{origin: origin, payloads: payloads} +func NewStoredResult(id string, payloads []Payload) StoredResult { + return StoredResult{id: id, payloads: payloads} } // Payload is each independent result stored in the Store for the given ID. type Payload struct { - value []byte + origin string + value []byte } // NewPayload creates a new Payload from the given value. -func NewPayload(value []byte) Payload { - return Payload{value: value} +func NewPayload(origin string, value []byte) Payload { + return Payload{origin: origin, value: value} } -// StoredResults contains 0, 1, or more StoredResult. -type StoredResults []StoredResult - -// StoredResultsBuilder returns an empty instance of StoredResults -func StoredResultsBuilder() StoredResults { - return StoredResults{} -} - -// Append appends a StoredResult -func (r StoredResults) Append(msg StoredResult) StoredResults { - r = append(r, msg) - return r +// Value returns the value of the Payload. +func (p *Payload) Value() []byte { + return p.value } -// Items returns the StoredResults list -func (r StoredResults) Items() []StoredResult { - return r +// Origin returns the origin name. +func (p *Payload) Origin() string { + return p.origin } diff --git a/pkg/servingstore/service.go b/pkg/servingstore/service.go index e18ec30b..817b96d4 100644 --- a/pkg/servingstore/service.go +++ b/pkg/servingstore/service.go @@ -48,13 +48,12 @@ func (s *Service) Put(ctx context.Context, request *servingpb.PutRequest) (*serv // handle panic defer func() { err = handlePanic() }() - var payloads = make([][]byte, 0, len(request.Payloads)) + var payloads = make([]Payload, 0, len(request.Payloads)) for _, payload := range request.Payloads { - payloads = append(payloads, payload.Value) + payloads = append(payloads, Payload{origin: payload.Origin, value: payload.Value}) } - s.ServingStore.Put(ctx, &PutRequest{origin: request.Origin, payloads: payloads}) - + s.ServingStore.Put(ctx, &PutRequest{id: request.Id, payloads: payloads}) return &servingpb.PutResponse{Success: true}, err } @@ -64,23 +63,14 @@ func (s *Service) Get(ctx context.Context, request *servingpb.GetRequest) (*serv // handle panic defer func() { err = handlePanic() }() - storedResults := s.ServingStore.Get(ctx, &GetRequest{id: request.Id}) - - items := storedResults.Items() - var payloads = make([]*servingpb.OriginalPayload, 0, len(items)) + storedResult := s.ServingStore.Get(ctx, &GetRequest{id: request.Id}) - for _, storedResult := range items { - var p = make([]*servingpb.Payload, 0) - for _, payload := range storedResult.payloads { - p = append(p, &servingpb.Payload{Id: request.GetId(), Value: payload.value}) - } - payloads = append(payloads, &servingpb.OriginalPayload{ - Origin: storedResult.origin, - Payloads: p, - }) + var payloads = make([]*servingpb.Payload, 0) + for _, payload := range storedResult.payloads { + payloads = append(payloads, &servingpb.Payload{Origin: payload.origin, Value: payload.value}) } - return &servingpb.GetResponse{Payloads: payloads}, err + return &servingpb.GetResponse{Id: request.GetId(), Payloads: payloads}, err } // IsReady is used to indicate that the server is ready.