@@ -147,10 +147,13 @@ def forward(target_connection, operation)
147147 def self . dispatch ( connection , target , id , message )
148148 Async do
149149 call = self . new ( connection , id , message )
150+ # Track the call in the connection's calls hash:
150151 connection . calls [ id ] = call
151152
153+ # Dispatch the call to the target (synchronously):
152154 target . dispatch ( call )
153155
156+ # Stream responses back to the connection (asynchronously):
154157 while response = call . pop
155158 connection . write ( id : id , **response )
156159 end
@@ -160,6 +163,9 @@ def self.dispatch(connection, target, id, message)
160163
161164 # If the queue is closed, we don't need to send a finished message:
162165 unless call . closed?
166+ # Ensure the call is closed, to prevent messages being buffered:
167+ call . close
168+
163169 # If the above write failed, this is likely to fail too, and we can safely ignore it.
164170 connection . write ( id : id , finished : true ) rescue nil
165171 end
@@ -276,16 +282,19 @@ def call(...)
276282 #
277283 # @parameter target [Dispatchable] The target to dispatch calls to.
278284 def run ( target )
285+ # Process incoming messages from the connection:
279286 self . each do |message |
287+ # If the message has an ID, it is a response to a call:
280288 if id = message . delete ( :id )
289+ # Find the call in the connection's calls hash:
281290 if call = @calls [ id ]
282- # Response to a call:
291+ # Enqueue the response for the call:
283292 call . push ( **message )
284293 elsif message . key? ( :do )
285- # Incoming call:
294+ # Otherwise, if we couldn't find an existing call, it must be a new call:
286295 Call . dispatch ( self , target , id , message )
287296 else
288- # Likely a response to a timed-out call, ignore it:
297+ # Finally, if none of the above, it is likely a response to a timed-out call, so ignore it:
289298 Console . debug ( self , "Ignoring message:" , message )
290299 end
291300 else
0 commit comments