@@ -137,14 +137,14 @@ static PHP_METHOD(swoole_postgresql_coro, fetchRow);
137
137
static void php_pgsql_fetch_hash (INTERNAL_FUNCTION_PARAMETERS, zend_long result_type, int into_object);
138
138
139
139
static void _free_result (zend_resource *rsrc);
140
- static int swoole_pgsql_coro_onRead (swReactor *reactor, swEvent *event);
141
- static int swoole_pgsql_coro_onWrite (swReactor *reactor, swEvent *event);
142
- static int swoole_pgsql_coro_onError (swReactor *reactor, swEvent *event);
140
+ static int swoole_pgsql_coro_onRead (Reactor *reactor, Event *event);
141
+ static int swoole_pgsql_coro_onWrite (Reactor *reactor, Event *event);
142
+ static int swoole_pgsql_coro_onError (Reactor *reactor, Event *event);
143
143
static int swoole_postgresql_coro_close (zval *zobject);
144
144
static int query_result_parse (pg_object *object);
145
145
static int prepare_result_parse (pg_object *object);
146
146
static int meta_data_result_parse (pg_object *object);
147
- static void swoole_pgsql_coro_onTimeout (swTimer *timer, swoole:: TimerNode *tnode);
147
+ static void swoole_pgsql_coro_onTimeout (Timer *timer, TimerNode *tnode);
148
148
static void _php_pgsql_free_params (char **params, int num_params);
149
149
150
150
// clang-format off
@@ -472,17 +472,22 @@ static void connect_callback(pg_object *object, Reactor *reactor, Event *event)
472
472
PHPCoroutine::resume_m (context, &return_value);
473
473
}
474
474
475
- static int swoole_pgsql_coro_onWrite (swReactor *reactor, swEvent *event) {
475
+ static int swoole_pgsql_coro_onWrite (Reactor *reactor, Event *event) {
476
476
pg_object *object = (pg_object *) event->socket ->object ;
477
477
if (object->connected ) {
478
- return sw_reactor ()->default_write_handler (sw_reactor (), event);
478
+ if (object->co ) {
479
+ object->co ->resume ();
480
+ return SW_OK;
481
+ } else {
482
+ return reactor->default_write_handler (reactor, event);
483
+ }
479
484
} else {
480
485
connect_callback (object, reactor, event);
481
486
}
482
487
return SW_OK;
483
488
}
484
489
485
- static int swoole_pgsql_coro_onRead (swReactor *reactor, swEvent *event) {
490
+ static int swoole_pgsql_coro_onRead (Reactor *reactor, Event *event) {
486
491
pg_object *object = (pg_object *) (event->socket ->object );
487
492
488
493
if (!object->connected ) {
@@ -737,6 +742,43 @@ static int prepare_result_parse(pg_object *object) {
737
742
return SW_OK;
738
743
}
739
744
745
+ static bool swoole_pgsql_wait_write_ready (PGconn *pgsql, pg_object *object, zval *zthis) {
746
+ int retval = 0 ;
747
+ object->co = Coroutine::get_current_safe ();;
748
+
749
+ ON_SCOPE_EXIT {
750
+ if (object->socket ->isset_writable_event ()) {
751
+ swoole_event_del (object->socket );
752
+ }
753
+ object->co = nullptr ;
754
+ };
755
+
756
+ while ((retval = PQflush (pgsql)) == 1 ) {
757
+ if (!object->socket ->isset_writable_event () && !swoole_event_add (object->socket , SW_EVENT_WRITE)) {
758
+ return false ;
759
+ }
760
+ object->co ->yield_ex (object->timeout );
761
+ if (object->co ->is_canceled ()) {
762
+ zend_update_property_string (swoole_postgresql_coro_ce, SW_Z8_OBJ_P (zthis), ZEND_STRL (" error" ),
763
+ swoole_strerror (SW_ERROR_CO_CANCELED));
764
+ return false ;
765
+ }
766
+ if (object->co ->is_timedout ()) {
767
+ zend_update_property_string (swoole_postgresql_coro_ce, SW_Z8_OBJ_P (zthis), ZEND_STRL (" error" ),
768
+ swoole_strerror (SW_ERROR_CO_TIMEDOUT));
769
+ return false ;
770
+ }
771
+ }
772
+
773
+ if (retval == -1 ) {
774
+ char *err_msg = PQerrorMessage (pgsql);
775
+ zend_update_property_string (swoole_postgresql_coro_ce, SW_Z8_OBJ_P (zthis), ZEND_STRL (" error" ), err_msg);
776
+ return false ;
777
+ }
778
+
779
+ return true ;
780
+ }
781
+
740
782
static PHP_METHOD (swoole_postgresql_coro, query) {
741
783
zval *query;
742
784
PGconn *pgsql;
@@ -761,18 +803,7 @@ static PHP_METHOD(swoole_postgresql_coro, query) {
761
803
RETURN_FALSE;
762
804
}
763
805
764
- int retval = 0 ;
765
- while ((retval = PQflush (pgsql)) == 1 ) {
766
- if (System::wait_event (PQsocket (pgsql), SW_EVENT_WRITE, object->timeout ) <= 0 ) {
767
- zend_update_property_string (swoole_postgresql_coro_ce, SW_Z8_OBJ_P (ZEND_THIS), ZEND_STRL (" error" ),
768
- swoole_strerror (swoole_get_last_error ()));
769
- RETURN_FALSE;
770
- }
771
- }
772
-
773
- if (retval == -1 ) {
774
- char *err_msg = PQerrorMessage (pgsql);
775
- zend_update_property_string (swoole_postgresql_coro_ce, SW_Z8_OBJ_P (ZEND_THIS), ZEND_STRL (" error" ), err_msg);
806
+ if (!swoole_pgsql_wait_write_ready (pgsql, object, ZEND_THIS)) {
776
807
RETURN_FALSE;
777
808
}
778
809
@@ -828,6 +859,10 @@ static PHP_METHOD(swoole_postgresql_coro, prepare) {
828
859
}
829
860
}
830
861
862
+ if (!swoole_pgsql_wait_write_ready (pgsql, object, ZEND_THIS)) {
863
+ RETURN_FALSE;
864
+ }
865
+
831
866
if (swoole_event_add (object->socket , SW_EVENT_READ) < 0 ) {
832
867
RETURN_FALSE;
833
868
}
@@ -915,6 +950,10 @@ static PHP_METHOD(swoole_postgresql_coro, execute) {
915
950
}
916
951
}
917
952
953
+ if (!swoole_pgsql_wait_write_ready (pgsql, object, ZEND_THIS)) {
954
+ RETURN_FALSE;
955
+ }
956
+
918
957
FutureTask *context = php_swoole_postgresql_coro_get_context (ZEND_THIS);
919
958
context->coro_params = *ZEND_THIS;
920
959
@@ -1400,7 +1439,7 @@ static void _free_result(zend_resource *rsrc) {
1400
1439
PQclear (pg_result);
1401
1440
}
1402
1441
1403
- static int swoole_pgsql_coro_onError (swReactor *reactor, swEvent *event) {
1442
+ static int swoole_pgsql_coro_onError (Reactor *reactor, Event *event) {
1404
1443
zval _result;
1405
1444
zval *result = &_result;
1406
1445
pg_object *object = (pg_object *) (event->socket ->object );
0 commit comments