Skip to content

Add AsyncSequence helpers #1939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Sources/_NIOConcurrency/AsyncAwaitSupport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import NIOCore

#if compiler(>=5.5)
import _Concurrency

extension EventLoopFuture {
/// Get the value/error from an `EventLoopFuture` in an `async` context.
Expand Down
136 changes: 136 additions & 0 deletions Sources/_NIOConcurrency/AsyncSequenceSupport.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if swift(>=5.5)
import struct NIO.ByteBuffer

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct NIOByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the name. URLSession uses AsyncBytes (i.e. URLSession.AsyncBytes) which I think is quite clear. How about ByteBuffer.AsyncBytes?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not basically an async version of FlattenSequence/.joined() from the standard library, specialised for ByteBuffer?

So... ByteBuffer.FlattenSequence, perhaps?

public typealias Element = UInt8
public typealias AsyncIterator = Iterator

@usableFromInline
let upstream: Upstream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this may as well be var. It doesn't really matter but it can't hurt.


@inlinable
init(_ upstream: Upstream) {
self.upstream = upstream
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Iterator(self.upstream.makeAsyncIterator())
}

public struct Iterator: AsyncIteratorProtocol {
/*private but*/ @usableFromInline var state: State

@usableFromInline
enum State {
case hasBuffer(ByteBuffer, Upstream.AsyncIterator)
case askForMore(Upstream.AsyncIterator)
case finished

@inlinable
init(buffer: ByteBuffer, upstream: Upstream.AsyncIterator) {
if buffer.readableBytes > 0 {
self = .hasBuffer(buffer, upstream)
} else {
self = .askForMore(upstream)
}
}
}

@inlinable
init(_ upstream: Upstream.AsyncIterator) {
self.state = .askForMore(upstream)
}

@inlinable
public mutating func next() async throws -> Element? {
switch self.state {
case .askForMore(var upstream):
while true {
switch try await upstream.next() {
case .some(let nextBuffer) where nextBuffer.readableBytes == 0:
// we received an empty buffer. for this reason, let's continue and get the
// next buffer fro, the sequence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// next buffer fro, the sequence
// next buffer from the sequence

continue

case .some(var nextBuffer):
assert(nextBuffer.readableBytes > 0)
let result = nextBuffer.readInteger(as: UInt8.self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be banged? We assert that there must readable bytes so reading one mustn't fail

self.state = .init(buffer: nextBuffer, upstream: upstream)
return result

case .none:
self.state = .finished
return nil
}
}

case .hasBuffer(var buffer, let upstream):
assert(buffer.readableBytes > 0)
let result = buffer.readInteger(as: UInt8.self)
self.state = .init(buffer: buffer, upstream: upstream)
return result

case .finished:
return nil
}
}
}

}

public struct NIOTooManyBytesError: Error {
public init() {}
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension AsyncSequence where Element == ByteBuffer {
/// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes.
@inlinable
public func toBytes() -> NIOByteBufferToUInt8AsyncSequence<Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a computed property?

for try await byte in body.bytes {
  // ...
}

vs.

for try await byte in body.toBytes() {
  // ...
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joined() seems like a better name for this, to mirror both FlattenSequence and the String concatenation helper from the standard library.

NIOByteBufferToUInt8AsyncSequence(self)
}

/// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`.
///
/// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have.
/// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily all the bytes

@inlinable
public func consume(maxBytes: Int) async throws -> ByteBuffer? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is returning nil here a good idea? I'm a bit inclined to want to return an empty buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consume feels to me like we're actually "reading" the bytes, collect might be more appropriate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if an additional lower-level variant accepting an inout ByteBuffer would also be useful? Callers could for example, reserve capacity or collect from multiple sequences into the same buffer without an intermediate copy.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it would be more useful as a general "collect bytes from this stream of buffers" API on ByteBuffer itself.

Which means I think there might be benefit to moving it, and loosening the constraints to allow for async sequences of any buffer type (Data, Array, etc).

var iterator = self.makeAsyncIterator()
guard var buffer = try await iterator.next() else {
return nil
}

var receivedBytes = buffer.readableBytes
if receivedBytes > maxBytes {
throw NIOTooManyBytesError()
}

while var next = try await iterator.next() {
receivedBytes += next.readableBytes
if receivedBytes > maxBytes {
throw NIOTooManyBytesError()
}

buffer.writeBuffer(&next)
}
return buffer
}
}
#endif