Skip to content

Add AsyncSequence helpers #1939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from

Conversation

fabianfett
Copy link
Member

This PR does not have any tests yet. It is here, to demonstrate some AsyncSequence and ByteBuffer helpers that we might want to add to make our consumers lifes' easier.

Motivation:

In the [RFC] for an async/await API in AsyncHTTPClient @adam-fowler asked, if there could be a helper method to transform an AsyncSequence of ByteBuffers into a single ByteBuffer. Following our strict rule of implementing conformances only to types you we own we must add those convenience helpers to SwiftNIO directly.

Modifications:

  • Add a convenience helper func consume(maxBytes: Int) async throws -> ByteBuffer that consumes an AsyncSequence of ByteBuffers into a single ByteBuffer
  • Add a convenience helper func bytes() that translates an AsyncSequence of ByteBuffers into an AsyncSequence of Bytes (UInt8)

@fabianfett fabianfett force-pushed the ff-async-sequence-support branch from 4e590a8 to 3884bd1 Compare August 16, 2021 16:14
@fabianfett fabianfett force-pushed the ff-async-sequence-support branch from 3884bd1 to fbdfadc Compare August 16, 2021 16:17
@fabianfett fabianfett force-pushed the ff-async-sequence-support branch from 5613b51 to 7d09613 Compare August 16, 2021 16:26
@fabianfett fabianfett requested a review from Lukasa August 16, 2021 18:28
public typealias AsyncIterator = Iterator

@usableFromInline
let upstream: Upstream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this may as well be var. It doesn't really matter but it can't hurt.

/// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have.
/// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence
@inlinable
public func consume(maxBytes: Int) async throws -> ByteBuffer? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is returning nil here a good idea? I'm a bit inclined to want to return an empty buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consume feels to me like we're actually "reading" the bytes, collect might be more appropriate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if an additional lower-level variant accepting an inout ByteBuffer would also be useful? Callers could for example, reserve capacity or collect from multiple sequences into the same buffer without an intermediate copy.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it would be more useful as a general "collect bytes from this stream of buffers" API on ByteBuffer itself.

Which means I think there might be benefit to moving it, and loosening the constraints to allow for async sequences of any buffer type (Data, Array, etc).

switch try await upstream.next() {
case .some(let nextBuffer) where nextBuffer.readableBytes == 0:
// we received an empty buffer. for this reason, let's continue and get the
// next buffer fro, the sequence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// next buffer fro, the sequence
// next buffer from the sequence


case .some(var nextBuffer):
assert(nextBuffer.readableBytes > 0)
let result = nextBuffer.readInteger(as: UInt8.self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be banged? We assert that there must readable bytes so reading one mustn't fail

import struct NIO.ByteBuffer

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct NIOByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love the name. URLSession uses AsyncBytes (i.e. URLSession.AsyncBytes) which I think is quite clear. How about ByteBuffer.AsyncBytes?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not basically an async version of FlattenSequence/.joined() from the standard library, specialised for ByteBuffer?

So... ByteBuffer.FlattenSequence, perhaps?

/// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`.
///
/// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have.
/// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily all the bytes

/// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have.
/// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence
@inlinable
public func consume(maxBytes: Int) async throws -> ByteBuffer? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consume feels to me like we're actually "reading" the bytes, collect might be more appropriate.

extension AsyncSequence where Element == ByteBuffer {
/// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes.
@inlinable
public func toBytes() -> NIOByteBufferToUInt8AsyncSequence<Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a computed property?

for try await byte in body.bytes {
  // ...
}

vs.

for try await byte in body.toBytes() {
  // ...
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joined() seems like a better name for this, to mirror both FlattenSequence and the String concatenation helper from the standard library.

/// - Parameter maxBytes: The maximum number of bytes that the result ByteBuffer is allowed to have.
/// - Returns: A ``NIO/ByteBuffer`` that holds all the bytes of the AsyncSequence
@inlinable
public func consume(maxBytes: Int) async throws -> ByteBuffer? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if an additional lower-level variant accepting an inout ByteBuffer would also be useful? Callers could for example, reserve capacity or collect from multiple sequences into the same buffer without an intermediate copy.

@Lukasa Lukasa closed this Jan 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants