Skip to content

Commit 90936e1

Browse files
committed
Add automatic env reuse for py_event_loop:run and spawn_task
When a process has a local Python environment (from py:context/py:exec), py_event_loop functions now automatically pass that env to the NIF. This allows functions defined via py:exec to be called directly without manual env parameter passing.

Key implementation detail: env resources are stored in a PID->env mapping (not serialized through the task queue) to avoid memory safety issues with resource serialization across NIF environments.

Changes:
- Add pid_env_mapping_t struct for storing PID->env mappings
- Add register_pid_env/lookup_pid_env helper functions
- Modify nif_submit_task_with_env to register env by PID
- Modify process_ready_tasks to look up env by caller PID
- Move py_env_resource_t definition to py_nif.h for shared access
- Update py_event_loop:create_task to auto-detect and pass process env
- Update py_event_loop:spawn_task to use caller's env
- Add tests for env reuse with sync, async, and spawn_task functions
1 parent 522c42a commit 90936e1

File tree

9 files changed

+477
-29
lines changed

9 files changed

+477
-29
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434

3535
### Added
3636

37+
- **Automatic Env Reuse for Event Loop Tasks** - Functions defined via `py:exec(Ctx, Code)`
38+
can now be called directly using `py_event_loop:run/3,4`, `create_task/3,4`, and `spawn_task/3,4`
39+
without manual env passing. The process-local environment is automatically detected and used
40+
for function lookup when targeting `__main__` module.
41+
3742
- **PyBuffer API** - Zero-copy WSGI input buffer for streaming HTTP bodies
3843
- `py_buffer:new/0,1` - Create buffer (chunked or with content_length)
3944
- `py_buffer:write/2` - Append data, signals waiting Python readers

c_src/py_event_loop.c

Lines changed: 207 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,18 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
502502
}
503503
loop->namespaces_head = NULL;
504504

505+
/* Clean up PID-to-env mappings */
506+
pid_env_mapping_t *mapping = loop->pid_env_head;
507+
while (mapping != NULL) {
508+
pid_env_mapping_t *next = mapping->next;
509+
if (mapping->env != NULL) {
510+
enif_release_resource(mapping->env);
511+
}
512+
enif_free(mapping);
513+
mapping = next;
514+
}
515+
loop->pid_env_head = NULL;
516+
505517
pthread_mutex_unlock(&loop->namespaces_mutex);
506518
PyGILState_Release(gstate);
507519
} else {
@@ -517,6 +529,18 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
517529
}
518530
loop->namespaces_head = NULL;
519531

532+
/* Clean up PID-to-env mappings */
533+
pid_env_mapping_t *mapping = loop->pid_env_head;
534+
while (mapping != NULL) {
535+
pid_env_mapping_t *next = mapping->next;
536+
if (mapping->env != NULL) {
537+
enif_release_resource(mapping->env);
538+
}
539+
enif_free(mapping);
540+
mapping = next;
541+
}
542+
loop->pid_env_head = NULL;
543+
520544
pthread_mutex_unlock(&loop->namespaces_mutex);
521545
}
522546
pthread_mutex_destroy(&loop->namespaces_mutex);
@@ -1128,6 +1152,7 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc,
11281152

