Skip to content

Commit a6322f8

Browse files
authored
chore: formalize gRPC errors in case of UDF failures (#218)
Signed-off-by: veds-g <[email protected]>
1 parent d8e165d commit a6322f8

File tree

23 files changed

+193
-82
lines changed

23 files changed

+193
-82
lines changed

poetry.lock

Lines changed: 70 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pynumaflow/_constants.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,16 @@
66

77
SIDE_INPUT_DIR_PATH = "/var/numaflow/side-inputs"
88

9+
# UDF execution error prefixes
10+
ERR_SOURCE_EXCEPTION = "UDF_EXECUTION_ERROR(source)"
11+
ERR_TRANSFORMER_EXCEPTION = "UDF_EXECUTION_ERROR(transformer)"
12+
ERR_SINK_EXCEPTION = "UDF_EXECUTION_ERROR(sink)"
13+
ERR_MAP_STREAM_EXCEPTION = "UDF_EXECUTION_ERROR(mapstream)"
14+
ERR_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(map)"
15+
ERR_BATCH_MAP_EXCEPTION = "UDF_EXECUTION_ERROR(batchmap)"
16+
ERR_REDUCE_EXCEPTION = "UDF_EXECUTION_ERROR(reduce)"
17+
ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION = "UDF_EXECUTION_ERROR(sideinput)"
18+
919
# Socket configs
1020
MAP_SOCK_PATH = "/var/run/numaflow/map.sock"
1121
MAP_STREAM_SOCK_PATH = "/var/run/numaflow/mapstream.sock"

pynumaflow/batchmapper/servicer/async_servicer.py

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,16 +1,15 @@
11
import asyncio
22
from collections.abc import AsyncIterable
33

4-
import grpc
54
from google.protobuf import empty_pb2 as _empty_pb2
65

76
from pynumaflow.batchmapper import Datum
87
from pynumaflow.batchmapper._dtypes import BatchMapCallable, BatchMapError
98
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
109
from pynumaflow.shared.asynciter import NonBlockingIterator
11-
from pynumaflow.shared.server import exit_on_error
10+
from pynumaflow.shared.server import handle_async_error
1211
from pynumaflow.types import NumaflowServicerContext
13-
from pynumaflow._constants import _LOGGER, STREAM_EOF
12+
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_BATCH_MAP_EXCEPTION
1413

1514

1615
class AsyncBatchMapServicer(map_pb2_grpc.MapServicer):
@@ -99,10 +98,7 @@ async def MapFn(
9998

10099
except BaseException as err:
101100
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
102-
await asyncio.gather(
103-
context.abort(grpc.StatusCode.UNKNOWN, details=repr(err)), return_exceptions=True
104-
)
105-
exit_on_error(context, repr(err))
101+
await handle_async_error(context, err, ERR_BATCH_MAP_EXCEPTION)
106102
return
107103

108104
async def IsReady(

pynumaflow/mapper/_servicer/_async_servicer.py

Lines changed: 7 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -4,10 +4,10 @@
44
from google.protobuf import empty_pb2 as _empty_pb2
55
from pynumaflow.shared.asynciter import NonBlockingIterator
66

7-
from pynumaflow._constants import _LOGGER, STREAM_EOF
7+
from pynumaflow._constants import _LOGGER, STREAM_EOF, ERR_MAP_EXCEPTION
88
from pynumaflow.mapper._dtypes import MapAsyncCallable, Datum, MapError
99
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
10-
from pynumaflow.shared.server import exit_on_error, handle_async_error
10+
from pynumaflow.shared.server import handle_async_error
1111
from pynumaflow.types import NumaflowServicerContext
1212

1313

@@ -56,7 +56,7 @@ async def MapFn(
5656
async for msg in consumer:
5757
# If the message is an exception, we raise the exception
5858
if isinstance(msg, BaseException):
59-
await handle_async_error(context, msg)
59+
await handle_async_error(context, msg, ERR_MAP_EXCEPTION)
6060
return
6161
# Send window response back to the client
6262
else:
@@ -65,7 +65,7 @@ async def MapFn(
6565
await producer
6666
except BaseException as e:
6767
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
68-
exit_on_error(context, repr(e))
68+
await handle_async_error(context, e, ERR_MAP_EXCEPTION)
6969
return
7070

7171
async def _process_inputs(
@@ -92,9 +92,8 @@ async def _process_inputs(
9292
# send an EOF to result queue to indicate that all tasks have completed
9393
await result_queue.put(STREAM_EOF)
9494

95-
except BaseException as e:
96-
await result_queue.put(e)
97-
return
95+
except BaseException:
96+
_LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
9897

9998
async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIterator):
10099
"""
@@ -116,7 +115,7 @@ async def _invoke_map(self, req: map_pb2.MapRequest, result_queue: NonBlockingIt
116115
)
117116
await result_queue.put(map_pb2.MapResponse(results=datums, id=req.id))
118117
except BaseException as err:
119-
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
118+
_LOGGER.critical("MapFn handler error", exc_info=True)
120119
await result_queue.put(err)
121120

122121
async def IsReady(

pynumaflow/mapper/_servicer/_sync_servicer.py

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
from google.protobuf import empty_pb2 as _empty_pb2
66
from pynumaflow.shared.server import exit_on_error
77

8-
from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER
8+
from pynumaflow._constants import NUM_THREADS_DEFAULT, STREAM_EOF, _LOGGER, ERR_MAP_EXCEPTION
99
from pynumaflow.mapper._dtypes import MapSyncCallable, Datum, MapError
1010
from pynumaflow.proto.mapper import map_pb2, map_pb2_grpc
1111
from pynumaflow.shared.synciter import SyncIterator
@@ -57,7 +57,9 @@ def MapFn(
5757
# if error handler accordingly
5858
if isinstance(res, BaseException):
5959
# Terminate the current server process due to exception
60-
exit_on_error(context, repr(res), parent=self.multiproc)
60+
exit_on_error(
61+
context, f"{ERR_MAP_EXCEPTION}: {repr(res)}", parent=self.multiproc
62+
)
6163
return
6264
# return the result
6365
yield res
@@ -69,7 +71,7 @@ def MapFn(
6971
except BaseException as err:
7072
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
7173
# Terminate the current server process due to exception
72-
exit_on_error(context, repr(err), parent=self.multiproc)
74+
exit_on_error(context, f"{ERR_MAP_EXCEPTION}: {repr(err)}", parent=self.multiproc)
7375
return
7476

7577
def _process_requests(
@@ -87,9 +89,8 @@ def _process_requests(
8789
self.executor.shutdown(wait=True)
8890
# Indicate to the result queue that no more messages left to process
8991
result_queue.put(STREAM_EOF)
90-
except BaseException as e:
92+
except BaseException:
9193
_LOGGER.critical("MapFn Error, re-raising the error", exc_info=True)
92-
result_queue.put(e)
9394

9495
def _invoke_map(
9596
self,

pynumaflow/mapstreamer/servicer/async_servicer.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,9 +5,9 @@
55
from pynumaflow.mapstreamer import Datum
66
from pynumaflow.mapstreamer._dtypes import MapStreamCallable, MapStreamError
77
from pynumaflow.proto.mapper import map_pb2_grpc, map_pb2
8-
from pynumaflow.shared.server import exit_on_error
8+
from pynumaflow.shared.server import handle_async_error
99
from pynumaflow.types import NumaflowServicerContext
10-
from pynumaflow._constants import _LOGGER
10+
from pynumaflow._constants import _LOGGER, ERR_MAP_STREAM_EXCEPTION
1111

1212

1313
class AsyncMapStreamServicer(map_pb2_grpc.MapServicer):
@@ -59,7 +59,7 @@ async def MapFn(
5959
yield map_pb2.MapResponse(status=map_pb2.TransmissionStatus(eot=True), id=req.id)
6060
except BaseException as err:
6161
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
62-
exit_on_error(context, repr(err))
62+
await handle_async_error(context, err, ERR_MAP_STREAM_EXCEPTION)
6363
return
6464

6565
async def __invoke_map_stream(self, keys: list[str], req: Datum):
@@ -68,7 +68,7 @@ async def __invoke_map_stream(self, keys: list[str], req: Datum):
6868
async for msg in self.__map_stream_handler(keys, req):
6969
yield map_pb2.MapResponse.Result(keys=msg.keys, value=msg.value, tags=msg.tags)
7070
except BaseException as err:
71-
_LOGGER.critical("UDFError, re-raising the error", exc_info=True)
71+
_LOGGER.critical("MapFn handler error", exc_info=True)
7272
raise err
7373

7474
async def IsReady(

pynumaflow/reducer/servicer/async_servicer.py

Lines changed: 4 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,9 @@
1-
import asyncio
21
from collections.abc import AsyncIterable
32
from typing import Union
43

5-
import grpc
64
from google.protobuf import empty_pb2 as _empty_pb2
75

8-
from pynumaflow._constants import _LOGGER
6+
from pynumaflow._constants import _LOGGER, ERR_REDUCE_EXCEPTION
97
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
108
from pynumaflow.reducer._dtypes import (
119
Datum,
@@ -15,7 +13,7 @@
1513
WindowOperation,
1614
)
1715
from pynumaflow.reducer.servicer.task_manager import TaskManager
18-
from pynumaflow.shared.server import exit_on_error
16+
from pynumaflow.shared.server import handle_async_error
1917
from pynumaflow.types import NumaflowServicerContext
2018

2119

@@ -107,10 +105,7 @@ async def ReduceFn(
107105
_LOGGER.critical("Reduce Error", exc_info=True)
108106
# Send a context abort signal for the rpc, this is required for numa container to get
109107
# the correct grpc error
110-
await asyncio.gather(
111-
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
112-
)
113-
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
108+
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
114109

115110
# send EOF to all the tasks once the request iterator is exhausted
116111
# This will signal the tasks to stop reading the data on their
@@ -141,10 +136,7 @@ async def ReduceFn(
141136
_LOGGER.critical("Reduce Error", exc_info=True)
142137
# Send a context abort signal for the rpc, this is required for numa container to get
143138
# the correct grpc error
144-
await asyncio.gather(
145-
context.abort(grpc.StatusCode.UNKNOWN, details=repr(e)), return_exceptions=True
146-
)
147-
exit_on_error(err=repr(e), parent=False, context=context, update_context=False)
139+
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
148140

149141
async def IsReady(
150142
self, request: _empty_pb2.Empty, context: NumaflowServicerContext

pynumaflow/reducestreamer/servicer/async_servicer.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44

55
from google.protobuf import empty_pb2 as _empty_pb2
66

7+
from pynumaflow._constants import ERR_REDUCE_EXCEPTION
78
from pynumaflow.proto.reducer import reduce_pb2, reduce_pb2_grpc
89
from pynumaflow.reducestreamer._dtypes import (
910
Datum,
@@ -94,20 +95,20 @@ async def ReduceFn(
9495
async for msg in consumer:
9596
# If the message is an exception, we raise the exception
9697
if isinstance(msg, BaseException):
97-
await handle_async_error(context, msg)
98+
await handle_async_error(context, msg, ERR_REDUCE_EXCEPTION)
9899
return
99100
# Send window EOF response or Window result response
100101
# back to the client
101102
else:
102103
yield msg
103104
except BaseException as e:
104-
await handle_async_error(context, e)
105+
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
105106
return
106107
# Wait for the process_input_stream task to finish for a clean exit
107108
try:
108109
await producer
109110
except BaseException as e:
110-
await handle_async_error(context, e)
111+
await handle_async_error(context, e, ERR_REDUCE_EXCEPTION)
111112
return
112113

113114
async def IsReady(

pynumaflow/shared/server.py

Lines changed: 38 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,10 @@
55
import os
66
import socket
77
import traceback
8+
9+
from google.protobuf import any_pb2
10+
from google.rpc import code_pb2, status_pb2, error_details_pb2
11+
from grpc_status import rpc_status
812
from abc import ABCMeta, abstractmethod
913
from collections.abc import Iterator
1014
from concurrent.futures import ThreadPoolExecutor
@@ -240,6 +244,21 @@ def check_instance(instance, callable_type) -> bool:
240244
return False
241245

242246

247+
def get_grpc_status(err: str):
248+
"""
249+
Create a grpc status object with the error details.
250+
"""
251+
details = any_pb2.Any()
252+
details.Pack(
253+
error_details_pb2.DebugInfo(
254+
detail="\n".join(traceback.format_stack()),
255+
)
256+
)
257+
258+
status = status_pb2.Status(code=code_pb2.INTERNAL, message=err, details=[details])
259+
return rpc_status.to_status(status)
260+
261+
243262
def exit_on_error(
244263
context: NumaflowServicerContext, err: str, parent: bool = False, update_context=True
245264
):
@@ -255,8 +274,12 @@ def exit_on_error(
255274
the context with the error codes
256275
"""
257276
if update_context:
258-
context.set_code(grpc.StatusCode.UNKNOWN)
277+
# Create a status object with the error details
278+
grpc_status = get_grpc_status(err)
279+
280+
context.set_code(grpc.StatusCode.INTERNAL)
259281
context.set_details(err)
282+
context.set_trailing_metadata(grpc_status.trailing_metadata)
260283

261284
p = psutil.Process(os.getpid())
262285
# If the parent flag is true, we exit from the parent process
@@ -267,15 +290,19 @@ def exit_on_error(
267290
p.kill()
268291

269292

270-
def update_context_err(context: NumaflowServicerContext, e: BaseException):
293+
def update_context_err(context: NumaflowServicerContext, e: BaseException, err_msg: str):
271294
"""
272295
Update the context with the error and log the exception.
273296
"""
274297
trace = get_exception_traceback_str(e)
275298
_LOGGER.critical(trace)
276299
_LOGGER.critical(e.__str__())
277-
context.set_code(grpc.StatusCode.UNKNOWN)
278-
context.set_details(e.__str__())
300+
301+
grpc_status = get_grpc_status(err_msg)
302+
303+
context.set_code(grpc.StatusCode.INTERNAL)
304+
context.set_details(err_msg)
305+
context.set_trailing_metadata(grpc_status.trailing_metadata)
279306

280307

281308
def get_exception_traceback_str(exc) -> str:
@@ -284,12 +311,15 @@ def get_exception_traceback_str(exc) -> str:
284311
return file.getvalue().rstrip()
285312

286313

287-
async def handle_async_error(context: NumaflowServicerContext, exception: BaseException):
314+
async def handle_async_error(
315+
context: NumaflowServicerContext, exception: BaseException, exception_type: str
316+
):
288317
"""
289318
Handle exceptions for async servers by updating the context and exiting.
290319
"""
291-
update_context_err(context, exception)
320+
err_msg = f"{exception_type}: {repr(exception)}"
321+
update_context_err(context, exception, err_msg)
292322
await asyncio.gather(
293-
context.abort(grpc.StatusCode.UNKNOWN, details=repr(exception)), return_exceptions=True
323+
context.abort(grpc.StatusCode.INTERNAL, details=err_msg), return_exceptions=True
294324
)
295-
exit_on_error(err=repr(exception), parent=False, context=context, update_context=False)
325+
exit_on_error(err=err_msg, parent=False, context=context, update_context=False)

pynumaflow/sideinput/servicer/servicer.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
from pynumaflow._constants import (
44
_LOGGER,
5+
ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION,
56
)
67
from pynumaflow.proto.sideinput import sideinput_pb2_grpc, sideinput_pb2
78
from pynumaflow.shared.server import exit_on_error
@@ -27,9 +28,9 @@ def RetrieveSideInput(
2728
try:
2829
rspn = self.__retrieve_handler()
2930
except BaseException as err:
30-
err_msg = f"RetrieveSideInputErr: {repr(err)}"
31+
err_msg = f"{ERR_SIDE_INPUT_RETRIEVAL_EXCEPTION}: {repr(err)}"
3132
_LOGGER.critical(err_msg, exc_info=True)
32-
exit_on_error(context, repr(err))
33+
exit_on_error(context, err_msg)
3334
return
3435

3536
return sideinput_pb2.SideInputResponse(value=rspn.value, no_broadcast=rspn.no_broadcast)

0 commit comments

Comments
 (0)