|  | 
| 1 | 1 | (ns hello.handler | 
| 2 | 2 |   (:require | 
| 3 |  | -   [aleph.http              :as http] | 
| 4 |  | -   [aleph.netty             :as netty] | 
| 5 |  | -   [byte-streams            :as bs] | 
| 6 |  | -   [clj-async-profiler.core :as prof] | 
| 7 |  | -   [hiccup.page             :as hp] | 
| 8 |  | -   [hiccup.util             :as hu] | 
| 9 |  | -   [jsonista.core           :as json] | 
| 10 |  | -   [manifold.deferred       :as d] | 
| 11 |  | -   [porsas.async            :as async]) | 
| 12 |  | -  (:import (clojure.lang IDeref) | 
| 13 |  | -           (io.netty.channel ChannelOption) | 
|  | 3 | +    [aleph.http :as http] | 
|  | 4 | +    [aleph.netty :as netty] | 
|  | 5 | +    [hiccup.page :as hp] | 
|  | 6 | +    [hiccup.util :as hu] | 
|  | 7 | +    [jsonista.core :as json] | 
|  | 8 | +    [manifold.deferred :as d] | 
|  | 9 | +    [next.jdbc :as jdbc] | 
|  | 10 | +    [next.jdbc.connection :as connection] | 
|  | 11 | +    [next.jdbc.result-set :as rs]) | 
|  | 12 | + | 
|  | 13 | +  (:import (com.zaxxer.hikari HikariDataSource) | 
| 14 | 14 |            (io.netty.buffer PooledByteBufAllocator) | 
| 15 |  | -           (java.util.function Supplier) | 
| 16 |  | -           (java.util.concurrent ThreadLocalRandom) | 
| 17 |  | -           (porsas.async Context)) | 
|  | 15 | +           (io.netty.channel ChannelOption) | 
|  | 16 | +           (java.util.concurrent ThreadLocalRandom)) | 
| 18 | 17 |   (:gen-class)) | 
| 19 | 18 | 
 | 
|  | 19 | +(def jdbc-opts {:builder-fn rs/as-unqualified-maps}) | 
|  | 20 | + | 
|  | 21 | +(def db-spec | 
|  | 22 | +  {:jdbcUrl "jdbc:postgresql://tfb-database/hello_world?user=benchmarkdbuser&password=benchmarkdbpass"}) | 
|  | 23 | + | 
|  | 24 | +(def datasource | 
|  | 25 | +  (connection/->pool HikariDataSource db-spec)) | 
|  | 26 | + | 
| 20 | 27 | (def plaintext-response | 
| 21 |  | -  {:status 200 | 
|  | 28 | +  {:status  200 | 
| 22 | 29 |    :headers {"Content-Type" "text/plain"} | 
| 23 |  | -   :body (bs/to-byte-array "Hello, World!")}) | 
|  | 30 | +   :body    (.getBytes "Hello, World!")}) | 
| 24 | 31 | 
 | 
