Skip to content

Commit 7d09613

Browse files
committed
Code review
1 parent b1b3d8a commit 7d09613

File tree

1 file changed

+35
-40
lines changed

1 file changed

+35
-40
lines changed

Sources/_NIOConcurrency/AsyncSequenceSupport.swift

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,23 @@
1616
import struct NIO.ByteBuffer
1717

1818
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
19-
public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
19+
public struct NIOByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequence where Upstream.Element == ByteBuffer {
2020
public typealias Element = UInt8
2121
public typealias AsyncIterator = Iterator
2222

23+
@usableFromInline
24+
let upstream: Upstream
25+
26+
@inlinable
27+
init(_ upstream: Upstream) {
28+
self.upstream = upstream
29+
}
30+
31+
@inlinable
32+
public func makeAsyncIterator() -> Iterator {
33+
Iterator(self.upstream.makeAsyncIterator())
34+
}
35+
2336
public struct Iterator: AsyncIteratorProtocol {
2437
/*private but*/ @usableFromInline var state: State
2538

@@ -28,10 +41,18 @@ public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequ
2841
case hasBuffer(ByteBuffer, Upstream.AsyncIterator)
2942
case askForMore(Upstream.AsyncIterator)
3043
case finished
31-
case modifying
44+
45+
@inlinable
46+
init(buffer: ByteBuffer, upstream: Upstream.AsyncIterator) {
47+
if buffer.readableBytes > 0 {
48+
self = .hasBuffer(buffer, upstream)
49+
} else {
50+
self = .askForMore(upstream)
51+
}
52+
}
3253
}
3354

34-
@usableFromInline
55+
@inlinable
3556
init(_ upstream: Upstream.AsyncIterator) {
3657
self.state = .askForMore(upstream)
3758
}
@@ -40,21 +61,17 @@ public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequ
4061
public mutating func next() async throws -> Element? {
4162
switch self.state {
4263
case .askForMore(var upstream):
43-
self.state = .modifying
44-
4564
while true {
4665
switch try await upstream.next() {
4766
case .some(let nextBuffer) where nextBuffer.readableBytes == 0:
48-
break
67+
// we received an empty buffer. for this reason, let's continue and get the
68+
// next buffer fro, the sequence
69+
continue
4970

5071
case .some(var nextBuffer):
5172
assert(nextBuffer.readableBytes > 0)
5273
let result = nextBuffer.readInteger(as: UInt8.self)
53-
if nextBuffer.readableBytes > 0 {
54-
self.state = .hasBuffer(nextBuffer, upstream)
55-
} else {
56-
self.state = .askForMore(upstream)
57-
}
74+
self.state = .init(buffer: nextBuffer, upstream: upstream)
5875
return result
5976

6077
case .none:
@@ -65,50 +82,28 @@ public struct ByteBufferToUInt8AsyncSequence<Upstream: AsyncSequence>: AsyncSequ
6582

6683
case .hasBuffer(var buffer, let upstream):
6784
assert(buffer.readableBytes > 0)
68-
self.state = .modifying
69-
7085
let result = buffer.readInteger(as: UInt8.self)
71-
if buffer.readableBytes > 0 {
72-
self.state = .hasBuffer(buffer, upstream)
73-
} else {
74-
self.state = .askForMore(upstream)
75-
}
86+
self.state = .init(buffer: buffer, upstream: upstream)
7687
return result
7788

7889
case .finished:
7990
return nil
80-
81-
case .modifying:
82-
preconditionFailure("Invalid state: \(self.state)")
8391
}
8492
}
8593
}
8694

87-
@inlinable
88-
public func makeAsyncIterator() -> Iterator {
89-
Iterator(self.upstream.makeAsyncIterator())
90-
}
91-
92-
@usableFromInline
93-
let upstream: Upstream
94-
95-
/*private but*/ @usableFromInline init(_ upstream: Upstream) {
96-
self.upstream = upstream
97-
}
9895
}
9996

100-
@usableFromInline
101-
struct TooManyBytesError: Error {
102-
@usableFromInline
103-
init() {}
97+
public struct NIOTooManyBytesError: Error {
98+
public init() {}
10499
}
105100

106101
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
107102
extension AsyncSequence where Element == ByteBuffer {
108103
/// Transform an AsyncSequence of ByteBuffers into an AsyncSequence of single bytes.
109104
@inlinable
110-
public func toBytes() -> ByteBufferToUInt8AsyncSequence<Self> {
111-
ByteBufferToUInt8AsyncSequence(self)
105+
public func toBytes() -> NIOByteBufferToUInt8AsyncSequence<Self> {
106+
NIOByteBufferToUInt8AsyncSequence(self)
112107
}
113108

114109
/// Consume an ``Swift/AsyncSequence`` of ``NIO/ByteBuffer``s into a single `ByteBuffer`.
@@ -124,13 +119,13 @@ extension AsyncSequence where Element == ByteBuffer {
124119

125120
var receivedBytes = buffer.readableBytes
126121
if receivedBytes > maxBytes {
127-
throw TooManyBytesError()
122+
throw NIOTooManyBytesError()
128123
}
129124

130125
while var next = try await iterator.next() {
131126
receivedBytes += next.readableBytes
132127
if receivedBytes > maxBytes {
133-
throw TooManyBytesError()
128+
throw NIOTooManyBytesError()
134129
}
135130

136131
buffer.writeBuffer(&next)

0 commit comments

Comments
 (0)