-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfutures.lua
282 lines (254 loc) · 8.67 KB
/
futures.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
local Future
do
	local yield = coroutine.yield

	Future = {}
	local meta = {__index = Future}

	-- Returns the message explaining why `future` can't be settled anymore
	-- (already done, or cancelled), or nil when it is still pending.
	local function settle_refusal(future)
		if future.done then
			return "The Future has already been done."
		elseif future.cancelled then
			return "The Future was cancelled."
		end
	end

	--[[@
		@name new
		@desc Creates a new instance of Future: an object that will return later. You can use EventLoop:await on it, but you can't use EventLoop:add_task.
		@desc If you await it, it will return the :set_result() unpacked table. If you set the result to {"a", "b", "c"}, it will return "a", "b", "c".
		@desc /!\ If you safely await it, it will return nil and you need to manually check its error.
		@desc The constructor only assigns loop and the task/future queues; result, error, cancelled and done keep whatever obj already holds (nil for a fresh table) until they are set.
		@param loop<EventLoop> The loop that the future belongs to
		@param obj?<table> The table to turn into a Future.
		@returns Future The Future object
		@struct {
			loop = EventLoop, -- the loop that the future belongs to
			_next_tasks = {}, -- the tasks that the Future is gonna run once it is done
			_next_tasks_index = 0, -- the tasks table pointer
			futures = {}, -- the futures to trigger after this is done
			futures_index = 0, -- the futures pointer
			result = nil or table, -- the Future result; if it is nil, it didn't end yet.
			error = false or string, -- whether the future has thrown an error or not
			cancelled = false, -- whether the future is cancelled or not
			done = false -- whether the future is done or not
		}
	]]
	function Future.new(loop, obj)
		obj = obj or {}
		obj.loop = loop
		obj._next_tasks = {}
		obj._next_tasks_index = 0
		obj.futures = {}
		obj.futures_index = 0
		return setmetatable(obj, meta)
	end

	--[[@
		@name _can_await
		@desc Throws an error if the Future can't be awaited (that is, if it has been cancelled).
		@param loop<EventLoop> The EventLoop executing await
	]]
	function Future:_can_await(loop)
		if self.cancelled then
			-- Level 3 blames the caller of whoever invoked _can_await.
			error("Can't await a cancelled Future.", 3)
		end
	end

	--[[@
		@name _pause_await
		@desc Returns whether the task awaiting this object needs to be paused or not
		@param loop<EventLoop> The EventLoop executing await
		@returns boolean If the task awaiting needs to be paused or not
	]]
	function Future:_pause_await(loop)
		return not self.done
	end

	--[[@
		@name _await
		@desc Pauses the awaiting task, and returns once the result is done.
		@desc If the Future is already done, its error or result is stored on loop.current_task and the coroutine yields; otherwise loop.current_task is queued and loop:stop_task_execution() is called.
		@param loop<EventLoop> The EventLoop executing await
		@returns mixed The returned value.
	]]
	function Future:_await(loop)
		if not self.done then
			-- Still pending: queue the awaiting task so that set_result /
			-- set_error can hand it back to the loop later on.
			local slot = self._next_tasks_index + 1
			self._next_tasks_index = slot
			self._next_tasks[slot] = loop.current_task
			return loop:stop_task_execution()
		end

		local task = loop.current_task
		if self.error then
			task.error = self.error
		else
			task.arguments = self.result
		end
		-- NOTE(review): presumably the loop resumes the task with the values
		-- stored above; that logic is not visible in this file.
		return yield()
	end

	--[[@
		@name cancel
		@desc Cancels the Future
	]]
	function Future:cancel()
		self.cancelled = true
	end

	--[[@
		@name add_future
		@desc Adds a future that will be set after this one is done.
		@param future<Future> The future object. Can be a variant too.
		@param index?<int> The index given to the future object (used only with FutureSemaphore)
	]]
	function Future:add_future(future, index)
		local slot = self.futures_index + 1
		self.futures_index = slot
		self.futures[slot] = {obj = future, index = index}
	end

	--[[@
		@name set_result
		@desc Sets the Future result and calls all the scheduled tasks
		@desc Chained futures (see add_future) are always set in safe mode, then every waiting task receives the result as its arguments and is added back to the loop.
		@param result<table> A table (with no associative members) to set as the result. Can have multiple items.
		@param safe?<boolean> Whether to cancel the error if the result can't be set. @default false.
		@returns string|nil In safe mode, the reason why the result couldn't be set; nil otherwise.
	]]
	function Future:set_result(result, safe)
		local refusal = settle_refusal(self)
		if refusal then
			if safe then
				return refusal
			end
			error(refusal, 2)
		end

		self.done = true
		self.result = result

		local chained
		for i = 1, self.futures_index do
			chained = self.futures[i]
			chained.obj:set_result(result, true, chained.index)
		end

		local task
		for i = 1, self._next_tasks_index do
			task = self._next_tasks[i]
			task.arguments = result
			task.awaiting = nil
			self.loop:add_task(task)
		end
	end

	--[[@
		@name set_error
		@desc Sets the Future error and calls all the scheduled tasks
		@desc The message is passed through debug.traceback before being stored. Chained futures are always set in safe mode. Waiting tasks flagged with stop_error_propagation get nil arguments; the others receive the error and are marked as done.
		@desc The parameter order matches set_result and FutureSemaphore:set_error. The former (result, index, safe) order is still understood.
		@param result<string> A string to set as the error message.
		@param safe?<boolean> Whether to cancel the error if the result can't be set. @default false.
		@param index?<int> Unused by Future; accepted so every Future variant can be called the same way.
		@returns string|nil In safe mode, the reason why the error couldn't be set; nil otherwise.
	]]
	function Future:set_error(result, safe, index)
		-- This method used to be declared as (result, index, safe) while every
		-- caller in this file passes (result, safe, index). Keep both working:
		if type(index) == "boolean" then
			-- A boolean in the third slot can only be the legacy `safe` flag.
			safe = index
		elseif type(safe) ~= "boolean" then
			-- A non-boolean in the second slot is a legacy index (or nil).
			safe = false
		end

		local refusal = settle_refusal(self)
		if refusal then
			if safe then
				return refusal
			end
			error(refusal, 2)
		end

		result = debug.traceback(result, 2)
		self.error = result
		self.done = true

		local chained
		for i = 1, self.futures_index do
			chained = self.futures[i]
			chained.obj:set_error(result, true, chained.index)
		end

		local task
		for i = 1, self._next_tasks_index do
			task = self._next_tasks[i]
			if task.stop_error_propagation then
				task.arguments = nil
			else
				task.error = result
				task.done = true
			end
			task.awaiting = nil
			self.loop:add_task(task)
		end
	end
