Skip to content

Commit de27aa1

Browse files
authored
Add Stream ReadAtLeast and ReadExactly (#69272)
* Add Stream ReadAtLeast and ReadExactly Adds methods to Stream to read at least a minimum amount of bytes, or a full buffer, of data from the stream. ReadAtLeast allows for the caller to specify whether an exception should be thrown or not on the end of the stream. Make use of the new methods where appropriate in net7.0. Fix #16598 * Add ReadAtLeast and ReadExactly unit tests * Add XML docs to the new APIs * Preserve behavior in StreamReader.FillBuffer when passed 0. * Handle ReadExactly with an empty buffer, and ReadAtLeast with minimumBytes == 0. Both of these cases are a no-op. No exception is thrown. A read won't be issued to the underlying stream. They won't block until data is available.
1 parent 8d4a724 commit de27aa1

File tree

26 files changed

+969
-218
lines changed

26 files changed

+969
-218
lines changed

src/libraries/System.Formats.Tar/src/System/Formats/Tar/TarHeader.Read.cs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ internal bool TryGetNextHeader(Stream archiveStream, bool copyData)
2626
Span<byte> buffer = rented.AsSpan(0, TarHelpers.RecordSize); // minimumLength means the array could've been larger
2727
buffer.Clear(); // Rented arrays aren't clean
2828

29-
TarHelpers.ReadOrThrow(archiveStream, buffer);
29+
archiveStream.ReadExactly(buffer);
3030

3131
try
3232
{
@@ -486,10 +486,7 @@ private void ReadExtendedAttributesBlock(Stream archiveStream)
486486
}
487487

488488
byte[] buffer = new byte[(int)_size];
489-
if (archiveStream.Read(buffer.AsSpan()) != _size)
490-
{
491-
throw new EndOfStreamException();
492-
}
489+
archiveStream.ReadExactly(buffer);
493490

494491
string dataAsString = TarHelpers.GetTrimmedUtf8String(buffer);
495492

@@ -520,11 +517,7 @@ private void ReadGnuLongPathDataBlock(Stream archiveStream)
520517
}
521518

522519
byte[] buffer = new byte[(int)_size];
523-
524-
if (archiveStream.Read(buffer.AsSpan()) != _size)
525-
{
526-
throw new EndOfStreamException();
527-
}
520+
archiveStream.ReadExactly(buffer);
528521

529522
string longPath = TarHelpers.GetTrimmedUtf8String(buffer);
530523

