Skip to content

Commit 11b126a

Browse files
committed
Add asynchronous connection primitives.
- Extend connection_status and add polling_status. - Add ?startonly parameter to connection and connect_poll method. - Add reset_start and reset_poll methods.
1 parent 3cae7b6 commit 11b126a

File tree

4 files changed

+140
-20
lines changed

4 files changed

+140
-20
lines changed

examples/async.ml

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
open Printf
12
open Postgresql
23

4+
let failwith_f fmt = ksprintf failwith fmt
5+
36
let _ =
47
if Array.length Sys.argv <> 2 then (
58
Printf.printf "\
@@ -22,29 +25,51 @@ let fetch_single_result c =
2225
| None -> assert false
2326
| Some r -> assert (fetch_result c = None); r
2427

25-
let main () =
26-
let c = new connection ~conninfo:Sys.argv.(1) () in
27-
c#set_nonblocking true;
28+
(* See http://www.postgresql.org/docs/devel/static/libpq-connect.html *)
29+
let rec finish_conn socket_fd connect_poll = function
30+
| Polling_failed ->
31+
printf "f\n%!"
32+
| Polling_reading ->
33+
printf "r,%!";
34+
ignore (Unix.select [socket_fd] [] [] (-1.0));
35+
finish_conn socket_fd connect_poll (connect_poll ())
36+
| Polling_writing ->
37+
printf "w,%!";
38+
ignore (Unix.select [] [socket_fd] [] (-1.0));
39+
finish_conn socket_fd connect_poll (connect_poll ())
40+
| Polling_ok ->
41+
printf "c\n%!"
42+
43+
let test (c : connection) =
44+
(* Create a table using a non-prepared statement. *)
2845
c#send_query "\
2946
CREATE TEMPORARY TABLE postgresql_ocaml_async \
3047
(id SERIAL PRIMARY KEY, a INTEGER NOT NULL, b TEXT NOT NULL)";
3148
assert ((fetch_single_result c)#status = Command_ok);
49+
50+
(* Populate using a prepared statement. *)
3251
c#send_prepare "test_ins"
3352
"INSERT INTO postgresql_ocaml_async (a, b) VALUES ($1, $2)";
3453
assert ((fetch_single_result c)#status = Command_ok);
3554
c#send_query_prepared ~params:[|"2"; "two"|] "test_ins";
3655
assert ((fetch_single_result c)#status = Command_ok);
3756
c#send_query_prepared ~params:[|"3"; "three"|] "test_ins";
3857
assert ((fetch_single_result c)#status = Command_ok);
58+
59+
(* Prepare a select statement. *)
3960
c#send_prepare "test_sel" "SELECT * FROM postgresql_ocaml_async";
4061
assert ((fetch_single_result c)#status = Command_ok);
62+
63+
(* Describe it. *)
4164
c#send_describe_prepared "test_sel";
4265
let r = fetch_single_result c in
4366
assert (r#status = Command_ok);
4467
assert (r#nfields = 3);
4568
assert (r#fname 0 = "id");
4669
assert (r#fname 1 = "a");
4770
assert (r#fname 2 = "b");
71+
72+
(* Run it. *)
4873
c#send_query_prepared "test_sel";
4974
let r = fetch_single_result c in
5075
assert (r#status = Tuples_ok);
@@ -55,6 +80,22 @@ let main () =
5580
(r#getvalue i 0) (r#getvalue i 1) (r#getvalue i 2)
5681
done
5782

83+
let main () =
84+
(* Async connect and test. *)
85+
let c = new connection ~conninfo:Sys.argv.(1) ~startonly:true () in
86+
finish_conn (Obj.magic c#socket) (fun () -> c#connect_poll) Polling_writing;
87+
if c#status = Bad then failwith_f "Connection failed: %s" c#error_message;
88+
assert (c#status = Ok);
89+
c#set_nonblocking true;
90+
test c;
91+
92+
(* Async reset and test again. *)
93+
if not c#reset_start then failwith_f "reset_start failed: %s" c#error_message;
94+
finish_conn (Obj.magic c#socket) (fun () -> c#reset_poll) Polling_writing;
95+
if c#status = Bad then failwith_f "Reset connection bad: %s" c#error_message;
96+
assert (c#status = Ok);
97+
test c
98+
5899
let _ =
59100
try main () with
60101
| Error e -> prerr_endline (string_of_error e)

lib/postgresql.ml

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,20 @@ let () =
238238
Callback.register "Postgresql.null" null;
239239
init ()
240240

241-
type connection_status = Ok | Bad
241+
type connection_status =
242+
| Ok | Bad
243+
| Connection_started
244+
| Connection_made
245+
| Connection_awaiting_response
246+
| Connection_auth_ok
247+
| Connection_setenv
248+
| Connection_ssl_startup
249+
250+
type polling_status =
251+
| Polling_failed
252+
| Polling_reading
253+
| Polling_writing
254+
| Polling_ok
242255

243256
type conninfo_option =
244257
{
@@ -307,7 +320,7 @@ module Stub = struct
307320
type result
308321

309322
external conn_isnull : connection -> bool = "PQconn_isnull" "noalloc"
310-
external connect : string -> connection = "PQconnectdb_stub"
323+
external connect : string -> bool -> connection = "PQconnectdb_stub"
311324
external finish : connection -> unit = "PQfinish_stub"
312325
external reset : connection -> unit = "PQreset_stub"
313326

@@ -383,6 +396,15 @@ module Stub = struct
383396

384397
(* Asynchronous Query Processing *)
385398

399+
external connect_poll :
400+
connection -> polling_status = "PQconnectPoll_stub" "noalloc"
401+
402+
external reset_start :
403+
connection -> bool = "PQresetStart_stub" "noalloc"
404+
405+
external reset_poll :
406+
connection -> polling_status = "PQresetPoll_stub" "noalloc"
407+
386408
external set_nonblocking :
387409
connection -> bool -> int = "PQsetnonblocking_stub" "noalloc"
388410

@@ -613,7 +635,7 @@ let protectx ~f x ~(finally : 'a -> unit) =
613635
res
614636

615637
class connection ?host ?hostaddr ?port ?dbname ?user ?password ?options ?tty
616-
?requiressl ?conninfo =
638+
?requiressl ?conninfo ?(startonly = false) =
617639

618640
let conn_info =
619641
match conninfo with
@@ -642,9 +664,9 @@ class connection ?host ?hostaddr ?port ?dbname ?user ?password ?options ?tty
642664
Buffer.contents b in
643665

644666
fun () ->
645-
let my_conn = Stub.connect conn_info in
667+
let my_conn = Stub.connect conn_info startonly in
646668
let () =
647-
if Stub.connection_status my_conn <> Ok then (
669+
if Stub.connection_status my_conn = Bad then (
648670
let s = Stub.error_message my_conn in
649671
Stub.finish my_conn;
650672
raise (Error (Connection_failure s)))
@@ -903,6 +925,10 @@ object (self)
903925

904926
(* Asynchronous operations and non blocking mode *)
905927

928+
method connect_poll = wrap_conn Stub.connect_poll
929+
method reset_start = wrap_conn Stub.reset_start
930+
method reset_poll = wrap_conn Stub.reset_poll
931+
906932
method set_nonblocking b =
907933
wrap_conn (fun conn ->
908934
if Stub.set_nonblocking conn b <> 0 then signal_error conn)

lib/postgresql.mli

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,22 @@ end
361361
(** {6 Handling database connections} *)
362362

363363
(** Status of a connection *)
364-
type connection_status = Ok | Bad
364+
type connection_status =
365+
| Ok | Bad
366+
(* Non-blocking: *)
367+
| Connection_started
368+
| Connection_made
369+
| Connection_awaiting_response
370+
| Connection_auth_ok
371+
| Connection_setenv
372+
| Connection_ssl_startup
373+
374+
(** Polling status used while establishing a connection asynchronously. *)
375+
type polling_status =
376+
| Polling_failed
377+
| Polling_reading
378+
| Polling_writing
379+
| Polling_ok
365380

366381
(** Record of connection options *)
367382
type conninfo_option =
@@ -383,6 +398,10 @@ external conndefaults : unit -> conninfo_option array = "PQconndefaults_stub"
383398
384399
When [conninfo] is given, it will be used instead of all other
385400
optional arguments.
401+
402+
@param startonly If true, initiate a non-blocking connect procedure, which
403+
involves cooperative calls to {!connect_poll} before the connection is
404+
usable.
386405
*)
387406
class connection :
388407
?host : string -> (* Default: none *)
@@ -395,6 +414,7 @@ class connection :
395414
?tty : string -> (* Default: none *)
396415
?requiressl : string -> (* Default: none *)
397416
?conninfo : string -> (* Default: none *)
417+
?startonly : bool -> (* Default: false *)
398418
unit ->
399419
(** @raise Error if there is a connection failure. *)
400420
object
@@ -723,6 +743,33 @@ object
723743

724744
(** Asynchronous operations and non blocking mode *)
725745

746+
method connect_poll : polling_status
747+
(** After creating a connection with [~startonly:true], {!connect_poll}
748+
must be called a number of times before the connection can be used. The
749+
precise procedure is described in libpq manual, but the following code
750+
should capture the idea, assuming monadic concurrency primitives
751+
[return] and [>>=] along with polling functions [wait_for_read] and
752+
[wait_for_write]:
753+
{[
754+
let my_async_connect () =
755+
let c = new connection () in
756+
let rec establish_connection = function
757+
| Polling_failed | Polling_ok -> return c
758+
| Polling_reading -> wait_for_read c#socket >>= fun () ->
759+
establish_connection c#connect_poll
760+
| Polling_writing -> wait_for_write c#socket >>= fun () ->
761+
establish_connection c#connect_poll in
762+
establish_connection Polling_writing
763+
]}
764+
See also [examples/async.ml]. *)
765+
766+
method reset_start : bool
767+
(** An asynchronous variant of {!reset}. Use {!reset_poll} to
768+
finish re-establishing the connection. *)
769+
770+
method reset_poll : polling_status
771+
(** Used analogously to {!connect_poll} after calling {!reset_start} *)
772+
726773
method set_nonblocking : bool -> unit
727774
(** [set_nonblocking b] sets state of the connection to nonblocking if
728775
[b] is true and to blocking otherwise.

lib/postgresql_stubs.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,6 @@ static inline void np_decr_refcount(np_callback *c)
209209
/* Database Connection Functions */
210210

211211
/* Missing:
212-
PQconnectStart, PQconnectPoll, PQresetStart, PQresetPoll:
213-
for non-blocking connection
214212
PQgetssl: the SSL structure used in the connection
215213
*/
216214

@@ -244,21 +242,26 @@ static inline void free_conn(value v_conn)
244242
}
245243
}
246244

247-
CAMLprim value PQconnectdb_stub(value v_conn_info)
245+
CAMLprim value PQconnectdb_stub(value v_conn_info, value v_startonly)
248246
{
249247
PGconn *conn;
250248
value v_conn;
251-
252-
size_t len = caml_string_length(v_conn_info) + 1;
253249
PGcancel *cancel;
254-
char *conn_info = caml_stat_alloc(len);
255-
memcpy(conn_info, String_val(v_conn_info), len);
256250

257-
caml_enter_blocking_section();
258-
conn = PQconnectdb(conn_info);
251+
if (Bool_val(v_startonly)) {
252+
conn = PQconnectStart(String_val(v_conn_info));
259253
cancel = PQgetCancel(conn);
260-
free(conn_info);
261-
caml_leave_blocking_section();
254+
}
255+
else {
256+
size_t len = caml_string_length(v_conn_info) + 1;
257+
char *conn_info = caml_stat_alloc(len);
258+
memcpy(conn_info, String_val(v_conn_info), len);
259+
caml_enter_blocking_section();
260+
conn = PQconnectdb(conn_info);
261+
cancel = PQgetCancel(conn);
262+
free(conn_info);
263+
caml_leave_blocking_section();
264+
}
262265

263266
/* One may raise this 30 to 500 for instance if the program takes
264267
responsibility of closing connections */
@@ -349,6 +352,9 @@ static inline value make_string(const char *s)
349352
return ret(fun(get_conn(v_conn))); \
350353
}
351354

355+
conn_info(PQconnectPoll, Val_int)
356+
conn_info(PQresetStart, Val_int)
357+
conn_info(PQresetPoll, Val_int)
352358
conn_info(PQdb, make_string)
353359
conn_info(PQuser, make_string)
354360
conn_info(PQpass, make_string)

0 commit comments

Comments
 (0)