end
local FutureSemaphore
do
	FutureSemaphore = setmetatable(
		{}, {__index = Future}
	)
	local meta = {__index = FutureSemaphore}

	--[[@
		@name new
		@desc Creates a new instance of FutureSemaphore: an object that will return many times later. This inherits from Future.
		@desc You can use EventLoop:await on it, but you can't use add_task.
		@desc If you await it, it will return a table where you can get all the appended values with their respective indexes.
		@desc /!\ FutureSemaphore will never propagate an error, instead, it will append it to the result as a string.
		@param loop<EventLoop> The loop that the future belongs to
		@param quantity<int> The quantity of values that the object will return.
		@param obj?<table> The table to turn into a FutureSemaphore.
		@returns FutureSemaphore The FutureSemaphore object
		@struct {
			loop = EventLoop, -- the loop that the future belongs to
			quantity = quantity, -- the quantity of values that the object will return
			_done = 0, -- the quantity of values that the object has prepared
			_next_tasks = {}, -- the tasks that the future is gonna run once it is done
			_next_tasks_index = 0, -- the tasks table pointer
			futures = {}, -- the futures to trigger after this is done
			futures_index = 0, -- the futures pointer
			result = nil or table, -- the Future result; if it is nil, the future is not completely done.
			_result = table, -- the FutureSemaphore partial or complete result, indexed by spot
			cancelled = false, -- whether the future is cancelled or not
			done = false -- whether the future is done or not
		}
	]]
	function FutureSemaphore.new(loop, quantity, obj)
		obj = Future.new(loop, obj)
		obj.quantity = quantity
		obj._done = 0
		obj._result = {}
		return setmetatable(obj, meta)
	end

	--[[@
		@name set_result
		@desc Sets a FutureSemaphore result and calls all the scheduled tasks if it is completely done
		@desc Once every spot is filled, chained futures receive the table of results (in safe mode), and every waiting task receives that table as its single argument.
		@param result<table> A table (with no associative members) to set as the result. Can have multiple items.
		@param safe<boolean> Whether to cancel the error if the result can't be set.
		@param index<number> The index of the result. Can't be repeated, and can't be nil.
		@returns string|nil In safe mode, the reason why the result couldn't be set; nil otherwise.
	]]
	function FutureSemaphore:set_result(result, safe, index)
		local refusal
		if self.done then
			refusal = "The FutureSemaphore has already been done."
		elseif self.cancelled then
			refusal = "The FutureSemaphore was cancelled."
		elseif index == nil then
			-- Writing to a nil key would raise a raw Lua error that ignores
			-- `safe`; this happens when a semaphore is chained without index.
			refusal = "The semaphore spot index can't be nil."
		elseif self._result[index] ~= nil then
			-- Compared against nil so that a spot holding `false` counts as taken.
			refusal = "The given semaphore spot is already taken."
		end

		if refusal then
			if safe then
				return refusal
			end
			error(refusal, 2)
		end

		self._result[index] = result
		self._done = self._done + 1
		if self._done ~= self.quantity then
			return
		end

		-- Every expected value has arrived: publish the collected results.
		self.done = true
		self.result = self._result

		local chained
		for i = 1, self.futures_index do
			chained = self.futures[i]
			chained.obj:set_result(self.result, true, chained.index)
		end

		-- Every waiting task shares the same arguments table: {results}.
		local task_result, task = {self.result}
		for i = 1, self._next_tasks_index do
			task = self._next_tasks[i]
			task.arguments = task_result
			task.awaiting = nil
			self.loop:add_task(task)
		end
	end

	--[[@
		@name set_error
		@desc Sets a FutureSemaphore error and calls all the scheduled tasks if it is completely done
		@desc The error is not propagated: its traceback string is stored in the given spot like a regular result.
		@param result<string> A string to set as the error message.
		@param safe<boolean> Whether to cancel the error if the result can't be set.
		@param index<number> The index of the result. Can't be repeated.
		@returns string|nil In safe mode, the reason why the error couldn't be set; nil otherwise.
	]]
	function FutureSemaphore:set_error(result, safe, index)
		result = debug.traceback(result, 2)
		-- The behaviour is the same on this future variation
		return self:set_result(result, safe, index)
	end
end
-- Module exports: the plain Future and its multi-value FutureSemaphore variant.
local exports = {}
exports.Future = Future
exports.FutureSemaphore = FutureSemaphore
return exports