| 25 | 32 | (def json-response | 
| 26 |  | -  {:status 200 | 
|  | 33 | +  {:status  200 | 
| 27 | 34 |    :headers {"Content-Type" "application/json"}}) | 
| 28 | 35 | 
 | 
| 29 | 36 | (def html-response | 
| 30 |  | -  {:status 200 | 
|  | 37 | +  {:status  200 | 
| 31 | 38 |    :headers {"Content-Type" "text/html; charset=utf-8"}}) | 
| 32 | 39 | 
 | 
| 33 |  | -(def db-spec | 
| 34 |  | -  {:uri "postgresql://tfb-database:5432/hello_world" | 
| 35 |  | -   :user "benchmarkdbuser" | 
| 36 |  | -   :password "benchmarkdbpass" | 
| 37 |  | -   :size 1}) | 
| 38 |  | - | 
| 39 |  | -(defmacro thread-local [& body] | 
| 40 |  | -  `(let [tl# (ThreadLocal/withInitial (reify Supplier (get [_] ~@body)))] | 
| 41 |  | -     (reify IDeref (deref [_] (.get tl#))))) | 
| 42 | 40 | 
 | 
| 43 |  | -(def pool | 
| 44 |  | -  "PostgreSQL pool of connections (`PgPool`)." | 
| 45 |  | -  (thread-local (async/pool db-spec))) | 
| 46 |  | - | 
| 47 |  | -(defn random | 
|  | 41 | +(defn- random | 
| 48 | 42 |   "Generate a random number between 1 and 10'000." | 
| 49 | 43 |   [] | 
| 50 | 44 |   (unchecked-inc (.nextInt (ThreadLocalRandom/current) 10000))) | 
| 51 | 45 | 
 | 
| 52 |  | -(defn sanitize-queries-param | 
| 53 |  | -  "Sanitizes the `queries` parameter. Clamps the value between 1 and 500. | 
| 54 |  | -  Invalid (string) values become 1." | 
|  | 46 | +(defn- sanitize-queries-param | 
| 55 | 47 |   [request] | 
| 56 | 48 |   (let [queries (-> request | 
| 57 | 49 |                     :query-string | 
| 58 | 50 |                     (subs 8)) | 
| 59 | 51 |         n (try (Integer/parseInt queries) | 
| 60 |  | -               (catch Exception _ 1))] ; default to 1 on parse failure | 
|  | 52 | +               (catch Exception _ 1))]                      ; default to 1 on parse failure | 
| 61 | 53 |     (cond | 
| 62 | 54 |       (< n 1) 1 | 
| 63 | 55 |       (> n 500) 500 | 
| 64 | 56 |       :else n))) | 
| 65 | 57 | 
 | 
| 66 |  | -(def ^Context | 
| 67 |  | -  query-mapper | 
| 68 |  | -  "Map each row into a record." | 
| 69 |  | -  (async/context {:row (async/rs->compiled-record)})) | 
| 70 | 58 | 
 | 
| 71 |  | -(defn query-one-random-world | 
| 72 |  | -  "Query a random world on the database. | 
| 73 |  | -  Return a `CompletableFuture`." | 
| 74 |  | -  [] | 
| 75 |  | -  (async/query-one query-mapper | 
| 76 |  | -                   @pool | 
| 77 |  | -                   ["SELECT id, randomnumber FROM world WHERE id=$1" (random)])) | 
| 78 |  | - | 
| 79 |  | -(defn update-world | 
| 80 |  | -  "Update a world on the database. | 
| 81 |  | -  Return a `CompletableFuture`." | 
| 82 |  | -  [{:keys [randomNumber id]}] | 
| 83 |  | -  (async/query @pool | 
| 84 |  | -               ["UPDATE world SET randomnumber=$1 WHERE id=$2" randomNumber id])) | 
| 85 |  | - | 
| 86 |  | -(defn run-queries | 
|  | 59 | +(defn- query-one-random-world [] | 
|  | 60 | +  (jdbc/execute-one! datasource | 
|  | 61 | +                     ["select * from \"World\" where id = ?;" (random)] | 
|  | 62 | +                     jdbc-opts)) | 
|  | 63 | + | 
|  | 64 | +(defn- update-world | 
|  | 65 | +  [{:keys [randomnumber  id]}] | 
|  | 66 | +  (jdbc/execute-one! datasource | 
|  | 67 | +                     ["update \"World\" set randomNumber = ? where id = ? returning *;" randomnumber id] | 
|  | 68 | +                     jdbc-opts)) | 
|  | 69 | + | 
|  | 70 | +(defn- run-queries | 
| 87 | 71 |   "Run a number of `queries` on the database to fetch a random world. | 
| 88 | 72 |   Return a `manifold.deferred`." | 
| 89 | 73 |   [queries] | 
| 90 | 74 |   (apply d/zip | 
| 91 | 75 |          (take queries | 
| 92 | 76 |                (repeatedly query-one-random-world)))) | 
| 93 | 77 | 
 | 
| 94 |  | -(defn query-fortunes | 
| 95 |  | -  "Query the fortunes on database. | 
| 96 |  | -  Return a `CompletableFuture`." | 
| 97 |  | -  [] | 
| 98 |  | -  (async/query query-mapper @pool ["SELECT id, message from FORTUNE"])) | 
| 99 | 78 | 
 | 
| 100 |  | -(defn get-fortunes | 
|  | 79 | +(defn query-fortunes [] | 
|  | 80 | +  (jdbc/execute! datasource | 
|  | 81 | +                 ["select * from \"Fortune\";"] | 
|  | 82 | +                 jdbc-opts)) | 
|  | 83 | + | 
|  | 84 | +(defn- get-fortunes | 
| 101 | 85 |   "Fetch the full list of Fortunes from the database, sort them by the fortune | 
| 102 | 86 |   message text. | 
| 103 | 87 |   Return a `CompletableFuture` with the results." | 
|  | 
| 106 | 90 |            (fn [fortunes] | 
| 107 | 91 |              (sort-by :message | 
| 108 | 92 |                       (conj fortunes | 
| 109 |  | -                            {:id 0 | 
|  | 93 | +                            {:id      0 | 
| 110 | 94 |                              :message "Additional fortune added at request time."}))))) | 
| 111 | 95 | 
 | 
| 112 |  | -(defn update-and-persist | 
| 113 |  | -  "Fetch a number of `queries` random world from the database. | 
| 114 |  | -  Compute a new `randomNumber` for each of them a return a `CompletableFuture` | 
| 115 |  | -  with the updated worlds." | 
| 116 |  | -  [queries] | 
|  | 96 | +(defn- update-and-persist [queries] | 
| 117 | 97 |   (d/chain' (run-queries queries) | 
| 118 | 98 |             (fn [worlds] | 
| 119 |  | -              (let [worlds' (mapv #(assoc % :randomNumber (random)) worlds)] | 
|  | 99 | +              (let [worlds' (mapv #(assoc % :randomnumber (random)) worlds)] | 
| 120 | 100 |                 (d/chain' (apply d/zip (mapv update-world worlds')) | 
| 121 | 101 |                           (fn [_] worlds')))))) | 
| 122 | 102 | 
 | 
| 123 |  | -(defn fortunes-hiccup | 
|  | 103 | +(defn- fortunes-hiccup | 
| 124 | 104 |   "Render the given fortunes to simple HTML using Hiccup." | 
| 125 | 105 |   [fortunes] | 
| 126 | 106 |   (hp/html5 | 
| 127 |  | -   [:head | 
| 128 |  | -    [:title "Fortunes"]] | 
| 129 |  | -   [:body | 
| 130 |  | -    [:table | 
| 131 |  | -     [:tr | 
| 132 |  | -      [:th "id"] | 
| 133 |  | -      [:th "message"]] | 
| 134 |  | -     (for [x fortunes] | 
| 135 |  | -       [:tr | 
| 136 |  | -        [:td (:id x)] | 
| 137 |  | -        [:td (hu/escape-html (:message x))]])]])) | 
|  | 107 | +    [:head | 
|  | 108 | +     [:title "Fortunes"]] | 
|  | 109 | +    [:body | 
|  | 110 | +     [:table | 
|  | 111 | +      [:tr | 
|  | 112 | +       [:th "id"] | 
|  | 113 | +       [:th "message"]] | 
|  | 114 | +      (for [x fortunes] | 
|  | 115 | +        [:tr | 
|  | 116 | +         [:td (:id x)] | 
|  | 117 | +         [:td (hu/escape-html (:message x))]])]])) | 
| 138 | 118 | 
 | 
| 139 | 119 | (defn handler | 
| 140 | 120 |   "Ring handler representing the different tests." | 
| 141 | 121 |   [req] | 
| 142 | 122 |   (let [uri (:uri req)] | 
| 143 | 123 |     (cond | 
| 144 | 124 |       (.equals "/plaintext" uri) plaintext-response | 
| 145 |  | -      (.equals "/json" uri)      (assoc json-response | 
| 146 |  | -                                        :body (json/write-value-as-bytes {:message "Hello, World!"})) | 
| 147 |  | -      (.equals "/db" uri)        (-> (query-one-random-world) | 
| 148 |  | -                                     (d/chain (fn [world] | 
| 149 |  | -                                                (assoc json-response | 
| 150 |  | -                                                       :body (json/write-value-as-bytes world))))) | 
| 151 |  | -      (.equals "/queries" uri)   (-> (sanitize-queries-param req) | 
| 152 |  | -                                     (run-queries) | 
| 153 |  | -                                     (d/chain (fn [worlds] | 
| 154 |  | -                                                (assoc json-response | 
| 155 |  | -                                                       :body (json/write-value-as-bytes worlds))))) | 
| 156 |  | -      (.equals "/fortunes" uri)  (d/chain' (get-fortunes) | 
| 157 |  | -                                           fortunes-hiccup | 
| 158 |  | -                                           (fn [body] | 
| 159 |  | -                                             (assoc html-response :body body))) | 
| 160 |  | -      (.equals "/updates" uri)   (-> (sanitize-queries-param req) | 
| 161 |  | -                                     (update-and-persist) | 
| 162 |  | -                                     (d/chain (fn [worlds] | 
| 163 |  | -                                                (assoc json-response | 
| 164 |  | -                                                       :body (json/write-value-as-bytes worlds))))) | 
| 165 |  | -      :else {:status 404}))) | 
| 166 |  | - | 
| 167 |  | -;;; | 
|  | 125 | +      (.equals "/json" uri) (assoc json-response | 
|  | 126 | +                              :body (json/write-value-as-bytes {:message "Hello, World!"})) | 
|  | 127 | +      (.equals "/db" uri) (-> (query-one-random-world) | 
|  | 128 | +                              (d/chain (fn [world] | 
|  | 129 | +                                         (assoc json-response | 
|  | 130 | +                                           :body (json/write-value-as-bytes world))))) | 
|  | 131 | +      (.equals "/queries" uri) (-> (sanitize-queries-param req) | 
|  | 132 | +                                   (run-queries) | 
|  | 133 | +                                   (d/chain (fn [worlds] | 
|  | 134 | +                                              (assoc json-response | 
|  | 135 | +                                                :body (json/write-value-as-bytes worlds))))) | 
|  | 136 | +      (.equals "/fortunes" uri) (d/chain' (get-fortunes) | 
|  | 137 | +                                          fortunes-hiccup | 
|  | 138 | +                                          (fn [body] | 
|  | 139 | +                                            (assoc html-response :body body))) | 
|  | 140 | +      (.equals "/updates" uri) (-> (sanitize-queries-param req) | 
|  | 141 | +                                   (update-and-persist) | 
|  | 142 | +                                   (d/chain (fn [worlds] | 
|  | 143 | +                                              (assoc json-response | 
|  | 144 | +                                                :body (json/write-value-as-bytes worlds))))) | 
|  | 145 | +      :else {:body   "Not found" | 
|  | 146 | +             :status 404}))) | 
|  | 147 | + | 
| 168 | 148 | 
 | 
| 169 | 149 | (defn -main [& _] | 
| 170 | 150 |   (netty/leak-detector-level! :disabled) | 
| 171 |  | -  (http/start-server handler {:port 8080 | 
| 172 |  | -                              :raw-stream? true | 
| 173 |  | -                              :epoll? true | 
| 174 |  | -                              :executor :none | 
|  | 151 | +  (println "starting server on port 8080") | 
|  | 152 | +  (http/start-server handler {:port                8080 | 
|  | 153 | +                              :raw-stream?         true | 
|  | 154 | +                              :executor            :none | 
| 175 | 155 |                               :bootstrap-transform (fn [bootstrap] | 
| 176 | 156 |                                                      (.option bootstrap ChannelOption/ALLOCATOR PooledByteBufAllocator/DEFAULT) | 
| 177 | 157 |                                                      (.childOption bootstrap ChannelOption/ALLOCATOR PooledByteBufAllocator/DEFAULT)) | 
| 178 |  | -                              :pipeline-transform (fn [pipeline] | 
| 179 |  | -                                                    (.remove pipeline "continue-handler"))}) | 
| 180 |  | -  ;; Uncomment to enable async-profiler | 
| 181 |  | -  #_ | 
| 182 |  | -  (do | 
| 183 |  | -    (prof/profile-for 60 | 
| 184 |  | -                      #_ | 
| 185 |  | -                      {:transform (fn [s] | 
| 186 |  | -                                    (when-not (re-find #"(writev|__libc|epoll_wait|write|__pthread)" s) | 
| 187 |  | -                                      s))}) | 
| 188 |  | -    (prof/serve-files 8081))) | 
|  | 158 | +                              :pipeline-transform  (fn [pipeline] | 
|  | 159 | +                                                     (.remove pipeline "continue-handler"))})) | 
0 commit comments