Skip to content

feat: add udstore and response-serve #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ setup:
proto:
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sinker -I=pynumaflow/proto/sinker --python_out=pynumaflow/proto/sinker --grpc_python_out=pynumaflow/proto/sinker pynumaflow/proto/sinker/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/mapper -I=pynumaflow/proto/mapper --python_out=pynumaflow/proto/mapper --grpc_python_out=pynumaflow/proto/mapper pynumaflow/proto/mapper/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/mapstreamer -I=pynumaflow/proto/mapstreamer --python_out=pynumaflow/proto/mapstreamer --grpc_python_out=pynumaflow/proto/mapstreamer pynumaflow/proto/mapstreamer/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/reducer -I=pynumaflow/proto/reducer --python_out=pynumaflow/proto/reducer --grpc_python_out=pynumaflow/proto/reducer pynumaflow/proto/reducer/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcetransformer -I=pynumaflow/proto/sourcetransformer --python_out=pynumaflow/proto/sourcetransformer --grpc_python_out=pynumaflow/proto/sourcetransformer pynumaflow/proto/sourcetransformer/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sideinput -I=pynumaflow/proto/sideinput --python_out=pynumaflow/proto/sideinput --grpc_python_out=pynumaflow/proto/sideinput pynumaflow/proto/sideinput/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/sourcer -I=pynumaflow/proto/sourcer --python_out=pynumaflow/proto/sourcer --grpc_python_out=pynumaflow/proto/sourcer pynumaflow/proto/sourcer/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/batchmapper -I=pynumaflow/proto/batchmapper --python_out=pynumaflow/proto/batchmapper --grpc_python_out=pynumaflow/proto/batchmapper pynumaflow/proto/batchmapper/*.proto
python3 -m grpc_tools.protoc --pyi_out=pynumaflow/proto/serving -I=pynumaflow/proto/serving --python_out=pynumaflow/proto/serving --grpc_python_out=pynumaflow/proto/serving pynumaflow/proto/serving/*.proto


sed -i '' 's/^\(import.*_pb2\)/from . \1/' pynumaflow/proto/*/*.py
55 changes: 55 additions & 0 deletions examples/servingstore/in_memory/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.2.2 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup"

ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/servingstore/in_memory"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
\
# install dumb-init
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init \
&& curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

WORKDIR $EXAMPLE_PATH
RUN poetry lock
RUN poetry install --no-cache --no-root && \
rm -rf ~/.cache/pypoetry/

RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]

EXPOSE 5000
23 changes: 23 additions & 0 deletions examples/servingstore/in_memory/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/serving-store-example:${TAG}
DOCKER_FILE_PATH = examples/servingstore/in_memory/Dockerfile


.PHONY: update
update:
poetry update -vv

.PHONY: image-push
image-push: update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

.PHONY: image
image: update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi
3 changes: 3 additions & 0 deletions examples/servingstore/in_memory/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Serving Store Example

An example that demonstrates how to write a `serving store` in python.
4 changes: 4 additions & 0 deletions examples/servingstore/in_memory/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
37 changes: 37 additions & 0 deletions examples/servingstore/in_memory/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pynumaflow.servingstore import (
ServingStorer,
PutDatum,
GetDatum,
StoredResult,
ServingStoreServer,
Payload,
)


class InMemoryStore(ServingStorer):
def __init__(self):
self.store = {}

def put(self, datum: PutDatum):
req_id = datum.id
print("Received Put request for ", req_id)
if req_id not in self.store:
self.store[req_id] = []

cur_payloads = self.store[req_id]
for x in datum.payloads:
cur_payloads.append(Payload(x.origin, x.value))
self.store[req_id] = cur_payloads

def get(self, datum: GetDatum) -> StoredResult:
req_id = datum.id
print("Received Get request for ", req_id)
resp = []
if req_id in self.store:
resp = self.store[req_id]
return StoredResult(id_=req_id, payloads=resp)


if __name__ == "__main__":
grpc_server = ServingStoreServer(InMemoryStore())
grpc_server.start()
15 changes: 15 additions & 0 deletions examples/servingstore/in_memory/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.poetry]
name = "in-memory-servingstore"
version = "0.2.4"
description = ""
authors = ["Numaflow developers"]

[tool.poetry.dependencies]
python = ">=3.10,<3.13"
pynumaflow = { path = "../../../"}

[tool.poetry.dev-dependencies]

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
3 changes: 3 additions & 0 deletions pynumaflow/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
MULTIPROC_MAP_SOCK_ADDR = "/var/run/numaflow/multiproc"
FALLBACK_SINK_SOCK_PATH = "/var/run/numaflow/fb-sink.sock"
BATCH_MAP_SOCK_PATH = "/var/run/numaflow/batchmap.sock"
SERVING_STORE_SOCK_PATH = "/var/run/numaflow/serving.sock"

