Skip to content

feat: add async source transformer #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions examples/sourcetransform/async_event_time_filter/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
POETRY_VERSION=1.2.2 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup"

ENV EXAMPLE_PATH="$PYSETUP_PATH/examples/sourcetransform/async_event_time_filter"
ENV VENV_PATH="$EXAMPLE_PATH/.venv"
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
\
# install dumb-init
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init \
&& curl -sSL https://install.python-poetry.org | python3 -

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./ ./

WORKDIR $EXAMPLE_PATH
RUN poetry lock
RUN poetry install --no-cache --no-root && \
rm -rf ~/.cache/pypoetry/

RUN chmod +x entry.sh

ENTRYPOINT ["/dumb-init", "--"]
CMD ["sh", "-c", "$EXAMPLE_PATH/entry.sh"]

EXPOSE 5000
22 changes: 22 additions & 0 deletions examples/sourcetransform/async_event_time_filter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-python/async-mapt-event-time-filter:${TAG}
DOCKER_FILE_PATH = examples/sourcetransform/async_event_time_filter/Dockerfile

.PHONY: update
update:
poetry update -vv

.PHONY: image-push
image-push: update
cd ../../../ && docker buildx build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} \
--platform linux/amd64,linux/arm64 . --push

.PHONY: image
image: update
cd ../../../ && docker build \
-f ${DOCKER_FILE_PATH} \
-t ${IMAGE_REGISTRY} .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}:${TAG}; fi
4 changes: 4 additions & 0 deletions examples/sourcetransform/async_event_time_filter/entry.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
set -eux

python example.py
48 changes: 48 additions & 0 deletions examples/sourcetransform/async_event_time_filter/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import datetime
import logging

from pynumaflow.sourcetransformer import Messages, Message, Datum
from pynumaflow.sourcetransformer import SourceTransformAsyncServer

"""
This is a simple User Defined Function example which receives a message, applies the following
data transformation, and returns the message.
If the message event time is before year 2022, drop the message with event time unchanged.
If it's within year 2022, update the tag to "within_year_2022" and
update the message event time to Jan 1st 2022.
Otherwise, (exclusively after year 2022), update the tag to "after_year_2022" and update the
message event time to Jan 1st 2023.
"""

january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


async def my_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
event_time = datum.event_time
messages = Messages()

if event_time < january_first_2022:
logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
messages.append(Message.to_drop(event_time))
elif event_time < january_first_2023:
logging.info(
"Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
event_time,
)
messages.append(
Message(value=val, event_time=january_first_2022, tags=["within_year_2022"])
)
else:
logging.info(
"Got event time:%s, it is after year 2022, so forwarding to after_year_2022", event_time
)
messages.append(Message(value=val, event_time=january_first_2023, tags=["after_year_2022"]))

return messages


if __name__ == "__main__":
grpc_server = SourceTransformAsyncServer(my_handler)
grpc_server.start()
15 changes: 15 additions & 0 deletions examples/sourcetransform/async_event_time_filter/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[tool.poetry]
name = "async-mapt-event-time-filter"
version = "0.2.4"
description = ""
authors = ["Numaflow developers"]
readme = "README.md"
packages = [{include = "mapt_event_time_filter"}]

[tool.poetry.dependencies]
python = ">=3.9, <3.12"
pynumaflow = { path = "../../../"}

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
2 changes: 2 additions & 0 deletions pynumaflow/sourcetransformer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)
from pynumaflow.sourcetransformer.multiproc_server import SourceTransformMultiProcServer
from pynumaflow.sourcetransformer.server import SourceTransformServer
from pynumaflow.sourcetransformer.async_server import SourceTransformAsyncServer

__all__ = [
"Message",
Expand All @@ -16,4 +17,5 @@
"SourceTransformServer",
"SourceTransformer",
"SourceTransformMultiProcServer",
"SourceTransformAsyncServer",
]
7 changes: 7 additions & 0 deletions pynumaflow/sourcetransformer/_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from datetime import datetime
from typing import TypeVar, Callable, Union, Optional
from collections.abc import Awaitable
from warnings import warn

from pynumaflow._constants import DROP
Expand Down Expand Up @@ -210,3 +211,9 @@ def handler(self, keys: list[str], datum: Datum) -> Messages:
# SourceTransformCallable is the type of the handler function for the
# Source Transformer UDFunction.
SourceTransformCallable = Union[SourceTransformHandler, SourceTransformer]


# SourceTransformAsyncCallable is a callable which can be used as a handler
# for the Asynchronous Transformer UDF
SourceTransformHandlerAsyncHandlerCallable = Callable[[list[str], Datum], Awaitable[Messages]]
SourceTransformAsyncCallable = Union[SourceTransformer, SourceTransformHandlerAsyncHandlerCallable]
157 changes: 157 additions & 0 deletions pynumaflow/sourcetransformer/async_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import aiorun
import grpc

