Skip to content

Commit fbdfadc

Browse files
committed
Code review
1 parent b1b3d8a commit fbdfadc

File tree

1 file changed

+65
-65
lines changed

1 file changed

+65
-65
lines changed

Sources/_NIOConcurrency/AsyncSequenceSupport.swift

Lines changed: 65 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,23 @@
1616
import struct NIO.ByteBuffer
1717

1818
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
19-
public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
19+
public struct NIOByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
2020
public typealias Element = UInt8
2121
public typealias AsyncIterator = Iterator
2222

23+
@usableFromInline
24+
let upstream: Upstream
25+
26+
@inlinable
27+
init(_ upstream: Upstream) {
28+
self.upstream = upstream
29+
}
30+
31+
@inlinable
32+
public func makeAsyncIterator() -> Iterator {
33+
Iterator(self.upstream.makeAsyncIterator())
34+
}
35+
2336
public struct Iterator: AsyncIteratorProtocol {
2437
/*private but*/ @usableFromInline var state: State
2538

@@ -28,87 +41,74 @@ public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequ
2841
case hasBuffer(ByteBuffer, Upstream.AsyncIterator)
2942
case askForMore(Upstream.AsyncIterator)
3043
case finished
31-
case modifying
44+
45+
@inlinable
46+
init(buffer: ByteBuffer, upstream: Upstream.AsyncIterator) {
47+
if buffer.readableBytes > 0 {
48+
self = .hasBuffer(buffer, upstream)
49+
} else {
50+
self = .askForMore(upstream)
51+
}
52+
}
53+
54+
@inlinable
55+
mutating func next() async throws -> Element? {
56+
switch self {
57+
case .askForMore(var upstream):
58+
while true {
59+
switch try await upstream.next() {
60+
case .some(let nextBuffer) where nextBuffer.readableBytes == 0:
61+
// we received an empty buffer. for this reason, let's continue and get the
62+
// next buffer fro, the sequence
63+
continue
64+
65+
case .some(var nextBuffer):
66+
assert(nextBuffer.readableBytes > 0)
67+
let result = nextBuffer.readInteger(as: UInt8.self)
68+
self = .init(buffer: nextBuffer, upstream: upstream)
69+
return result
70+
71+
case .none:
72+
self = .finished
73+
return nil
74+
}
75+
}
76+
77+
case .hasBuffer(var buffer, let upstream):
78+
assert(buffer.readableBytes > 0)
79+
let result = buffer.readInteger(as: UInt8.self)
80+
self = .init(buffer: buffer, upstream: upstream)
81+
return result
82+
83+
case .finished:
84+
return nil
85+
}
86+
}
3287
}
3388

34-
@usableFromInline
89+
@inlinable
3590
init(_ upstream: Upstream.AsyncIterator) {
3691
self.state = .askForMore(upstream)
3792
}
3893

3994
@inlinable
4095
public mutating func next() async throws -> Element? {
41-
switch self.state {
42-
case .askForMore(var upstream):
43-
self.state = .modifying
44-
45-
while true {
46-
switch try await upstream.next() {
47-
case .some(let nextBuffer) where nextBuffer.readableBytes == 0:
48-
break
49-
50-
case .some(var nextBuffer):
51-
assert(nextBuffer.readableBytes > 0)
52-
let result = nextBuffer.readInteger(as: UInt8.self)
53-
if nextBuffer.readableBytes > 0 {
54-
self.state = .hasBuffer(nextBuffer, upstream)
55-
} else {
56-
self.state = .askForMore(upstream)
57-
}
58-
return result
59-
60-
case .none:
61-
self.state = .finished
62-
return nil
63-
}
64-
}
65-
66-
case .hasBuffer(var buffer, let upstream):
67-
assert(buffer.readableBytes > 0)
68-
self.state = .modifying
69-
70-
let result = buffer.readInteger(as: UInt8.self)
71-
if buffer.readableBytes > 0 {
72-
self.state = .hasBuffer(buffer, upstream)
73-
} else {
74-
self.state = .askForMore(upstream)
75-
}
76-
return result
77-
78-
case .finished:
79-
return nil
80-
81-
case .modifying:
82-
preconditionFailure("Invalid state: \(self.state)")
83-
}
96+
try await self.state.next()
8497
}
8598
}
8699

87-
@inlinable
88-
public func makeAsyncIterator() -> Iterator {
89-
Iterator(self.upstream.makeAsyncIterator())
90-
}
91-
92-
@usableFromInline
93-
let upstream: Upstream
94-
95-
/*private but*/ @usableFromInline init(_ upstream: Upstream) {
96-
self.upstream = upstream
97-
}
98100
}
99101

100-
@usableFromInline
101-
struct TooManyBytesError: Error {
102-
@usableFromInline
103-
init() {}
102+
public struct NIOTooManyBytesError: Error {
103+
public init() {}
104104
}
105105

106106
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
107107
extension AsyncSequence where Element == ByteBuffer {
108108
/// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes.
109109
@inlinable
110-
public func toBytes() -> ByteBufferToUInt8AsyncSequence<Self> {
111-
ByteBufferToUInt8AsyncSequence(self)
110+
public func toBytes() -> NIOByteBufferToUInt8AsyncSequence<Self> {
111+
NIOByteBufferToUInt8AsyncSequence(self)
112112
}
113113

114114
/// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`.
@@ -124,13 +124,13 @@ extension AsyncSequence where Element == ByteBuffer {
124124

125125
var receivedBytes = buffer.readableBytes
126126
if receivedBytes > maxBytes {
127-
throw TooManyBytesError()
127+
throw NIOTooManyBytesError()
128128
}
129129

130130
while var next = try await iterator.next() {
131131
receivedBytes += next.readableBytes
132132
if receivedBytes > maxBytes {
133-
throw TooManyBytesError()
133+
throw NIOTooManyBytesError()
134134
}
135135

136136
buffer.writeBuffer(&next)

0 commit comments

Comments
 (0)