# Server information file configs
MAP_SERVER_INFO_FILE_PATH = "/var/run/numaflow/mapper-server-info"
Expand All @@ -34,6 +35,7 @@
SIDE_INPUT_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sideinput-server-info"
SOURCE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/sourcer-server-info"
FALLBACK_SINK_SERVER_INFO_FILE_PATH = "/var/run/numaflow/fb-sinker-server-info"
SERVING_STORE_SERVER_INFO_FILE_PATH = "/var/run/numaflow/serving-server-info"

ENV_UD_CONTAINER_TYPE = "NUMAFLOW_UD_CONTAINER_TYPE"
UD_CONTAINER_FALLBACK_SINK = "fb-udsink"
Expand Down Expand Up @@ -70,3 +72,4 @@ class UDFType(str, Enum):
Source = "source"
SideInput = "sideinput"
SourceTransformer = "sourcetransformer"
ServingStore = "servingstore"
2 changes: 2 additions & 0 deletions pynumaflow/info/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ContainerType(str, Enum):
Sessionreducer = "sessionreducer"
Sideinput = "sideinput"
Fbsinker = "fb-sinker"
Serving = "serving"


# Minimum version of Numaflow required by the current SDK version
Expand All @@ -86,6 +87,7 @@ class ContainerType(str, Enum):
ContainerType.Sessionreducer: "1.4.0-z",
ContainerType.Sideinput: "1.4.0-z",
ContainerType.Fbsinker: "1.4.0-z",
ContainerType.Serving: "1.5.0-z",
}


Expand Down
Empty file.
74 changes: 74 additions & 0 deletions pynumaflow/proto/serving/store.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

package serving.v1;

// ServingStore defines a set of methods to interface with a user-defined Store.
service ServingStore {
// Put is to put the PutRequest into the Store.
rpc Put(PutRequest) returns (PutResponse);

// Get gets the GetRequest from the Store.
rpc Get(GetRequest) returns (GetResponse);

// IsReady checks the health of the container interfacing the Store.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

// Payload that represent the output that is to be written into to the store.
message Payload {
// Origin is the Vertex that generated this result.
string origin = 1;
// Value is the result of the computation.
bytes value = 2;
}

// PutRequest is the request sent to the Store.
message PutRequest {
// ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated
// uuid.
string id = 1;
// Payloads are one or more results generated (could be more than one due to flat-map).
repeated Payload payloads = 2;
}

// PutResponse is the result of the Put call.
message PutResponse {
bool success = 1;
}

// GetRequest is the call to get the result stored in the Store.
message GetRequest {
// ID is the unique id as provided by the user in the original request. If not provided, it will be a system generated
// uuid.
string id = 1;
}

// GetResponse is the result stored in the Store.
message GetResponse {
string id = 1;
// Payloads are one or more results generated (could be more than one due to flat-map).
repeated Payload payloads = 2;
}

/**
* ReadyResponse is the health check result.
*/
message ReadyResponse {
bool ready = 1;
}
43 changes: 43 additions & 0 deletions pynumaflow/proto/serving/store_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions pynumaflow/proto/serving/store_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from google.protobuf import empty_pb2 as _empty_pb2
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from google.protobuf.internal import containers as _containers
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import (
ClassVar as _ClassVar,
Iterable as _Iterable,
Mapping as _Mapping,
Optional as _Optional,
Union as _Union,
)

DESCRIPTOR: _descriptor.FileDescriptor

class Payload(_message.Message):
__slots__ = ("origin", "value")
ORIGIN_FIELD_NUMBER: _ClassVar[int]
VALUE_FIELD_NUMBER: _ClassVar[int]
origin: str
value: bytes
def __init__(self, origin: _Optional[str] = ..., value: _Optional[bytes] = ...) -> None: ...

class PutRequest(_message.Message):
__slots__ = ("id", "payloads")
ID_FIELD_NUMBER: _ClassVar[int]
PAYLOADS_FIELD_NUMBER: _ClassVar[int]
id: str
payloads: _containers.RepeatedCompositeFieldContainer[Payload]
def __init__(
self,
id: _Optional[str] = ...,
payloads: _Optional[_Iterable[_Union[Payload, _Mapping]]] = ...,
) -> None: ...

class PutResponse(_message.Message):
__slots__ = ("success",)
SUCCESS_FIELD_NUMBER: _ClassVar[int]
success: bool
def __init__(self, success: bool = ...) -> None: ...

class GetRequest(_message.Message):
__slots__ = ("id",)
ID_FIELD_NUMBER: _ClassVar[int]
id: str
def __init__(self, id: _Optional[str] = ...) -> None: ...

class GetResponse(_message.Message):
__slots__ = ("id", "payloads")
ID_FIELD_NUMBER: _ClassVar[int]
PAYLOADS_FIELD_NUMBER: _ClassVar[int]
id: str
payloads: _containers.RepeatedCompositeFieldContainer[Payload]
def __init__(
self,
id: _Optional[str] = ...,
payloads: _Optional[_Iterable[_Union[Payload, _Mapping]]] = ...,
) -> None: ...

class ReadyResponse(_message.Message):
__slots__ = ("ready",)
READY_FIELD_NUMBER: _ClassVar[int]
ready: bool
def __init__(self, ready: bool = ...) -> None: ...
Loading
Loading