11291153
/* Initialize per-process namespace registry */
11301154
loop->namespaces_head = NULL;
1155+
loop->pid_env_head = NULL;
11311156
if (pthread_mutex_init(&loop->namespaces_mutex, NULL) != 0) {
11321157
pthread_mutex_destroy(&loop->task_queue_mutex);
11331158
enif_ioq_destroy(loop->task_queue);
@@ -2501,6 +2526,164 @@ ERL_NIF_TERM nif_submit_task(ErlNifEnv *env, int argc,
25012526
return ATOM_OK;
25022527
}
25032528

2529+
/* ============================================================================
2530+
* PID-to-Env Mapping Helpers
2531+
* ============================================================================ */
2532+
2533+
/**
2534+
* @brief Register or update an env mapping for a PID
2535+
*
2536+
* Increments refcount if mapping exists, otherwise creates new mapping.
2537+
* Calls enif_keep_resource to keep the env alive.
2538+
*
2539+
* @param loop Event loop containing the mapping registry
2540+
* @param pid PID to register
2541+
* @param env_res Environment resource (will be kept via enif_keep_resource)
2542+
* @return true on success, false on allocation failure
2543+
*/
2544+
static bool register_pid_env(erlang_event_loop_t *loop, const ErlNifPid *pid,
2545+
void *env_res) {
2546+
pthread_mutex_lock(&loop->namespaces_mutex);
2547+
2548+
/* Check if mapping already exists */
2549+
pid_env_mapping_t *mapping = loop->pid_env_head;
2550+
while (mapping != NULL) {
2551+
if (enif_compare_pids(&mapping->pid, pid) == 0) {
2552+
/* Found existing mapping - increment refcount */
2553+
mapping->refcount++;
2554+
pthread_mutex_unlock(&loop->namespaces_mutex);
2555+
return true;
2556+
}
2557+
mapping = mapping->next;
2558+
}
2559+
2560+
/* Create new mapping */
2561+
mapping = enif_alloc(sizeof(pid_env_mapping_t));
2562+
if (mapping == NULL) {
2563+
pthread_mutex_unlock(&loop->namespaces_mutex);
2564+
return false;
2565+
}
2566+
2567+
mapping->pid = *pid;
2568+
mapping->env = env_res;
2569+
mapping->refcount = 1;
2570+
mapping->next = loop->pid_env_head;
2571+
loop->pid_env_head = mapping;
2572+
2573+
/* Keep the resource alive */
2574+
enif_keep_resource(env_res);
2575+
2576+
pthread_mutex_unlock(&loop->namespaces_mutex);
2577+
return true;
2578+
}
2579+
2580+
/**
2581+
* @brief Look up env for a PID
2582+
*
2583+
* @param loop Event loop containing the mapping registry
2584+
* @param pid PID to look up
2585+
* @return Environment resource or NULL if not found
2586+
*/
2587+
static void *lookup_pid_env(erlang_event_loop_t *loop, const ErlNifPid *pid) {
2588+
pthread_mutex_lock(&loop->namespaces_mutex);
2589+
2590+
pid_env_mapping_t *mapping = loop->pid_env_head;
2591+
while (mapping != NULL) {
2592+
if (enif_compare_pids(&mapping->pid, pid) == 0) {
2593+
void *env_res = mapping->env;
2594+
pthread_mutex_unlock(&loop->namespaces_mutex);
2595+
return env_res;
2596+
}
2597+
mapping = mapping->next;
2598+
}
2599+
2600+
pthread_mutex_unlock(&loop->namespaces_mutex);
2601+
return NULL;
2602+
}
2603+
2604+
/**
2605+
* submit_task_with_env(LoopRef, CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef) -> ok | {error, Reason}
2606+
*
2607+
* Like submit_task but registers the process-local env for the caller PID.
2608+
* The env's globals dict is used for function lookup, allowing functions
2609+
* defined via py:exec() to be called from the event loop.
2610+
*
2611+
* Note: The env resource is stored in a PID->env mapping, not serialized.
2612+
* This avoids the issue of resource references not surviving serialization.
2613+
*/
2614+
ERL_NIF_TERM nif_submit_task_with_env(ErlNifEnv *env, int argc,
2615+
const ERL_NIF_TERM argv[]) {
2616+
(void)argc;
2617+
2618+
erlang_event_loop_t *loop;
2619+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
2620+
(void **)&loop)) {
2621+
return make_error(env, "invalid_loop");
2622+
}
2623+
2624+
if (!loop->task_queue_initialized) {
2625+
return make_error(env, "task_queue_not_initialized");
2626+
}
2627+
2628+
/* Validate caller_pid */
2629+
ErlNifPid caller_pid;
2630+
if (!enif_get_local_pid(env, argv[1], &caller_pid)) {
2631+
return make_error(env, "invalid_caller_pid");
2632+
}
2633+
2634+
/* Get and register the env resource */
2635+
void *env_res;
2636+
if (!enif_get_resource(env, argv[7], get_env_resource_type(), &env_res)) {
2637+
return make_error(env, "invalid_env");
2638+
}
2639+
2640+
/* Register the env for this PID (increments refcount if exists) */
2641+
if (!register_pid_env(loop, &caller_pid, env_res)) {
2642+
return make_error(env, "env_registration_failed");
2643+
}
2644+
2645+
/* Create task tuple: {CallerPid, Ref, Module, Func, Args, Kwargs}
2646+
* Note: We use 6-tuple, NOT 7-tuple. The env is looked up by PID. */
2647+
ERL_NIF_TERM task_tuple = enif_make_tuple6(env,
2648+
argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]);
2649+
2650+
/* Serialize to binary */
2651+
ErlNifBinary task_bin;
2652+
if (!enif_term_to_binary(env, task_tuple, &task_bin)) {
2653+
return make_error(env, "serialization_failed");
2654+
}
2655+
2656+
/* Thread-safe enqueue */
2657+
pthread_mutex_lock(&loop->task_queue_mutex);
2658+
int enq_result = enif_ioq_enq_binary(loop->task_queue, &task_bin, 0);
2659+
pthread_mutex_unlock(&loop->task_queue_mutex);
2660+
2661+
if (enq_result != 1) {
2662+
enif_release_binary(&task_bin);
2663+
return make_error(env, "enqueue_failed");
2664+
}
2665+
2666+
/* Increment task count */
2667+
atomic_fetch_add(&loop->task_count, 1);
2668+
2669+
/* Coalesced wakeup (uvloop-style) */
2670+
if (loop->has_worker) {
2671+
if (!atomic_exchange(&loop->task_wake_pending, true)) {
2672+
ErlNifEnv *msg_env = enif_alloc_env();
2673+
if (msg_env != NULL) {
2674+
if (ATOM_TASK_READY == 0) {
2675+
ATOM_TASK_READY = enif_make_atom(msg_env, "task_ready");
2676+
}
2677+
ERL_NIF_TERM msg = enif_make_atom(msg_env, "task_ready");
2678+
enif_send(NULL, &loop->worker_pid, msg_env, msg);
2679+
enif_free_env(msg_env);
2680+
}
2681+
}
2682+
}
2683+
2684+
return ATOM_OK;
2685+
}
2686+
25042687
/**
25052688
* Maximum tasks to dequeue in one batch before acquiring GIL.
25062689
* This bounds memory usage while still amortizing GIL acquisition cost.
@@ -2792,7 +2975,8 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
27922975
/* Extract: {CallerPid, Ref, Module, Func, Args, Kwargs} */
27932976
int arity;
27942977
const ERL_NIF_TERM *tuple_elems;
2795-
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) || arity != 6) {
2978+
if (!enif_get_tuple(term_env, task_term, &arity, &tuple_elems) ||
2979+
arity != 6) {
27962980
enif_free_env(term_env);
27972981
continue;
27982982
}
@@ -2810,6 +2994,9 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28102994
continue;
28112995
}
28122996

