Skip to content

Commit

Permalink
httpun server (no websocket yet)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxtori committed Dec 12, 2024
1 parent 5024179 commit 5d87728
Show file tree
Hide file tree
Showing 13 changed files with 430 additions and 240 deletions.
5 changes: 4 additions & 1 deletion dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
ppxlib
ppx_deriving_encoding
digestif
yaml)
yaml
httpun-lwt-unix
httpun-ws-lwt-unix
)
(conflicts
(js_of_ocaml-lwt (< 4.0.0))
(cohttp-lwt-unix (< 5.0.0))
Expand Down
1 change: 1 addition & 0 deletions src/server/default/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
(select ezServer.ml from
(ezAPIServerCohttp -> ezServer.cohttp.ml)
(ezAPIServerHttpAf -> ezServer.httpaf.ml)
(ezAPIServerHttpun -> ezServer.httpun.ml)
(-> ezServer.dummy.ml))))
11 changes: 11 additions & 0 deletions src/server/default/ezServer.httpun.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(**************************************************************************)
(* *)
(* Copyright 2018-2025 OCamlPro *)
(* *)
(* All rights reserved. This file is distributed under the terms of the *)
(* GNU Lesser General Public License version 2.1, with the special *)
(* exception on linking described in the file LICENSE. *)
(* *)
(**************************************************************************)

include EzAPIServerHttpun
8 changes: 8 additions & 0 deletions src/server/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@
(geoip -> ip.geoip.ml)
(-> ip.dummy.ml))))

(library
(name server_common)
(modules server_common)
(optional)
(package ez_api)
(libraries ezAPIServerUtils lwt.unix bigstringaf)
(foreign_stubs (language c) (names rlimit_no_file)))

(library
(name ezOpenAPI)
(public_name ez_api.openAPI)
Expand Down
3 changes: 1 addition & 2 deletions src/server/httpaf/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
(public_name ez_api.server_httpaf)
(optional)
(modules ezAPIServerHttpAf)
(libraries httpaf-lwt-unix wsHttpaf)
(foreign_stubs (language c) (names rlimit_no_file)))
(libraries httpaf-lwt-unix wsHttpaf server_common))

