Skip to content

Commit c5c7945

Browse files
committed
Show in-order processing patterns, equivalent to queue.async {}
1 parent cec256d commit c5c7945

File tree

1 file changed

+195
-0
lines changed

1 file changed

+195
-0
lines changed

Guide.docc/IncrementalAdoption.md

+195
Original file line numberDiff line numberDiff line change
@@ -227,3 +227,198 @@ NS_SWIFT_ASYNC_NAME
227227
NS_SWIFT_ASYNC_NOTHROW
228228
NS_SWIFT_UNAVAILABLE_FROM_ASYNC(msg)
229229
```
230+
231+
## Dispatch
232+
233+
### Ordered work processing in actors, when enqueueing from a synchronous contexts
234+
235+
Swift concurrency naturally enforces program order for asynchronous code as long
236+
as the execution remains in a single Task - this is equivalent to using "a single
237+
thread" to execute some work, but is more resource efficient because the task may
238+
suspend while waiting for some work, for example:
239+
240+
```
241+
// ✅ Guaranteed order, since caller is a single task
242+
let printer: Printer = ...
243+
await printer.print(1)
244+
await printer.print(2)
245+
await printer.print(3)
246+
```
247+
248+
This code is structurally guaranteed to execute the prints in the expected "1, 2, 3"
249+
order, because the caller is a single task. Things
250+
251+
Dispatch queues offered the common `queue.async { ... }` way to kick off some
252+
asynchronous work without waiting for its result. In dispatch, if one were to
253+
write the following code:
254+
255+
```swift
256+
let queue = DispatchSerialQueue(label: "queue")
257+
258+
queue.async { print(1) }
259+
queue.async { print(2) }
260+
queue.async { print(3) }
261+
```
262+
263+
The order of the elements printed is guaranteed to be `1`, `2` and finally `3`.
264+
265+
At first, it may seem like `Task { ... }` is exactly the same, because it also
266+
kicks off some asynchronous computation without waiting for it to complete.
267+
A naively port of the same code might look like this:
268+
269+
```swift
270+
// ⚠️ any order of prints is expected
271+
Task { print(1) }
272+
Task { print(2) }
273+
Task { print(3) }
274+
```
275+
276+
This example **does not** guarantee anything about the order of the printed values,
277+
because Tasks are enqueued on a global (concurrent) threadpool which uses multiple
278+
threads to schedule the tasks. Because of this, any of the tasks may be executed first.
279+
280+
Another attempt at recovering serial execution may be to use an actor, like this:
281+
282+
```swift
283+
// ⚠️ still any order of prints is possible
284+
actor Printer {}
285+
func go() {
286+
// Notice the tasks don't capture `self`!
287+
Task { print(1) }
288+
Task { print(2) }
289+
Task { print(3) }
290+
}
291+
}
292+
```
293+
294+
This specific example still does not even guarantee enqueue order (!) of the tasks,
295+
and much less actual execution order. The tasks this is because lack of capturing
296+
`self` of the actor, those tasks are effectively going to run on the global concurrent
297+
pool, and not on the actor. This behavior may be unexpected, but it is the current semantics.
298+
299+
We can correct this a bit more in order to ensure the enqueue order, by isolating
300+
the tasks to the actor, this is done as soon as we capture the `self` of the actor:
301+
302+
```swift
303+
// ✅ enqueue order of tasks is guaranteed
304+
//
305+
// ⚠️ however. due to priority escalation, still any order of prints is possible (!)
306+
actor Printer {}
307+
func go() { // assume this method must be synchronous
308+
// Notice the tasks do capture self
309+
Task { self.log(1) }
310+
Task { self.log(2) }
311+
Task { self.log(3) }
312+
}
313+
314+
func log(_ int: Int) { print(int) }
315+
}
316+
```
317+
318+
This improves the situation because the tasks are now isolated to the printer
319+
instance actor (by means of using `Task{}` which inherits isolation, and refering
320+
to the actor's `self`), however their specific execution order is _still_ not deterministic.
321+
322+
Actors in Swift are **not** strictly FIFO ordered, and tasks
323+
processed by an actor may be reordered by the runtime for example because
324+
of _priority escalation_.
325+
326+
**Priority escalation** takes place when a low-priority task suddenly becomes
327+
await-ed on by a high priority task. The Swift runtime is able to move such
328+
task "in front of the queue" and effectively will process the now priority-escalated
329+
task, before any other low-priority tasks. This effectively leads to FIFO order
330+
violations, because such task "jumped ahead" of other tasks which may have been
331+
enqueue on the actor well ahead of it. This does does help make actors very
332+
responsive to high priority work which is a valuable property to have!
333+
334+
> Note: Priority escalation is not supported on actors with custom executors.
335+
336+
The safest and correct way to enqueue a number of items to be processed by an actor,
337+
in a specific order is to use an `AsyncStream` to form a single, well-ordered
338+
sequence of items, which can be emitted to even from synchronous code.
339+
And then consume it using a _single_ task running on the actor, like this:
340+
341+
```swift
342+
// ✅ Guaranteed order in which log() are invoked,
343+
// regardless of priority escalations, because the disconnect
344+
// between the producing and consuming task
345+
actor Printer {
346+
let stream: AsyncStream<Int>
347+
let streamContinuation: AsyncStream<Int>.Continuation
348+
var streamConsumer: Task<Void, Never>!
349+
350+
init() async {
351+
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
352+
self.stream = stream
353+
self.streamContinuation = continuation
354+
355+
// Consuming Option A)
356+
// Start consuming immediately,
357+
// or better have a caller of Printer call `startStreamConsumer()`
358+
// which would make it run on the caller's task, allowing for better use of structured concurrency.
359+
self.streamConsumer = Task { await self.consumeStream() }
360+
}
361+
362+
deinit {
363+
self.streamContinuation.finish()
364+
}
365+
366+
nonisolated func enqueue(_ item: Int) {
367+
self.streamContinuation.yield(item)
368+
}
369+
370+
nonisolated func cancel() {
371+
self.streamConsumer?.cancel()
372+
}
373+
374+
func consumeStream() async {
375+
for await item in self.stream {
376+
if Task.isCancelled { break }
377+
378+
log(item)
379+
}
380+
}
381+
382+
func log(_ int: Int) { print(int) }
383+
}
384+
```
385+
386+
and invoke it like:
387+
388+
```
389+
let printer: Printer = ...
390+
printer.enqueue(1)
391+
printer.enqueue(2)
392+
printer.enqueue(3)
393+
```
394+
395+
We're assuming that the caller has to be in synchronous code, and this is why we make the `enqueue`
396+
method `nonisolated` but we use it to funnel work items into the actor's stream.
397+
398+
The actor uses a single task to consume the async sequence of items, and this way guarantees
399+
the specific order of items being handled.
400+
401+
This approach has both up and down-sides, because now the item processing cannot be affected by
402+
priority escalation. The items will always be processed in their strict enqueue order,
403+
and we cannot easily await for their results -- since the caller is in synchronous code,
404+
so we might need to resort to callbacks if we needed to report completion of an item
405+
getting processed.
406+
407+
Notice that we kick off an unstructured task in the actor's initializer, to handle the
408+
consuming of the stream. This also may be sub-optimal, because as cancellation must
409+
now be handled manually. You may instead prefer to _not_ create the consumer task
410+
at all in this Printer type, but require that some existing task invokes `await consumeStream()`, like this:
411+
412+
```
413+
let printer: Printer = ...
414+
Task { // or some structured task
415+
await printer.consumeStream()
416+
}
417+
418+
printer.enqueue(1)
419+
printer.enqueue(2)
420+
printer.enqueue(3)
421+
```
422+
423+
In this case, you'd should make sure to only have at-most one task consuming the stream,
424+
e.g. by using a boolean flag inside the printer actor.

0 commit comments

Comments
 (0)