2997+
/* Look up env by PID (registered via submit_task_with_env) */
2998+
py_env_resource_t *task_env = (py_env_resource_t *)lookup_pid_env(loop, &caller_pid);
2999+
28133000
/* Convert module/func to C strings */
28143001
char *module_name = enif_alloc(module_bin.size + 1);
28153002
char *func_name = enif_alloc(func_bin.size + 1);
@@ -2824,11 +3011,27 @@ ERL_NIF_TERM nif_process_ready_tasks(ErlNifEnv *env, int argc,
28243011
memcpy(func_name, func_bin.data, func_bin.size);
28253012
func_name[func_bin.size] = '\0';
28263013

2827-
/* Look up namespace for caller process (only exists if they called exec/eval) */
3014+
/* Look up namespace for caller process (used for reentrant calls) */
28283015
process_namespace_t *ns = lookup_process_namespace(loop, &caller_pid);
28293016

2830-
/* Look up function (checks process namespace for __main__, then cache/import) */
2831-
PyObject *func = get_function_for_task(loop, ns, module_name, func_name);
3017+
/* Look up function - check task_env first, then process namespace, then import */
3018+
PyObject *func = NULL;
3019+
3020+
/* First, check the passed env's globals (from py:exec) */
3021+
if (task_env != NULL && task_env->globals != NULL) {
3022+
if (strcmp(module_name, "__main__") == 0 ||
3023+
strcmp(module_name, "_process_") == 0) {
3024+
func = PyDict_GetItemString(task_env->globals, func_name);
3025+
if (func != NULL) {
3026+
Py_INCREF(func);
3027+
}
3028+
}
3029+
}
3030+
3031+
/* Fallback to process namespace and cache/import */
3032+
if (func == NULL) {
3033+
func = get_function_for_task(loop, ns, module_name, func_name);
3034+
}
28323035

28333036
enif_free(module_name);
28343037
enif_free(func_name);

c_src/py_event_loop.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,27 @@ typedef struct process_namespace {
124124
struct process_namespace *next;
125125
} process_namespace_t;
126126

127+
/**
128+
* @struct pid_env_mapping_t
129+
* @brief Mapping from Erlang PID to process-local Python environment
130+
*
131+
* Used to pass env resources across the task queue without serialization.
132+
* The env is kept alive by enif_keep_resource until the mapping is removed.
133+
*/
134+
typedef struct pid_env_mapping {
135+
/** @brief PID of the owning Erlang process */
136+
ErlNifPid pid;
137+
138+
/** @brief Environment resource (kept via enif_keep_resource) */
139+
void *env; /* py_env_resource_t* - forward declared to avoid header deps */
140+
141+
/** @brief Reference count for this mapping (multiple tasks may use it) */
142+
int refcount;
143+
144+
/** @brief Next mapping in linked list */
145+
struct pid_env_mapping *next;
146+
} pid_env_mapping_t;
147+
127148
/** @brief Event types for pending callbacks */
128149
typedef enum {
129150
EVENT_TYPE_READ = 1,
@@ -372,6 +393,12 @@ typedef struct erlang_event_loop {
372393

373394
/** @brief Mutex protecting namespace registry */
374395
pthread_mutex_t namespaces_mutex;
396+
397+
/* ========== PID-to-Env Mapping Registry ========== */
398+
/* Protected by namespaces_mutex (shared with namespace registry) */
399+
400+
/** @brief Head of PID-to-env mapping linked list */
401+
pid_env_mapping_t *pid_env_head;
375402
} erlang_event_loop_t;
376403

377404
/* ============================================================================
@@ -624,6 +651,18 @@ ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
624651
ERL_NIF_TERM nif_submit_task(ErlNifEnv *env, int argc,
625652
const ERL_NIF_TERM argv[]);
626653

654+
/**
655+
* @brief Submit an async task with process-local env (thread-safe)
656+
*
657+
* Like submit_task but includes an env resource reference. The env's globals
658+
* dict is used for function lookup, allowing functions defined via py:exec()
659+
* to be called from the event loop.
660+
*
661+
* NIF: submit_task_with_env(LoopRef, CallerPid, Ref, Module, Func, Args, Kwargs, EnvRef) -> ok | {error, Reason}
662+
*/
663+
ERL_NIF_TERM nif_submit_task_with_env(ErlNifEnv *env, int argc,
664+
const ERL_NIF_TERM argv[]);
665+
627666
/**
628667
* @brief Process all pending tasks from the task queue
629668
*

c_src/py_nif.c

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ ErlNifResourceType *INLINE_CONTINUATION_RESOURCE_TYPE = NULL;
7070
/* Process-local Python environment resource type */
7171
ErlNifResourceType *PY_ENV_RESOURCE_TYPE = NULL;
7272

73+
/* Getter for PY_ENV_RESOURCE_TYPE (used by py_event_loop.c) */
74+
ErlNifResourceType *get_env_resource_type(void) {
75+
return PY_ENV_RESOURCE_TYPE;
76+
}
77+
7378
_Atomic uint32_t g_context_id_counter = 1;
7479

7580
/* ============================================================================
@@ -80,27 +85,7 @@ _Atomic uint32_t g_context_id_counter = 1;
8085
* resource destructor frees the Python dicts.
8186
*/
8287

83-
/**
84-
* @struct py_env_resource_t
85-
* @brief Process-local Python environment (globals/locals)
86-
*
87-
* Stored in process dictionary as py_local_env. When the process exits,
88-
* Erlang GC drops the reference, triggering the destructor which frees
89-
* the Python dicts.
90-
*
91-
* Each env is bound to a specific interpreter (identified by interp_id).
92-
* The dicts must be freed in the same interpreter that created them.
93-
*/
94-
typedef struct {
95-
/** @brief Global namespace dictionary */
96-
PyObject *globals;
97-
/** @brief Local namespace dictionary (same as globals for module-level execution) */
98-
PyObject *locals;
99-
/** @brief Interpreter ID that owns these dicts (0 = main interpreter) */
100-
int64_t interp_id;
101-
/** @brief Pool slot index (-1 for main interpreter) */
102-
int pool_slot;
103-
} py_env_resource_t;
88+
/* py_env_resource_t is now defined in py_nif.h */
10489

10590
/**
10691
* @brief Destructor for py_env_resource_t
@@ -6534,6 +6519,7 @@ static ErlNifFunc nif_funcs[] = {
65346519
{"event_loop_run_async", 7, nif_event_loop_run_async, ERL_NIF_DIRTY_JOB_IO_BOUND},
65356520
/* Async task queue NIFs (uvloop-inspired) */
65366521
{"submit_task", 7, nif_submit_task, 0}, /* Thread-safe, no GIL needed */
6522+
{"submit_task_with_env", 8, nif_submit_task_with_env, 0}, /* With process-local env */
65376523
{"process_ready_tasks", 1, nif_process_ready_tasks, ERL_NIF_DIRTY_JOB_CPU_BOUND},
65386524
{"event_loop_set_py_loop", 2, nif_event_loop_set_py_loop, 0},
65396525
/* Per-process namespace NIFs */

0 commit comments

Comments
 (0)