src/libraries/System.Formats.Tar/src/System/Formats/Tar/TarHelpers.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,22 +151,6 @@ private static string GetTrimmedString(ReadOnlySpan<byte> buffer, Encoding encod
151151
// removing the trailing null or space chars.
152152
internal static string GetTrimmedUtf8String(ReadOnlySpan<byte> buffer) => GetTrimmedString(buffer, Encoding.UTF8);
153153

154-
// Reads the specified number of bytes and stores it in the byte buffer passed by reference.
155-
// Throws if end of stream is reached.
156-
internal static void ReadOrThrow(Stream archiveStream, Span<byte> buffer)
157-
{
158-
int totalRead = 0;
159-
while (totalRead < buffer.Length)
160-
{
161-
int bytesRead = archiveStream.Read(buffer.Slice(totalRead));
162-
if (bytesRead == 0)
163-
{
164-
throw new EndOfStreamException();
165-
}
166-
totalRead += bytesRead;
167-
}
168-
}
169-
170154
// Returns true if it successfully converts the specified string to a DateTimeOffset, false otherwise.
171155
internal static bool TryConvertToDateTimeOffset(string value, out DateTimeOffset timestamp)
172156
{

src/libraries/System.IO.Compression/src/System/IO/Compression/ZipHelper.cs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,10 @@ internal static Encoding GetEncoding(string text)
4040
/// </summary>
4141
internal static void ReadBytes(Stream stream, byte[] buffer, int bytesToRead)
4242
{
43-
int bytesLeftToRead = bytesToRead;
44-
45-
int totalBytesRead = 0;
46-
47-
while (bytesLeftToRead > 0)
43+
int bytesRead = stream.ReadAtLeast(buffer.AsSpan(0, bytesToRead), bytesToRead, throwOnEndOfStream: false);
44+
if (bytesRead < bytesToRead)
4845
{
49-
int bytesRead = stream.Read(buffer, totalBytesRead, bytesLeftToRead);
50-
if (bytesRead == 0) throw new IOException(SR.UnexpectedEndOfStream);
51-
52-
totalBytesRead += bytesRead;
53-
bytesLeftToRead -= bytesRead;
46+
throw new IOException(SR.UnexpectedEndOfStream);
5447
}
5548
}
5649

src/libraries/System.IO/tests/BinaryReader/BinaryReaderTests.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,5 +437,28 @@ public void Read_CharSpan_ThrowIfDisposed()
437437
Assert.Throws<ObjectDisposedException>(() => binaryReader.Read(new Span<char>()));
438438
}
439439
}
440+
441+
private class DerivedBinaryReader : BinaryReader
442+
{
443+
public DerivedBinaryReader(Stream input) : base(input) { }
444+
445+
public void CallFillBuffer0()
446+
{
447+
FillBuffer(0);
448+
}
449+
}
450+
451+
[Fact]
452+
public void FillBuffer_Zero_Throws()
453+
{
454+
using Stream stream = CreateStream();
455+
456+
string hello = "Hello";
457+
stream.Write(Encoding.ASCII.GetBytes(hello));
458+
stream.Position = 0;
459+
460+
using var derivedReader = new DerivedBinaryReader(stream);
461+
Assert.Throws<EndOfStreamException>(derivedReader.CallFillBuffer0);
462+
}
440463
}
441464
}
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Xunit;
7+
8+
namespace System.IO.Tests
9+
{
10+
public class Stream_ReadAtLeast
11+
{
12+
[Theory]
13+
[InlineData(true)]
14+
[InlineData(false)]
15+
public async Task DelegatesToRead_Success(bool async)
16+
{
17+
bool readInvoked = false;
18+
var s = new DelegateStream(
19+
canReadFunc: () => true,
20+
readFunc: (array, offset, count) =>
21+
{
22+
readInvoked = true;
23+
Assert.NotNull(array);
24+
Assert.Equal(0, offset);
25+
Assert.Equal(30, count);
26+
27+
for (int i = 0; i < 10; i++) array[offset + i] = (byte)i;
28+
return 10;
29+
});
30+
31+
byte[] buffer = new byte[30];
32+
33+
Assert.Equal(10, async ? await s.ReadAtLeastAsync(buffer, 10) : s.ReadAtLeast(buffer, 10));
34+
Assert.True(readInvoked);
35+
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
36+
for (int i = 10; i < 30; i++) Assert.Equal(0, buffer[i]);
37+
}
38+
39+
[Theory]
40+
[InlineData(true)]
41+
[InlineData(false)]
42+
public async Task ReadMoreThanOnePage(bool async)
43+
{
44+
int readInvokedCount = 0;
45+
var s = new DelegateStream(
46+
canReadFunc: () => true,
47+
readFunc: (array, offset, count) =>
48+
{
49+
readInvokedCount++;
50+
51+
for (int i = 0; i < 10; i++) array[offset + i] = (byte)i;
52+
return 10;
53+
});
54+
55+
byte[] buffer = new byte[30];
56+
57+
Assert.Equal(20, async ? await s.ReadAtLeastAsync(buffer, 20) : s.ReadAtLeast(buffer, 20));
58+
Assert.Equal(2, readInvokedCount);
59+
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
60+
for (int i = 10; i < 20; i++) Assert.Equal(i - 10, buffer[i]);
61+
for (int i = 20; i < 30; i++) Assert.Equal(0, buffer[i]);
62+
}
63+
64+
[Theory]
65+
[InlineData(true)]
66+
[InlineData(false)]
67+
public async Task ReadMoreThanMinimumBytes(bool async)
68+
{
69+
int readInvokedCount = 0;
70+
var s = new DelegateStream(
71+
canReadFunc: () => true,
72+
readFunc: (array, offset, count) =>
73+
{
74+
readInvokedCount++;
75+
76+
int byteCount = Math.Min(count, 10);
77+
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
78+
return byteCount;
79+
});
80+
81+
// first try with a buffer that doesn't fill 3 full pages
82+
byte[] buffer = new byte[28];
83+
84+
Assert.Equal(28, async ? await s.ReadAtLeastAsync(buffer, 22) : s.ReadAtLeast(buffer, 22));
85+
Assert.Equal(3, readInvokedCount);
86+
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
87+
for (int i = 10; i < 20; i++) Assert.Equal(i - 10, buffer[i]);
88+
for (int i = 20; i < 28; i++) Assert.Equal(i - 20, buffer[i]);
89+
90+
// now try with a buffer that is bigger than 3 pages
91+
readInvokedCount = 0;
92+
buffer = new byte[32];
93+
94+
Assert.Equal(30, async ? await s.ReadAtLeastAsync(buffer, 22) : s.ReadAtLeast(buffer, 22));
95+
Assert.Equal(3, readInvokedCount);
96+
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
97+
for (int i = 10; i < 20; i++) Assert.Equal(i - 10, buffer[i]);
98+
for (int i = 20; i < 30; i++) Assert.Equal(i - 20, buffer[i]);
99+
for (int i = 30; i < 32; i++) Assert.Equal(0, buffer[i]);
100+
}
101+
102+
[Theory]
103+
[InlineData(true)]
104+
[InlineData(false)]
105+
public async Task ReadAtLeastZero(bool async)
106+
{
107+
int readInvokedCount = 0;
108+
var s = new DelegateStream(
109+
canReadFunc: () => true,
110+
readFunc: (array, offset, count) =>
111+
{
112+
readInvokedCount++;
113+
114+
int byteCount = Math.Min(count, 10);
115+
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
116+
return byteCount;
117+
});
118+
119+
byte[] buffer = new byte[20];
120+
121+
// ReadAtLeast minimumBytes=0 is a no-op
122+
Assert.Equal(0, async ? await s.ReadAtLeastAsync(buffer, 0) : s.ReadAtLeast(buffer, 0));
123+
Assert.Equal(0, readInvokedCount);
124+
125+
// now try with an empty buffer
126+
byte[] emptyBuffer = Array.Empty<byte>();
127+
128+
Assert.Equal(0, async ? await s.ReadAtLeastAsync(emptyBuffer, 0) : s.ReadAtLeast(emptyBuffer, 0));
129+
Assert.Equal(0, readInvokedCount);
130+
}
131+
132+
[Theory]
133+
[InlineData(true)]
134+
[InlineData(false)]
135+
public async Task NegativeMinimumBytes(bool async)
136+
{
137+
int readInvokedCount = 0;
138+
var s = new DelegateStream(
139+
canReadFunc: () => true,
140+
readFunc: (array, offset, count) =>
141+
{
142+
readInvokedCount++;
143+
144+
int byteCount = Math.Min(count, 10);
145+
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
146+
return byteCount;
147+
});
148+
149+
byte[] buffer = new byte[10];
150+
if (async)
151+
{
152+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, -1));
153+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, -10));
154+
}
155+
else
156+
{
157+
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, -1));
158+
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, -10));
159+
}
160+
Assert.Equal(0, readInvokedCount);
161+
}
162+
163+
[Theory]
164+
[InlineData(true)]
165+
[InlineData(false)]
166+
public async Task BufferSmallerThanMinimumBytes(bool async)
167+
{
168+
int readInvokedCount = 0;
169+
var s = new DelegateStream(
170+
canReadFunc: () => true,
171+
readFunc: (array, offset, count) =>
172+
{
173+
readInvokedCount++;
174+
175+
int byteCount = Math.Min(count, 10);
176+
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
177+
return byteCount;
178+
});
179+
180+
byte[] buffer = new byte[20];
181+
byte[] emptyBuffer = Array.Empty<byte>();
182+
if (async)
183+
{
184+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, 21));
185+
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(emptyBuffer, 1));
186+
}
187+
else
188+
{
189+
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, 21));
190+
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(emptyBuffer, 1));
191+
}
192+
}
193+
194+
[Theory]
195+
[InlineData(true)]
196+
[InlineData(false)]
197+
public async Task HandleEndOfStream(bool async)
198+
{
199+
int readInvokedCount = 0;
200+
var s = new DelegateStream(
201+
canReadFunc: () => true,
202+
readFunc: (array, offset, count) =>
203+
{
204+
readInvokedCount++;
205+
206+
if (readInvokedCount == 1)
207+
{
208+
int byteCount = Math.Min(count, 10);
209+
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
210+
return byteCount;
211+
}
212+
else
213+
{
214+
return 0;
215+
}
216+
});
217+
218+
byte[] buffer = new byte[20];
219+
if (async)
220+
{
221+
await Assert.ThrowsAsync<EndOfStreamException>(async () => await s.ReadAtLeastAsync(buffer, 11));
222+
}
223+
else
224+
{
225+
Assert.Throws<EndOfStreamException>(() => s.ReadAtLeast(buffer, 11));
226+
}
227+
Assert.Equal(2, readInvokedCount);
228+
229+
readInvokedCount = 0;
230+
231+
Assert.Equal(10, async ? await s.ReadAtLeastAsync(buffer, 11, throwOnEndOfStream: false) : s.ReadAtLeast(buffer, 11, throwOnEndOfStream: false));
232+
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
233+
for (int i = 10; i < 20; i++) Assert.Equal(0, buffer[i]);
234+
Assert.Equal(2, readInvokedCount);
235+
}
236+
237+
[Fact]
238+
public async Task CancellationTokenIsPassedThrough()
239+
{
240+
int readInvokedCount = 0;
241+
var s = new DelegateStream(
242+
canReadFunc: () => true,
243+
readAsyncFunc: (array, offset, count, cancellationToken) =>
244+
{
245+
readInvokedCount++;
246+
cancellationToken.ThrowIfCancellationRequested();
247+
248+
int byteCount = Math.Min(count, 10);
249+
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
250+
return Task.FromResult(10);
251+
});
252+
253+
byte[] buffer = new byte[20];
254+
255+
using CancellationTokenSource cts = new CancellationTokenSource();
256+
CancellationToken token = cts.Token;
257+
cts.Cancel();
258+
259+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await s.ReadAtLeastAsync(buffer, 10, cancellationToken: token));
260+
Assert.Equal(1, readInvokedCount);
261+
}
262+
}
263+
}

0 commit comments

Comments
 (0)