-
Notifications
You must be signed in to change notification settings - Fork 7.9k
Make FFI callbacks thread safe #12823
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 13 commits
16be72b
1de9669
56c24d4
6423d3a
98eb079
d762cfe
29d6550
ba483e6
bce091d
083cc9e
34353aa
9164b16
20dd9b1
e20a564
af12a99
261a1d3
2f71fa3
391ca94
e8b2b5f
e2fce83
dc5496f
37ee7de
d2b2192
38f88d0
7e5697e
6f6a737
9d2e24a
bd953ba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -227,6 +227,10 @@ static ZEND_COLD void zend_ffi_return_unsupported(zend_ffi_type *type); | |
static ZEND_COLD void zend_ffi_pass_unsupported(zend_ffi_type *type); | ||
static ZEND_COLD void zend_ffi_assign_incompatible(zval *arg, zend_ffi_type *type); | ||
static bool zend_ffi_is_compatible_type(zend_ffi_type *dst_type, zend_ffi_type *src_type); | ||
static void zend_ffi_wait_cond( | ||
pthread_mutex_t *mutex, pthread_cond_t *cond, | ||
zend_atomic_bool *flag, bool wanted_value, bool release | ||
); | ||
|
||
#if FFI_CLOSURES | ||
static void *zend_ffi_create_callback(zend_ffi_type *type, zval *value); | ||
|
@@ -935,9 +939,28 @@ static void zend_ffi_callback_hash_dtor(zval *zv) /* {{{ */ | |
} | ||
/* }}} */ | ||
|
||
static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, void* data) /* {{{ */ | ||
{ | ||
zend_ffi_callback_data *callback_data = (zend_ffi_callback_data*)data; | ||
static void (*orig_interrupt_function)(zend_execute_data *execute_data); | ||
|
||
static void zend_ffi_dispatch_callback_end(void){ /* {{{ */ | ||
zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), false); | ||
bool is_main_thread = FFI_G(callback_tid) == FFI_G(main_tid); | ||
if(!is_main_thread){ | ||
// unlock interrupt handler | ||
pthread_cond_broadcast(&FFI_G(vm_unlock)); | ||
pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
} | ||
} | ||
/* }}} */ | ||
|
||
static void zend_ffi_dispatch_callback(void){ /* {{{ */ | ||
// this function must always run on the main thread | ||
//ZEND_ASSERT(pthread_self() == FFI_G(main_tid)); | ||
|
||
if (!zend_atomic_bool_load_ex(&FFI_G(callback_in_progress))) { | ||
return; | ||
} | ||
|
||
zend_ffi_callback_data *callback_data = FFI_G(callback_data).data; | ||
zend_fcall_info fci; | ||
zend_ffi_type *ret_type; | ||
zval retval; | ||
|
@@ -951,13 +974,14 @@ static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, v | |
fci.param_count = callback_data->arg_count; | ||
fci.named_params = NULL; | ||
|
||
|
||
if (callback_data->type->func.args) { | ||
int n = 0; | ||
zend_ffi_type *arg_type; | ||
|
||
ZEND_HASH_PACKED_FOREACH_PTR(callback_data->type->func.args, arg_type) { | ||
arg_type = ZEND_FFI_TYPE(arg_type); | ||
zend_ffi_cdata_to_zval(NULL, args[n], arg_type, BP_VAR_R, &fci.params[n], (zend_ffi_flags)(arg_type->attr & ZEND_FFI_ATTR_CONST), 0, 0); | ||
zend_ffi_cdata_to_zval(NULL, FFI_G(callback_data).args[n], arg_type, BP_VAR_R, &fci.params[n], (zend_ffi_flags)(arg_type->attr & ZEND_FFI_ATTR_CONST), 0, 0); | ||
n++; | ||
} ZEND_HASH_FOREACH_END(); | ||
} | ||
|
@@ -977,18 +1001,113 @@ static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, v | |
free_alloca(fci.params, use_heap); | ||
|
||
if (EG(exception)) { | ||
// we're about to do a hard exit. unlock all mutexes | ||
zend_ffi_dispatch_callback_end(); | ||
pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
zend_error_noreturn(E_ERROR, "Throwing from FFI callbacks is not allowed"); | ||
} | ||
|
||
ret_type = ZEND_FFI_TYPE(callback_data->type->func.ret_type); | ||
if (ret_type->kind != ZEND_FFI_TYPE_VOID) { | ||
zend_ffi_zval_to_cdata(ret, ret_type, &retval); | ||
zend_ffi_zval_to_cdata(FFI_G(callback_data).ret, ret_type, &retval); | ||
} | ||
|
||
zval_ptr_dtor(&retval); | ||
} | ||
/* }}} */ | ||
|
||
static void zend_ffi_interrupt_function(zend_execute_data *execute_data){ /* {{{ */ | ||
pthread_mutex_lock(&FFI_G(vm_request_lock)); | ||
if (!zend_atomic_bool_load_ex(&FFI_G(callback_in_progress))) { | ||
goto end; | ||
} | ||
|
||
bool is_main_tid = FFI_G(callback_tid) == FFI_G(main_tid); | ||
if(!is_main_tid){ | ||
|
||
// notify calling thread and release | ||
pthread_cond_broadcast(&FFI_G(vm_ack)); | ||
|
||
// release mutex and wait for the unlock signal | ||
pthread_cond_wait(&FFI_G(vm_unlock), &FFI_G(vm_request_lock)); | ||
} | ||
|
||
end: | ||
pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
if (orig_interrupt_function) { | ||
orig_interrupt_function(execute_data); | ||
} | ||
} | ||
/* }}} */ | ||
|
||
static void zend_ffi_wait_cond( | ||
pthread_mutex_t *mutex, pthread_cond_t *cond, | ||
zend_atomic_bool *flag, bool wanted_value, bool release | ||
){ /* {{{ */ | ||
// get lock, first | ||
pthread_mutex_lock(mutex); | ||
|
||
// if we acquired the lock before the request could be serviced | ||
// unlock it and wait for the flag | ||
if(flag == NULL){ | ||
pthread_cond_wait(cond, mutex); | ||
} else { | ||
while(zend_atomic_bool_load_ex(flag) != wanted_value){ | ||
pthread_cond_wait(cond, mutex); | ||
} | ||
} | ||
|
||
if(release){ | ||
pthread_mutex_unlock(mutex); | ||
} | ||
} | ||
/* }}} */ | ||
|
||
static void zend_ffi_wait_request_barrier(bool release){ /* {{{ */ | ||
zend_ffi_wait_cond(&FFI_G(vm_request_lock), &FFI_G(vm_unlock), &FFI_G(callback_in_progress), false, release); | ||
} | ||
/* }}} */ | ||
|
||
static void zend_ffi_callback_trampoline(ffi_cif* cif, void* ret, void** args, void* data) /* {{{ */ | ||
{ | ||
// wait for a previously initiated request to complete | ||
zend_ffi_wait_request_barrier(false); | ||
Comment on lines
+1038
to
+1041
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This may a regular in-main-thread callback, that doesn't require any locks. Ideally, we should make distinct between regular and "thread" callback $libc->pthread_create(
FFI::addr($tid), NULL,
FFI::thread_callback($thread_func), FFI::addr($arg)
); |
||
{ | ||
// mutex is now locked, and request is not pending. | ||
// start a new one | ||
zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), true); | ||
|
||
zend_ffi_call_data call_data = { | ||
.cif = cif, | ||
.ret = ret, | ||
.args = args, | ||
.data = (zend_ffi_callback_data *)data | ||
}; | ||
FFI_G(callback_data) = call_data; | ||
|
||
FFI_G(callback_tid) = pthread_self(); | ||
bool is_main_thread = FFI_G(callback_tid) == FFI_G(main_tid); | ||
|
||
if(!is_main_thread){ | ||
// post interrupt request to synchronize with the main thread | ||
zend_atomic_bool_store_ex(&EG(vm_interrupt), true); | ||
|
||
// release mutex and wait for ack | ||
pthread_cond_wait(&FFI_G(vm_ack), &FFI_G(vm_request_lock)); | ||
|
||
// prepare the stack call info/limits for the current thread | ||
zend_call_stack_init(); | ||
} | ||
|
||
// dispatch the callback | ||
zend_ffi_dispatch_callback(); | ||
|
||
zend_ffi_dispatch_callback_end(); | ||
} | ||
pthread_mutex_unlock(&FFI_G(vm_request_lock)); | ||
} | ||
/* }}} */ | ||
|
||
static void *zend_ffi_create_callback(zend_ffi_type *type, zval *value) /* {{{ */ | ||
{ | ||
zend_fcall_info_cache fcc; | ||
|
@@ -5518,13 +5637,25 @@ ZEND_MINIT_FUNCTION(ffi) | |
return zend_ffi_preload(FFI_G(preload)); | ||
} | ||
|
||
FFI_G(main_tid) = pthread_self(); | ||
|
||
zend_atomic_bool_store_ex(&FFI_G(callback_in_progress), false); | ||
orig_interrupt_function = zend_interrupt_function; | ||
zend_interrupt_function = zend_ffi_interrupt_function; | ||
|
||
pthread_mutex_init(&FFI_G(vm_request_lock), NULL); | ||
pthread_cond_init(&FFI_G(vm_ack), NULL); | ||
pthread_cond_init(&FFI_G(vm_unlock), NULL); | ||
|
||
return SUCCESS; | ||
} | ||
/* }}} */ | ||
|
||
/* {{{ ZEND_RSHUTDOWN_FUNCTION */ | ||
ZEND_RSHUTDOWN_FUNCTION(ffi) | ||
{ | ||
zend_ffi_wait_request_barrier(true); | ||
|
||
if (FFI_G(callbacks)) { | ||
zend_hash_destroy(FFI_G(callbacks)); | ||
efree(FFI_G(callbacks)); | ||
|
@@ -5636,6 +5767,7 @@ static ZEND_GINIT_FUNCTION(ffi) | |
/* {{{ ZEND_GINIT_FUNCTION */ | ||
static ZEND_GSHUTDOWN_FUNCTION(ffi) | ||
{ | ||
zend_ffi_wait_request_barrier(true); | ||
if (ffi_globals->scopes) { | ||
zend_hash_destroy(ffi_globals->scopes); | ||
free(ffi_globals->scopes); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,9 @@ | |
#ifndef PHP_FFI_H | ||
#define PHP_FFI_H | ||
|
||
#include <ffi.h> | ||
#include <pthread.h> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Windows build is broken. There are no <pthread.h> there. |
||
|
||
extern zend_module_entry ffi_module_entry; | ||
#define phpext_ffi_ptr &ffi_module_entry | ||
|
||
|
@@ -27,6 +30,15 @@ typedef enum _zend_ffi_api_restriction { | |
} zend_ffi_api_restriction; | ||
|
||
typedef struct _zend_ffi_type zend_ffi_type; | ||
typedef struct _zend_ffi_callback_data zend_ffi_callback_data; | ||
|
||
|
||
typedef struct _zend_ffi_call_data { | ||
ffi_cif* cif; | ||
void* ret; | ||
void** args; | ||
zend_ffi_callback_data* data; | ||
} zend_ffi_call_data; | ||
|
||
ZEND_BEGIN_MODULE_GLOBALS(ffi) | ||
zend_ffi_api_restriction restriction; | ||
|
@@ -35,6 +47,15 @@ ZEND_BEGIN_MODULE_GLOBALS(ffi) | |
/* predefined ffi_types */ | ||
HashTable types; | ||
|
||
zend_atomic_bool callback_in_progress; | ||
|
||
pthread_mutex_t vm_request_lock; | ||
pthread_cond_t vm_ack; | ||
pthread_cond_t vm_unlock; | ||
pthread_t callback_tid; | ||
pthread_t main_tid; | ||
zend_ffi_call_data callback_data; | ||
|
||
/* preloading */ | ||
char *preload; | ||
HashTable *scopes; /* list of preloaded scopes */ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
--TEST-- | ||
FFI Thread safe callbacks | ||
--EXTENSIONS-- | ||
ffi | ||
--SKIPIF-- | ||
<?php | ||
try { | ||
$libc = FFI::cdef(' | ||
int pthread_create(void *restrict thread, | ||
const void *restrict attr, | ||
void *(*start_routine)(void *), | ||
void *arg); | ||
', 'libc.so.6'); | ||
} catch(Throwable $_){ | ||
die('skip libc.so.6 not available'); | ||
} | ||
?> | ||
--INI-- | ||
ffi.enable=1 | ||
--FILE-- | ||
<?php | ||
$libc = FFI::cdef(' | ||
typedef uint64_t pthread_t; | ||
int pthread_create(pthread_t *thread, | ||
const void* attr, | ||
void *(*start_routine)(void *), | ||
void *arg); | ||
int pthread_detach(pthread_t thread); | ||
', 'libc.so.6'); | ||
|
||
$tid = $libc->new($libc->type('pthread_t')); | ||
$accum = 0; | ||
$thread_func = function($arg) use($libc, &$accum){ | ||
//$v = $libc->cast('int *', $arg)[0]; | ||
FFI::free($arg); | ||
usleep(10 * 1000); | ||
$accum++; | ||
//print("."); | ||
}; | ||
|
||
for($i=0; $i<100; $i++){ | ||
$arg = $libc->new('int', false); | ||
$arg->cdata = $i; | ||
|
||
$libc->pthread_create( | ||
FFI::addr($tid), NULL, | ||
$thread_func, FFI::addr($arg) | ||
); | ||
$libc->pthread_detach($tid->cdata); | ||
} | ||
|
||
while($accum != 100){ | ||
//print("w"); | ||
usleep(1000); | ||
} | ||
print($accum); | ||
?> | ||
--EXPECT-- | ||
100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should check if this is FFI related interrupt. This may be a POSIX signal or something else...