from pynumaflow._constants import (
NUM_THREADS_DEFAULT,
MAX_MESSAGE_SIZE,
MAX_NUM_THREADS,
SOURCE_TRANSFORMER_SOCK_PATH,
SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
)
from pynumaflow.info.types import (
ServerInfo,
MINIMUM_NUMAFLOW_VERSION,
ContainerType,
)
from pynumaflow.proto.sourcetransformer import transform_pb2_grpc
from pynumaflow.shared.server import (
NumaflowServer,
start_async_server,
)
from pynumaflow.sourcetransformer._dtypes import SourceTransformAsyncCallable
from pynumaflow.sourcetransformer.servicer._async_servicer import SourceTransformAsyncServicer


class SourceTransformAsyncServer(NumaflowServer):
"""
Create a new grpc Source Transformer Server instance.
A new servicer instance is created and attached to the server.
The server instance is returned.
Args:
source_transform_instance: The source transformer instance to be used for
Source Transformer UDF
sock_path: The UNIX socket path to be used for the server
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16

Example Invocation:

import datetime
import logging

from pynumaflow.sourcetransformer import Messages, Message, Datum, SourceTransformServer
# This is a simple User Defined Function example which receives a message,
# applies the following
# data transformation, and returns the message.
# If the message event time is before year 2022, drop the message with event time unchanged.
# If it's within year 2022, update the tag to "within_year_2022" and
# update the message event time to Jan 1st 2022.
# Otherwise, (exclusively after year 2022), update the tag to
# "after_year_2022" and update the
# message event time to Jan 1st 2023.

january_first_2022 = datetime.datetime.fromtimestamp(1640995200)
january_first_2023 = datetime.datetime.fromtimestamp(1672531200)


async def my_handler(keys: list[str], datum: Datum) -> Messages:
val = datum.value
event_time = datum.event_time
messages = Messages()

if event_time < january_first_2022:
logging.info("Got event time:%s, it is before 2022, so dropping", event_time)
messages.append(Message.to_drop(event_time))
elif event_time < january_first_2023:
logging.info(
"Got event time:%s, it is within year 2022, so forwarding to within_year_2022",
event_time,
)
messages.append(
Message(value=val, event_time=january_first_2022,
tags=["within_year_2022"])
)
else:
logging.info(
"Got event time:%s, it is after year 2022, so forwarding to
after_year_2022", event_time
)
messages.append(Message(value=val, event_time=january_first_2023,
tags=["after_year_2022"]))

return messages


if __name__ == "__main__":
grpc_server = SourceTransformAsyncServer(my_handler)
grpc_server.start()
"""

def __init__(
self,
source_transform_instance: SourceTransformAsyncCallable,
sock_path=SOURCE_TRANSFORMER_SOCK_PATH,
max_message_size=MAX_MESSAGE_SIZE,
max_threads=NUM_THREADS_DEFAULT,
server_info_file=SOURCE_TRANSFORMER_SERVER_INFO_FILE_PATH,
):
"""
Create a new grpc Asynchronous Map Server instance.
A new servicer instance is created and attached to the server.
The server instance is returned.
Args:
mapper_instance: The mapper instance to be used for Map UDF
sock_path: The UNIX socket path to be used for the server
max_message_size: The max message size in bytes the server can receive and send
max_threads: The max number of threads to be spawned;
defaults to 4 and max capped at 16
"""
self.sock_path = f"unix://{sock_path}"
self.max_threads = min(max_threads, MAX_NUM_THREADS)
self.max_message_size = max_message_size
self.server_info_file = server_info_file

self.source_transform_instance = source_transform_instance

self._server_options = [
("grpc.max_send_message_length", self.max_message_size),
("grpc.max_receive_message_length", self.max_message_size),
]
self.servicer = SourceTransformAsyncServicer(handler=source_transform_instance)

def start(self) -> None:
"""
Starter function for the Async server class, need a separate caller
so that all the async coroutines can be started from a single context
"""
aiorun.run(self.aexec(), use_uvloop=True)

Check warning on line 128 in pynumaflow/sourcetransformer/async_server.py

View check run for this annotation

Codecov / codecov/patch

pynumaflow/sourcetransformer/async_server.py#L128

Added line #L128 was not covered by tests

async def aexec(self) -> None:
"""
Starts the Async gRPC server on the given UNIX socket with
given max threads.
"""

# As the server is async, we need to create a new server instance in the
# same thread as the event loop so that all the async calls are made in the
# same context

server_new = grpc.aio.server(options=self._server_options)
server_new.add_insecure_port(self.sock_path)
transform_pb2_grpc.add_SourceTransformServicer_to_server(self.servicer, server_new)

serv_info = ServerInfo.get_default_server_info()
serv_info.minimum_numaflow_version = MINIMUM_NUMAFLOW_VERSION[
ContainerType.Sourcetransformer
]

# Start the async server
await start_async_server(
server_async=server_new,
sock_path=self.sock_path,
max_threads=self.max_threads,
cleanup_coroutines=list(),
server_info_file=self.server_info_file,
server_info=serv_info,
)
Loading