|
27 | 27 | (remove-watch rt/connections watch-key)
|
28 | 28 | result)))
|
29 | 29 |
|
| 30 | +(defn- request! [session-id message] |
| 31 | + (if-let [send! (get-connection session-id)] |
| 32 | + (let [id (rt/next-id) |
| 33 | + response (promise) |
| 34 | + message (assoc message :portal.rpc/id id)] |
| 35 | + (swap! rt/pending-requests assoc id response) |
| 36 | + (send! message) |
| 37 | + (let [response (deref response timeout ::timeout)] |
| 38 | + (swap! rt/pending-requests dissoc id) |
| 39 | + (if-not (= response ::timeout) |
| 40 | + response |
| 41 | + (throw (ex-info "Portal request timeout" |
| 42 | + {:session-id session-id :message message}))))) |
| 43 | + (throw (ex-info "No such portal session" |
| 44 | + {:session-id session-id :message message})))) |
| 45 | + |
| 46 | +(defn- broadcast! [message] |
| 47 | + (when-let [sessions (keys @rt/connections)] |
| 48 | + (let [response (promise)] |
| 49 | + (doseq [session-id sessions] |
| 50 | + (future |
| 51 | + (try |
| 52 | + (deliver response (request! session-id message)) |
| 53 | + (catch Exception ex |
| 54 | + (when (-> ex ex-data ::timeout) |
| 55 | + (swap! rt/connections dissoc session-id)) |
| 56 | + (deliver response ex))))) |
| 57 | + (let [response (deref response timeout ::timeout)] |
| 58 | + (cond |
| 59 | + (instance? Exception response) |
| 60 | + (throw response) |
| 61 | + (not= response ::timeout) |
| 62 | + response |
| 63 | + :else |
| 64 | + (throw (ex-info |
| 65 | + "Portal request timeout" |
| 66 | + {::timeout true |
| 67 | + :session-id :all |
| 68 | + :message message}))))))) |
| 69 | + |
30 | 70 | (defn request
|
31 | 71 | ([message]
|
32 |
| - (last |
33 |
| - (for [session-id (keys @rt/connections)] |
34 |
| - (request session-id message)))) |
| 72 | + (broadcast! message)) |
35 | 73 | ([session-id message]
|
36 |
| - (if-let [send! (get-connection session-id)] |
37 |
| - (let [id (rt/next-id) |
38 |
| - response (promise) |
39 |
| - message (assoc message :portal.rpc/id id)] |
40 |
| - (swap! rt/pending-requests assoc id response) |
41 |
| - (send! message) |
42 |
| - (let [response (deref response timeout ::timeout)] |
43 |
| - (swap! rt/pending-requests dissoc id) |
44 |
| - (if-not (= response ::timeout) |
45 |
| - response |
46 |
| - (throw (ex-info "Portal request timeout" |
47 |
| - {:session-id session-id :message message}))))) |
48 |
| - (throw (ex-info "No such portal session" |
49 |
| - {:session-id session-id :message message}))))) |
| 74 | + (request! session-id message))) |
50 | 75 |
|
51 | 76 | (defn- push-state [session-id new-value]
|
52 | 77 | (request session-id {:op :portal.rpc/push-state :state new-value})
|
|
0 commit comments