(library
(name ezServerHttpaf)
Expand Down
250 changes: 13 additions & 237 deletions src/server/httpaf/ezAPIServerHttpAf.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(**************************************************************************)
(* *)
(* Copyright 2018-2023 OCamlPro *)
(* Copyright 2018-2025 OCamlPro *)
(* *)
(* All rights reserved. This file is distributed under the terms of the *)
(* GNU Lesser General Public License version 2.1, with the special *)
Expand All @@ -9,197 +9,15 @@
(**************************************************************************)

open Lwt
open EzAPI
open EzAPIServerUtils
open Httpaf

external limit_open_file : unit -> int = "rlimit_no_file_c"

type lwt_server = {
shutdown : unit Lwt.t Lazy.t;
}

let set_debug () = ()

let () =
Lwt.async_exception_hook := (fun exn -> EzDebug.printf "Exception %s" (Printexc.to_string exn))

let nb_live_connections = ref 0

let listen_cond = Lwt_condition.create ()

let incr_connections () =
incr nb_live_connections

let decr_connections nb_max =
if !nb_live_connections = nb_max - 1 then
Lwt_condition.signal listen_cond () ;
decr nb_live_connections

let close_socket fd =
Lwt.finalize
(fun () ->
Lwt.catch
(fun () ->
Lwt_unix.shutdown fd Unix.SHUTDOWN_ALL;
Lwt.return_unit)
(function
(* Occurs if the peer closes the connection first. *)
| Unix.Unix_error (Unix.ENOTCONN, _, _) -> Lwt.return_unit
| exn -> Lwt.fail exn) [@ocaml.warning "-4"])
(fun () ->
Lwt_unix.close fd)

(* There are several variants of establish_server that have accumulated over the
years in Lwt_io. This is their underlying implementation. The functions
exposed in the API are various wrappers around this one. *)
let establish_server_generic
bind_function
?fd:preexisting_socket_for_listening
?(backlog = 5)
listening_address
connection_handler_callback
nb_max_connections =

let listening_socket =
match preexisting_socket_for_listening with
| None ->
Lwt_unix.socket
(Unix.domain_of_sockaddr listening_address) Unix.SOCK_STREAM 0
| Some socket ->
socket
in
Lwt_unix.setsockopt listening_socket Unix.SO_REUSEADDR true;

(* This promise gets resolved with `Should_stop when the user calls
Lwt_io.shutdown_server. This begins the shutdown procedure. *)
let should_stop, notify_should_stop =
Lwt.wait () in

(* Some time after Lwt_io.shutdown_server is called, this function
establish_server_generic will actually close the listening socket. At that
point, this promise is resolved. This ends the shutdown procedure. *)
let wait_until_listening_socket_closed, notify_listening_socket_closed =
Lwt.wait () in

let rec accept_loop () =
let try_to_accept =
incr_connections () ;
Lwt_unix.accept listening_socket >|= fun x ->
`Accepted x
in
Lwt.pick [ try_to_accept; should_stop ] >>= function
| `Accepted (client_socket, client_address) ->
begin
try Lwt_unix.set_close_on_exec client_socket
with Invalid_argument _ -> ()
end;
connection_handler_callback client_address client_socket;
if !nb_live_connections >= nb_max_connections then
Lwt_condition.wait listen_cond >>= fun () ->
accept_loop ()
else accept_loop ()
| `Should_stop ->
Lwt_unix.close listening_socket >|= fun () ->
begin match listening_address with
| Unix.ADDR_UNIX path when path <> "" && path.[0] <> '\x00' ->
Unix.unlink path
| _ -> ()
end [@ocaml.warning "-4"];
Lwt.wakeup_later notify_listening_socket_closed ()
in

let server =
{shutdown =
lazy begin
Lwt.wakeup_later notify_should_stop `Should_stop;
wait_until_listening_socket_closed
end}
in

(* Actually start the server. *)
let server_has_started =
bind_function listening_socket listening_address >>= fun () ->
Lwt_unix.listen listening_socket backlog;

Lwt.async accept_loop;

Lwt.return_unit
in
let mk_uri { Request.meth ; Request.target ; Request.headers ; _ } =
Server_common.mk_uri ~meth ~target ~header:(Headers.get headers)

server, server_has_started

let establish_server_with_client_socket
?server_fd ?backlog ?(no_close = false) ~nb_max_connections sockaddr f =
let handler client_address client_socket =
Lwt.async @@ fun () ->
Lwt.catch
(fun () -> f client_address client_socket)
(fun exn ->
!Lwt.async_exception_hook exn;
Lwt.return_unit)
>>= fun () ->
decr_connections nb_max_connections ;
if no_close then
Lwt.return_unit
else if Lwt_unix.state client_socket = Lwt_unix.Closed then
Lwt.return_unit
else
Lwt.catch
(fun () -> close_socket client_socket)
(fun exn ->
!Lwt.async_exception_hook exn;
Lwt.return_unit) in

let server, server_started =
establish_server_generic
Lwt_unix.bind ?fd:server_fd ?backlog sockaddr handler nb_max_connections in
server_started >|= fun () ->
server

let mk_uri { Request.meth ; Request.target ; Request.headers ; _ } =
match target with
| "*" ->
begin match Headers.get headers "host" with
| None -> Uri.of_string ""
| Some host ->
let host_uri = Uri.of_string ("//"^host) in
let uri = Uri.(with_host (of_string "") (host host_uri)) in
Uri.(with_port uri (port host_uri))
end
| authority when meth = `CONNECT -> Uri.of_string ("//" ^ authority)
| path ->
let uri = Uri.of_string path in
begin match Uri.scheme uri with
| Some _ -> (* we have an absoluteURI *)
Uri.(match path uri with "" -> with_path uri "/" | _ -> uri)
| None ->
let empty = Uri.of_string "" in
let empty_base = Uri.of_string "///" in
let pqs = match Stringext.split ~max:2 path ~on:'?' with
| [] -> empty_base
| [path] ->
Uri.resolve "http" empty_base (Uri.with_path empty path)
| path::qs::_ ->
let path_base =
Uri.resolve "http" empty_base (Uri.with_path empty path)
in
Uri.with_query path_base (Uri.query_of_encoded qs)
in
let uri = match Headers.get headers "host" with
| None -> Uri.(with_scheme (with_host pqs None) None)
| Some host ->
let host_uri = Uri.of_string ("//"^host) in
let uri = Uri.with_host pqs (Uri.host host_uri) in
Uri.with_port uri (Uri.port host_uri)
in
uri
end

let meth_from_httpaf req = match req.Request.meth with
| `Other "patch" | `Other "PATCH" | `Other "Patch" -> Some `PATCH
| `GET | `PUT | `OPTIONS | `POST | `DELETE | `HEAD as m -> Some m
| _ -> None
let meth_from_httpaf req = Server_common.meth_from_ext req.Request.meth

let headers_from_httpaf req =
Headers.fold ~f:(fun k v acc -> StringMap.add (String.lowercase_ascii k) [v] acc)
Expand All @@ -209,39 +27,17 @@ let version_from_httpaf req =
if req.Request.version.Version.minor = 0 then `HTTP_1_0
else `HTTP_1_1

let read_body body =
let w, n = Lwt.wait () in
let body_str = ref "" in
let on_eof () = Lwt.wakeup n !body_str in
let rec on_read bs ~off ~len =
body_str := !body_str ^ Bigstringaf.substring bs ~off ~len;
Body.schedule_read body ~on_read ~on_eof in
Body.schedule_read
body
~on_eof
~on_read;
w
let read_body body = Server_common.read_body ~read:Body.schedule_read body

let debug_httpaf req =
debug "[%t] REQUEST: %s %S" pp_time
(Method.to_string req.Request.meth)
req.Request.target;
debugf ~v:1 (fun () ->
List.iter (fun (name, value) -> EzDebug.printf " %s: %s" name value)
(Headers.to_list req.Request.headers))
let meth = Method.to_string req.Request.meth in
let headers = Headers.to_list req.Request.headers in
Server_common.debug_http_ext ~meth ~target:req.Request.target ~headers

let register_ip req time = function
| Unix.ADDR_INET (iaddr, _port) ->
let ip = Unix.string_of_inet_addr iaddr in
let ip = match Headers.get (req.Request.headers) "x-forwarded-for" with
| None -> ip
| Some ip -> ip
in
Ip.register time ip
| Unix.ADDR_UNIX _ -> ()
let register_ip req time addr = Server_common.register_ip ~header:(Headers.get req.Request.headers) time addr

let connection_handler ?allow_origin ?allow_headers ?allow_methods
?allow_credentials ?catch s sockaddr fd =
let connection_handler ?catch ?allow_origin ?allow_headers ?allow_methods
?allow_credentials s sockaddr fd =
let request_handler sockaddr reqd =
let req = Reqd.request reqd in
let time = GMTime.time () in
Expand Down Expand Up @@ -310,26 +106,6 @@ let connection_handler ?allow_origin ?allow_headers ?allow_methods
sockaddr
fd

let create_server ?catch ?allow_origin ?allow_headers ?allow_methods ?allow_credentials
~max_connections server_port server_kind =
let s = { server_port; server_kind } in
Timings.init (GMTime.time ()) @@ Doc.nservices ();
ignore @@ Doc.all_services_registered ();
let listen_address = Unix.(ADDR_INET (inet_addr_any, server_port)) in
EzDebug.printf "Starting HTTPAF server (port=%d, connection=%d)" server_port max_connections;
establish_server_with_client_socket
~nb_max_connections:max_connections
listen_address (fun sockaddr fd ->
connection_handler ?catch ?allow_origin ?allow_headers ?allow_methods
?allow_credentials s sockaddr fd) >>= fun _server ->
Lwt.return_unit

let server ?catch ?allow_origin ?allow_headers ?allow_methods ?allow_credentials servers =
let max_connections =
let n = List.length servers in
if n = 0 then 0 else limit_open_file () / 2 / n in
let waiter = fst @@ Lwt.wait () in
Lwt.join (List.map (fun (port,kind) ->
create_server ?catch ?allow_origin ?allow_headers ?allow_methods
?allow_credentials ~max_connections port kind) servers) >>= fun () ->
waiter (* keep the server running *)
Server_common.server ~name:"HTTPAF" ?catch ?allow_origin ?allow_headers
?allow_methods ?allow_credentials connection_handler servers
14 changes: 14 additions & 0 deletions src/server/httpun/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
(library
(name ezAPIServerHttpun)
(public_name ez_api.server_httpun)
(optional)
(modules ezAPIServerHttpun)
(libraries server_common httpun-lwt-unix wsHttpun))

(library
(name ezServerHttpun)
(public_name ez_api.iserver_httpun)
(optional)
(implements ezServer)
(modules ezServer)
(libraries ezAPIServerHttpun))
Loading

0 comments on commit 5d87728

Please sign in to comment.