@@ -123,7 +123,7 @@ async def resolve_scheduler_request(
123
123
results_queue : Queue [WorkerProcessResult [RequestT , ResponseT ]],
124
124
process_id : int ,
125
125
) -> WorkerProcessRequest [RequestT , ResponseT ]:
126
- request = process_request .session . get_next_request ()
126
+ request = process_request .request
127
127
timeout_time = process_request .timeout_time
128
128
queued_time = process_request .queued_time
129
129
@@ -170,22 +170,19 @@ async def resolve_scheduler_request(
170
170
)
171
171
asyncio .create_task (self .send_result (results_queue , result ))
172
172
173
- process_request .session .push_response (response )
174
173
return process_request
175
174
176
175
def process_loop_asynchronous (
177
176
self ,
178
177
queues : MPQueues [RequestT , ResponseT ],
179
178
strategy : SchedulingStrategy ,
180
179
stop_event : Event ,
181
- prioritize_sessions : bool ,
182
180
max_concurrency : int ,
183
181
process_id : int ,
184
182
num_processes : int ,
185
183
):
186
184
async def _process_runner ():
187
185
lock = asyncio .Semaphore (max_concurrency )
188
- pending_requests : list [WorkerProcessRequest [RequestT , ResponseT ]] = []
189
186
times_iter = islice (
190
187
strategy .request_times (),
191
188
process_id ,
@@ -202,50 +199,18 @@ async def _process_runner():
202
199
await asyncio .sleep (start_time - time .time () - 1 )
203
200
await lock .acquire ()
204
201
205
- process_request = None
206
202
try :
207
- process_request = (
208
- pending_requests .pop ()
209
- if pending_requests
210
- else queues .requests .get_nowait ()
211
- )
203
+ process_request = queues .requests .get_nowait ()
212
204
dequeued_time = time .time ()
213
205
except QueueEmpty :
214
206
lock .release ()
215
207
continue
216
208
217
- async def wait_then_requeue (
218
- process_request : WorkerProcessRequest [RequestT , ResponseT ],
219
- ):
220
- # Wait to requeue the request session if it specifies a delay
221
- if delay := process_request .session .get_next_delay ():
222
- await asyncio .sleep (delay )
223
-
224
- # Push session to the stack
225
- process_request .queued_time = time .time ()
226
- pending_requests .append (process_request )
227
- if prioritize_sessions :
228
- # Release the lock with the session on top of the stack
229
- lock .release ()
230
-
231
209
def _request_callback (
232
- future : asyncio .Future [WorkerProcessRequest [RequestT , ResponseT ]],
210
+ _ : asyncio .Future [WorkerProcessRequest [RequestT , ResponseT ]],
233
211
):
234
- # If we are prioritizing sessions, hold
235
- # the lock until the session is done
236
212
nonlocal lock
237
- if not prioritize_sessions :
238
- lock .release ()
239
-
240
- try :
241
- process_request = future .result ()
242
- except asyncio .CancelledError :
243
- return
244
- if not process_request .session .complete :
245
- asyncio .create_task (wait_then_requeue (process_request ))
246
- elif prioritize_sessions :
247
- # no more requests in this session, release the lock
248
- lock .release ()
213
+ lock .release ()
249
214
250
215
task = asyncio .create_task (
251
216
self .resolve_scheduler_request (
@@ -319,7 +284,6 @@ def process_loop_asynchronous(
319
284
queues : MPQueues [GenerationRequest , ResponseSummary ],
320
285
strategy : SchedulingStrategy ,
321
286
stop_event : Event ,
322
- prioritize_sessions : bool ,
323
287
max_concurrency : int ,
324
288
process_id : int ,
325
289
num_processes : int ,
@@ -329,7 +293,6 @@ def process_loop_asynchronous(
329
293
queues = queues ,
330
294
strategy = strategy ,
331
295
stop_event = stop_event ,
332
- prioritize_sessions = prioritize_sessions ,
333
296
max_concurrency = max_concurrency ,
334
297
process_id = process_id ,
335
298
num_processes = num_processes ,
0 commit comments