Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 52 additions & 37 deletions c_src/py_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -3084,14 +3084,17 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
return make_error(env, "invalid_fd");
}

/* Acquire GIL and call Python */
gil_guard_t guard = gil_acquire();
/* Acquire context (handles both worker mode and subinterpreter mode) */
py_context_guard_t guard = py_context_acquire(ctx);
if (!guard.acquired) {
return make_error(env, "acquire_failed");
}

/* Import erlang_reactor module */
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
/* Import erlang.reactor module */
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
if (reactor_module == NULL) {
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "import_erlang_reactor_failed");
}

Expand All @@ -3102,7 +3105,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,

if (result == NULL) {
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "on_read_ready_failed");
}

Expand All @@ -3122,7 +3125,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
}

Py_DECREF(result);
gil_release(guard);
py_context_release(&guard);

return enif_make_tuple2(env, ATOM_OK, action);
}
Expand All @@ -3149,14 +3152,17 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
return make_error(env, "invalid_fd");
}

/* Acquire GIL and call Python */
gil_guard_t guard = gil_acquire();
/* Acquire context (handles both worker mode and subinterpreter mode) */
py_context_guard_t guard = py_context_acquire(ctx);
if (!guard.acquired) {
return make_error(env, "acquire_failed");
}

/* Import erlang_reactor module */
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
/* Import erlang.reactor module */
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
if (reactor_module == NULL) {
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "import_erlang_reactor_failed");
}

Expand All @@ -3167,7 +3173,7 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,

if (result == NULL) {
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "on_write_ready_failed");
}

Expand All @@ -3187,7 +3193,7 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
}

Py_DECREF(result);
gil_release(guard);
py_context_release(&guard);

return enif_make_tuple2(env, ATOM_OK, action);
}
Expand Down Expand Up @@ -3219,23 +3225,26 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
return make_error(env, "invalid_client_info");
}

/* Acquire GIL and call Python */
gil_guard_t guard = gil_acquire();
/* Acquire context (handles both worker mode and subinterpreter mode) */
py_context_guard_t guard = py_context_acquire(ctx);
if (!guard.acquired) {
return make_error(env, "acquire_failed");
}

/* Convert Erlang map to Python dict */
PyObject *client_info = term_to_py(env, argv[2]);
if (client_info == NULL) {
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "client_info_conversion_failed");
}

/* Import erlang_reactor module */
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
/* Import erlang.reactor module */
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
if (reactor_module == NULL) {
Py_DECREF(client_info);
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "import_erlang_reactor_failed");
}

Expand All @@ -3247,18 +3256,18 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,

if (result == NULL) {
PyErr_Clear();
gil_release(guard);
py_context_release(&guard);
return make_error(env, "init_connection_failed");
}

Py_DECREF(result);
gil_release(guard);
py_context_release(&guard);

return ATOM_OK;
}

/**
* reactor_close_fd(FdRef) -> ok | {error, Reason}
* reactor_close_fd(ContextRef, FdRef) -> ok | {error, Reason}
*
* Close an FD and clean up the protocol handler.
* Calls Python's erlang_reactor.close_connection(fd) if registered.
Expand All @@ -3267,8 +3276,13 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

py_context_t *ctx;
if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) {
return make_error(env, "invalid_context");
}

fd_resource_t *fd_res;
if (!enif_get_resource(env, argv[0], FD_RESOURCE_TYPE, (void **)&fd_res)) {
if (!enif_get_resource(env, argv[1], FD_RESOURCE_TYPE, (void **)&fd_res)) {
return make_error(env, "invalid_fd_ref");
}

Expand All @@ -3284,20 +3298,21 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,

/* Call Python to clean up protocol handler */
if (fd >= 0) {
gil_guard_t guard = gil_acquire();

PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
if (reactor_module != NULL) {
PyObject *result = PyObject_CallMethod(reactor_module,
"close_connection", "i", fd);
Py_XDECREF(result);
Py_DECREF(reactor_module);
PyErr_Clear(); /* Ignore errors during cleanup */
} else {
PyErr_Clear();
}
py_context_guard_t guard = py_context_acquire(ctx);
if (guard.acquired) {
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
if (reactor_module != NULL) {
PyObject *result = PyObject_CallMethod(reactor_module,
"close_connection", "i", fd);
Py_XDECREF(result);
Py_DECREF(reactor_module);
PyErr_Clear(); /* Ignore errors during cleanup */
} else {
PyErr_Clear();
}

gil_release(guard);
py_context_release(&guard);
}
}

/* Take ownership for cleanup */
Expand Down
68 changes: 68 additions & 0 deletions c_src/py_event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -799,4 +799,72 @@ int create_default_event_loop(ErlNifEnv *env);
*/
int init_subinterpreter_event_loop(ErlNifEnv *env);

/* ============================================================================
* Reactor NIF Functions (Erlang-as-Reactor architecture)
* ============================================================================ */

