-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_loop.lua
583 lines (512 loc) · 16.2 KB
/
event_loop.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
local timers = require "lua-asyncio/timer_list"
local futures = require "lua-asyncio/futures"
local TimerList = timers.TimerList
local Future, FutureSemaphore = futures.Future, futures.FutureSemaphore
local get_event_loop
do
local yield = coroutine.yield
--[[@
@name get_event_loop
@desc Returns the event loop where the task is running on
@returns EventLoop The EventLoop.
]]
function get_event_loop()
return yield("get_event_loop")
end
end
local EventLoop
do
local time = os.time
local unpack = table.unpack
local yield = coroutine.yield
EventLoop = {}
local meta = {__index = EventLoop}
--[[@
@name new
@desc Creates a new instance of EventLoop: an object that runs tasks concurrently (pseudo-paralellism)
@desc /!\ EventLoop.tasks_index might be lower than the quantity of items in the EventLoop.tasks list. You must trust tasks_index.
@desc /!\ EventLoop.removed_index might be lower than the quantity of items in the EventLoop.removed list. You must trust removed_index.
@desc /!\ The tasks might run in a different order than the one they acquire once you run EventLoop:add_task
@param obj?<table> The table to turn into an EventLoop.
@returns EventLoop The EventLoop.
@struct {
timers = TimerList, -- A list of the timers the EventLoop will handle
tasks = {}, -- The list of tasks the EventLoop is running
removed = {}, -- The list of indexes in the tasks list to remove
tasks_index = 0, -- The tasks list pointer
removed_index = 0, -- The removed list pointer
error_handler = nil -- The error handler. It must be a function (which receive the error and the task), and if it returns a Task it will be awaited.
}
]]
function EventLoop.new(obj)
obj = obj or {}
obj.timers = TimerList.new()
obj.tasks = {}
obj.removed = {}
obj.tasks_index = 0
obj.removed_index = 0
return setmetatable(obj, meta)
end
--[[@
@name timers_callback
@desc This function is called when a timer executes. It adds the timer task to the EventLoop.
@desc This function doesn't use all the Timer arguments. Passsing the ones listed here is enough.
@desc /!\ This function shouldn't be called by the user code, it should be called by timers only.
@param callback<Timer> The timer that is being executed
@paramstruct callback {
task<Task> The task to execute.
}
]]
function EventLoop.timers_callback(callback)
callback.task.timer = nil
return callback.event_loop:add_task(callback.task)
end
--[[@
@name cancel_awaitable
@desc This function is called when an awaitable times out.
@desc This function doesn't use all the Timer arguments. Passsing the ones listed here is enough.
@desc /!\ This function shouldn't be called by the user code, it should be called by timers only.
@param callback<Timer> The timer that is being executed
@paramstruct callback {
awaitable<Future,Task> The awaitable to cancel.
}
]]
function EventLoop.cancel_awaitable(callback)
if not callback.awaitable.done then
callback.awaitable:cancel()
end
end
--[[@
@name sleep
@desc This function pauses the current task execution and resumes it after some time.
@param delay<number> The time to sleep
]]
function EventLoop:sleep(delay)
return self:sleep_until(time() + delay, self.current_task)
end
--[[@
@name call_soon
@desc This function appends the given task to the EventLoop list after some time.
@param delay<number> The time to wait until the task can be appended
@param task<Task> The task to append
@param no_future?<boolean> Either to cancel the creation or not a Future object that will return after the task ends. @default false.
@returns Future Returns the Future object if no_future is false.
]]
function EventLoop:call_soon(delay, task, no_future)
return self:schedule(time() + delay, task, no_future)
end
--[[@
@name sleep_until
@desc The same as EventLoop:sleep, but with an absolute time.
@param when<number> The time when the task will be resumed
]]
function EventLoop:sleep_until(when)
self.current_task.timer = self.timers:add {
callback = self.timers_callback,
when = when or 0,
task = self.current_task,
event_loop = self
}
return self:stop_task_execution()
end
--[[@
@name schedule
@desc The same as EventLoop:call_soon, but with an absolute time.
@param when<number> The time when the task can be appended
@param task<Task> The task to append
@param no_future?<boolean> Either to cancel the creation or not a Future object that will return after the task ends. @default false.
@returns Future Returns the Future object if no_future is false.
]]
function EventLoop:schedule(when, task, no_future)
if task.ran_once then
error("Can't schedule a task that did already run or is running.")
end
task.timer = self.timers:add {
callback = self.timers_callback,
when = when or 0,
task = task,
event_loop = self
}
if not no_future then
local future = self:new_object(Future)
task:add_future(future)
return future
end
end
--[[@
@name add_task
@desc Adds a task to run on the next loop iteration.
@param task<Task> The task to append
]]
function EventLoop:add_task(task)
if not task._scheduled then
self.tasks_index = self.tasks_index + 1
self.tasks[self.tasks_index] = task
task._scheduled = true
end
end
--[[@
@name new_object
@desc Creates a new asyncio object that belongs to this EventLoop
@param object<Future,Lock,Event,Queue> The object class
@param vararg<mixed> The object arguments
@returns mixed The new object
]]
function EventLoop:new_object(object, ...)
return object.new(self, ...)
end
--[[@
@name stop_task_execution
@desc Pauses the task execution. This can be called from inside the task only.
@desc /!\ The task doesn't resume again if you don't append it back later. If you don't do it, this will just stop forever the task execution.
]]
function EventLoop:stop_task_execution()
self.current_task.paused = true
return yield()
end
--[[@
@name is_awaitable
@desc Checks if an object is awaitable or not.
@returns boolean Whether the object is awaitable or not
]]
function EventLoop:is_awaitable(aw)
return aw._pre_await and aw._await and aw._pause_await and aw.cancel
end
--[[@
@name await
@desc Awaits a Future or Task to complete. Pauses the current task and resumes it again once the awaitable is done.
@param aw<Future,Task> The awaitable to wait.
@returns mixed The Future or Task return values.
]]
function EventLoop:await(aw)
if not self:is_awaitable(aw) then
error("The given object is not awaitable.", 2)
end
aw:_pre_await(self)
if aw:_pause_await(self) then
self.current_task.awaiting = aw
end
return aw:_await(self)
end
--[[@
@name await_safe
@desc Awaits a Future or Task to complete, but safely. Returns nil if an error happened.
@param aw<Future,Task> The awaitable to wait.
@returns mixed The awaitable return values, or nil if it had an error.
]]
function EventLoop:await_safe(aw)
self.current_task.stop_error_propagation = true
return self:await(aw)
end
--[[@
@name add_timeout
@desc Adds a timeout for an awaitable. Basically it cancels the awaitable once the timeout is reached.
@param aw<Future,Task> The awaitable to wait.
]]
function EventLoop:add_timeout(aw, timeout)
self.timers:add {
callback = self.cancel_awaitable,
when = time() + timeout,
awaitable = aw
}
end
--[[@
@name await_for
@desc A shorthand method for add_timeout and await_safe.
@param aw<Future,Task> The awaitable to wait
@param timeout<number> The timeout
@returns mixed The Future or Task return values.
]]
function EventLoop:await_for(aw, timeout)
self:add_timeout(aw, timeout)
return self:await_safe(aw)
end
--[[@
@name await_many
@desc Awaits many awaitables at once. Runs them concurrently, and requires a FutureSemaphore object to do so.
@param vararg<Future,Task> The awaitables to wait
@returns FutureSemaphore The FutureSemaphore that will result once every awaitable is done.
]]
function EventLoop:await_many(...)
local length = select("#", ...)
local semaphore = self:new_object(FutureSemaphore, length)
local task
for index = 1, length do
task = select(index, ...)
task:add_future(semaphore, index)
if not task._is_future then
self:add_task(task)
end
end
return semaphore
end
--[[@
@name run
@desc Runs a loop iteration.
]]
function EventLoop:run()
self.timers:run()
self:run_tasks()
self:remove_tasks()
end
--[[@
@name run_until_complete
@desc Schedules the task, adds a future and runs the loop until the task is done.
@param task<Task> The task
@returns mixed The values the task returns
]]
function EventLoop:run_until_complete(task)
local future = self:new_object(Future)
task:add_future(future)
self:add_task(task)
while not future.done do
self:run()
end
return unpack(future.result)
end
--[[@
@name handle_error
@desc Handles a task error and calls EventLoop:remove_later
@param task<Task> The task
@param index<int> The task index in the list, to be removed later.
]]
function EventLoop:handle_error(task, index)
self:remove_later(index)
task.done = true
if task.cancelled then
task.error = "The task was cancelled."
end
local future
for index = 1, task.futures_index do
future = task.futures[index]
future.obj:set_error(task.error, true, future.index)
end
if task._next_task then
local _next = task._next_task
if _next.stop_error_propagation then
_next.arguments = nil
else
_next.error = task.error
_next.done = true
end
self:add_task(_next)
elseif self.error_handler and not task._is_error_handler then
-- If the eventloop has an error handler, and the task is the product of it,
-- we can't run the error handler again, otherwise it will create an infinite loop.
local result = self.error_handler(task.error, task)
if self:is_awaitable(result) and result.coro then -- It is a task!
result._is_error_handler = true
self:add_task(result)
end
else
error(task.error)
end
end
--[[@
@name remove_later
@desc Schedules a task removal
@param index<int> The task to remove
]]
function EventLoop:remove_later(index)
self.removed_index = self.removed_index + 1
self.removed[self.removed_index] = index
self.tasks[index]._scheduled = false
end
--[[@
@name run_tasks
@desc Runs the tasks in the list only.
]]
function EventLoop:run_tasks()
local tasks, now, task = self.tasks, time()
for index = 1, self.tasks_index do
task = tasks[index]
if not task.cancelled then
if task.error then
self:handle_error(task, index)
else
self.current_task = task
task:run(self)
if task.cancelled or task.error then
self:handle_error(task, index)
elseif task.done or task.paused then
task.paused = false
self:remove_later(index)
end
end
else
self:handle_error(task, index)
end
end
self.current_task = nil
end
--[[@
@name remove_tasks
@desc Removes the tasks that are waiting to be removed from the list.
]]
function EventLoop:remove_tasks()
local tasks, removed, remove = self.tasks, self.removed
for index = 1, self.removed_index do
remove = removed[index]
if remove < self.tasks_index then
tasks[remove] = tasks[self.tasks_index]
-- tasks[self.tasks_index] = nil -- uncomment if you want to make #self.tasks and self.tasks_index match
-- Remember that uncommenting the line won't make the loop respect the tasks order. Use OrderedEventLoop for that.
end
self.tasks_index = self.tasks_index - 1
end
self.removed_index = 0
end
end
local OrderedEventLoop
do
local remove = table.remove
OrderedEventLoop = setmetatable(
{}, {__index = EventLoop}
)
local meta = {__index = OrderedEventLoop}
--[[@
@name new
@desc Creates a new instance of OrderedEventLoop: the same as EventLoop but respecting the tasks order.
@desc /!\ This is different from EventLoop since here, tasks_index and the quantity of items of tasks match.
@desc /!\ EventLoop.removed_index might be lower than the quantity of items in the EventLoop.removed list. You must trust removed_index.
@desc /!\ The tasks orders is the one they acquire once you run OrderedEventLoop:add_task.
@param obj?<table> The table to turn into an EventLoop.
@returns OrderedEventLoop The OrderedEventLoop.
@struct {
timers = TimerList, -- A list of the timers the EventLoop will handle
tasks = {}, -- The list of tasks the EventLoop is running
removed = {}, -- The list of indexes in the tasks list to remove
tasks_index = 0, -- The tasks list pointer
removed_index = 0 -- The removed list pointer
}
]]
function OrderedEventLoop.new(obj)
return setmetatable(EventLoop.new(obj), meta)
end
--[[@
@name remove_tasks
@desc Removes the tasks that are waiting to be removed from the list.
]]
function OrderedEventLoop:remove_tasks()
local tasks, removed = self.tasks, self.removed
for index = self.removed_index, 1, -1 do
remove(tasks, removed[index])
end
self.tasks_index = self.tasks_index - self.removed_index
self.removed_index = 0
-- self.removed = {} -- uncomment if you want to make #self.removed and self.removed_index always match
end
end
local LimitedEventLoop
do
local time = os.time
LimitedEventLoop = setmetatable(
{}, {__index = EventLoop}
)
local meta = {__index = LimitedEventLoop}
--[[@
@name new
@desc Creates a new instance of LimitedEventLoop: the same as EventLoop but with runtime limitations
@desc This inherits from EventLoop
@param obj<table,nil> The table to turn into an EventLoop.
@param runtime<int> The maximum runtime that can be used.
@param reset<int> How many time it needs to wait until the used runtime is resetted.
@returns LimitedEventLoop The LimitedEventLoop.
@struct {
timers = TimerList, -- A list of the timers the EventLoop will handle
tasks = {}, -- The list of tasks the EventLoop is running
removed = {}, -- The list of indexes in the tasks list to remove
tasks_index = 0, -- The tasks list pointer
removed_index = 0, -- The removed list pointer
runtime = runtime, -- The maximum runtime
reset = reset, -- The reset interval
used = 0, -- The used runtime
initialized = 0, -- When was the last runtime reset
step = 0 -- The iteration step (0 -> needs to run timers, 1 -> needs to run tasks, 2 -> needs to remove tasks)
}
]]
function LimitedEventLoop.new(obj, runtime, reset)
obj = EventLoop.new(obj)
obj.runtime = runtime
obj.reset = reset
obj.used = 0
obj.initialized = 0
obj.step = 0
return setmetatable(obj, meta)
end
--[[@
@name can_run
@desc Checks if the EventLoop can run something.
@param now<int> How many runtime is being used and not counted in LimitedEventLoop.used
@returns boolean Whether it can run something or not
]]
function LimitedEventLoop:can_run(now)
return self.used + now < self.runtime
end
--[[@
@name run
@desc Runs (or partially runs) a loop iteration if it is possible.
]]
function LimitedEventLoop:run()
local start = time()
if start - self.initialized >= self.reset then
self.initialized = start
self.used = 0
end
if self.step == 0 then
if self:can_run(0) then
self.timers:run()
self.step = 1
end
end
if self.step == 1 then
if self:can_run(time() - start) then
self:run_tasks()
self.step = 2
end
end
if self.step == 2 then
if self:can_run(time() - start) then
self:remove_tasks()
self.step = 0
end
end
self.used = self.used + time() - start
end
end
--[[@
@name MixedEventLoop
@desc Creates a new object which is a mix of any EventLoop's variants.
@param eventloop<table> The table to turn into the mix
@param vararg<EventLoop> The classes to mix
@returns EventLoop The mixed event loop.
]]
local function MixedEventLoop(eventloop, ...)
local classes = {...}
local length = #classes
setmetatable(eventloop, {
__index = function(tbl, key)
local v
for i = 1, length do
v = classes[i][key]
if v then return v end
end
end
})
local meta = {__index = eventloop}
function eventloop.new(obj)
local obj = obj or {}
for i = 1, length do
obj = classes[i].new(obj)
end
return setmetatable(obj, meta)
end
return eventloop
end
return {
EventLoop = EventLoop,
OrderedEventLoop = OrderedEventLoop,
LimitedEventLoop = LimitedEventLoop,
MixedEventLoop = MixedEventLoop,
get_event_loop = get_event_loop
}