/**
* @brief Register a file descriptor for reactor monitoring
*
* NIF: reactor_register_fd(ContextRef, Fd, OwnerPid) -> {ok, FdRef} | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_register_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Re-register for read events after a one-shot event
*
* NIF: reactor_reselect_read(FdRef) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_reselect_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Register for write events
*
* NIF: reactor_select_write(FdRef) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_select_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Get the FD integer from an FD resource
*
* NIF: get_fd_from_resource(FdRef) -> Fd | {error, Reason}
*/
ERL_NIF_TERM nif_get_fd_from_resource(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Call Python protocol on_read_ready
*
* NIF: reactor_on_read_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Call Python protocol on_write_ready
*
* NIF: reactor_on_write_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Initialize connection with Python protocol
*
* NIF: reactor_init_connection(ContextRef, Fd, ClientInfo) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Close FD and cleanup Python protocol
*
* NIF: reactor_close_fd(ContextRef, FdRef) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

#endif /* PY_EVENT_LOOP_H */
13 changes: 12 additions & 1 deletion c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "py_nif.h"
#include "py_asgi.h"
#include "py_wsgi.h"
#include "py_event_loop.h"

/* ============================================================================
* Global state definitions
Expand Down Expand Up @@ -3865,7 +3866,17 @@ static ErlNifFunc nif_funcs[] = {
{"ref_interp_id", 1, nif_ref_interp_id, 0},
{"ref_to_term", 1, nif_ref_to_term, 0},
{"ref_getattr", 2, nif_ref_getattr, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"ref_call_method", 3, nif_ref_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND}
{"ref_call_method", 3, nif_ref_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND},

/* Reactor NIFs - Erlang-as-Reactor architecture */
{"reactor_register_fd", 3, nif_reactor_register_fd, 0},
{"reactor_reselect_read", 1, nif_reactor_reselect_read, 0},
{"reactor_select_write", 1, nif_reactor_select_write, 0},
{"get_fd_from_resource", 1, nif_get_fd_from_resource, 0},
{"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"reactor_close_fd", 2, nif_reactor_close_fd, 0}
};

ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)
29 changes: 28 additions & 1 deletion priv/_erlang_impl/_reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def write_ready(self):
'on_read_ready',
'on_write_ready',
'close_connection',
'signal_write_ready',
]


Expand Down Expand Up @@ -153,6 +154,7 @@ def write(self, data: bytes) -> int:
# =============================================================================

_protocols: Dict[int, Protocol] = {}
_reactor_pids: Dict[int, object] = {} # fd -> reactor PID
_protocol_factory: Optional[Callable[[], Protocol]] = None


Expand Down Expand Up @@ -193,11 +195,15 @@ def init_connection(fd: int, client_info: dict):
fd: File descriptor
client_info: Connection metadata from Erlang
"""
global _protocols, _protocol_factory
global _protocols, _protocol_factory, _reactor_pids
if _protocol_factory is not None:
proto = _protocol_factory()
proto.connection_made(fd, client_info)
_protocols[fd] = proto
# Store reactor PID for signal_write_ready
reactor_pid = client_info.get('reactor_pid')
if reactor_pid is not None:
_reactor_pids[fd] = reactor_pid


def on_read_ready(fd: int) -> str:
Expand Down Expand Up @@ -246,6 +252,27 @@ def close_connection(fd: int):
fd: File descriptor
"""
proto = _protocols.pop(fd, None)
_reactor_pids.pop(fd, None)
if proto is not None:
proto.closed = True
proto.connection_lost()


def signal_write_ready(fd: int) -> bool:
"""Signal the reactor that a response is ready for the given fd.

Call this after an async task completes and the response buffer is ready.
The reactor will then trigger write selection for the fd.

Args:
fd: File descriptor with pending response

Returns:
True if signal was sent, False if no reactor PID registered
"""
import erlang
reactor_pid = _reactor_pids.get(fd)
if reactor_pid is not None:
erlang.send(reactor_pid, ('write_ready', fd))
return True
return False
7 changes: 4 additions & 3 deletions src/py_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
reactor_on_read_ready/2,
reactor_on_write_ready/2,
reactor_init_connection/3,
reactor_close_fd/1
reactor_close_fd/2
]).

-on_load(load_nif/0).
Expand Down Expand Up @@ -1524,8 +1524,9 @@ reactor_init_connection(_ContextRef, _Fd, _ClientInfo) ->
%% Calls Python's erlang_reactor.close_connection(fd) to clean up
%% the protocol handler, then closes the FD.
%%
%% @param ContextRef Context resource reference
%% @param FdRef FD resource reference
%% @returns ok | {error, Reason}
-spec reactor_close_fd(reference()) -> ok | {error, term()}.
reactor_close_fd(_FdRef) ->
-spec reactor_close_fd(reference(), reference()) -> ok | {error, term()}.
reactor_close_fd(_ContextRef, _FdRef) ->
?NIF_STUB.
Loading