Skip to content

Commit

Permalink
Use a pool to allocated app chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
guhetier committed Feb 8, 2025
1 parent d4778ca commit 6078320
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 124 deletions.
11 changes: 8 additions & 3 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,7 @@ MsQuicStreamProvideReceiveBuffers(
{
QUIC_STATUS Status;
QUIC_OPERATION* Oper;
QUIC_CONNECTION* Connection = NULL;
CXPLAT_LIST_ENTRY ChunkList;
CxPlatListInitializeHead(&ChunkList);

Expand Down Expand Up @@ -1398,7 +1399,7 @@ MsQuicStreamProvideReceiveBuffers(
CXPLAT_TEL_ASSERT(!Stream->Flags.HandleClosed);
CXPLAT_TEL_ASSERT(!Stream->Flags.Freed);

QUIC_CONNECTION* Connection = Stream->Connection;
Connection = Stream->Connection;
QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

//
Expand Down Expand Up @@ -1432,7 +1433,8 @@ MsQuicStreamProvideReceiveBuffers(
// The allocation is done here to make the worker thread task failure free.
//
for (uint32_t i = 0; i < BufferCount; ++i) {
QUIC_RECV_CHUNK* Chunk = CXPLAT_ALLOC_NONPAGED(sizeof(QUIC_RECV_CHUNK), QUIC_POOL_RECVBUF);
QUIC_RECV_CHUNK* Chunk =
CxPlatPoolAlloc(&Connection->Worker->AppBufferChunkPool);
if (Chunk == NULL) {
QuicTraceEvent(
AllocFailure,
Expand All @@ -1442,7 +1444,7 @@ MsQuicStreamProvideReceiveBuffers(
Status = QUIC_STATUS_OUT_OF_MEMORY;
goto Error;
}
QuicRecvChunkInitialize(Chunk, Buffers[i].Length, Buffers[i].Buffer);
QuicRecvChunkInitialize(Chunk, Buffers[i].Length, Buffers[i].Buffer, TRUE);
CxPlatListInsertTail(&ChunkList, &Chunk->Link);
}

Expand Down Expand Up @@ -1492,6 +1494,9 @@ MsQuicStreamProvideReceiveBuffers(
// Cleanup allocated chunks if the operation failed.
//
while (!CxPlatListIsEmpty(&ChunkList)) {
CXPLAT_DBG_ASSERT(Connection != NULL);
CxPlatPoolFree(&Connection->Worker->AppBufferChunkPool,
CXPLAT_CONTAINING_RECORD(CxPlatListRemoveHead(&ChunkList), QUIC_RECV_CHUNK, Link));
CXPLAT_FREE(
CXPLAT_CONTAINING_RECORD(
CxPlatListRemoveHead(&ChunkList),
Expand Down
1 change: 1 addition & 0 deletions src/core/crypto.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ QuicCryptoInitialize(
InitialRecvBufferLength,
QUIC_DEFAULT_STREAM_FC_WINDOW_SIZE / 2,
QUIC_RECV_BUF_MODE_SINGLE,
&Connection->Worker->AppBufferChunkPool,
NULL);
if (QUIC_FAILED(Status)) {
goto Exit;
Expand Down
90 changes: 54 additions & 36 deletions src/core/recv_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,44 @@ void
QuicRecvChunkInitialize(
_Inout_ QUIC_RECV_CHUNK* Chunk,
_In_ uint32_t AllocLength,
_Inout_updates_(AllocLength) uint8_t* Buffer
_Inout_updates_(AllocLength) uint8_t* Buffer,
_In_ BOOLEAN AppOwnedBuffer
)
{
Chunk->AllocLength = AllocLength;
Chunk->Buffer = Buffer;
Chunk->ExternalReference = FALSE;
Chunk->AppOwnedBuffer = AppOwnedBuffer;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS // TODO - Can only fail if PreallocatedChunk == NULL
void
QuicRecvChunkFree(
_In_ QUIC_RECV_BUFFER* RecvBuffer,
_In_ QUIC_RECV_CHUNK* Chunk
)
{
CXPLAT_DBG_ASSERT(!Chunk->ExternalReference);

if (Chunk == RecvBuffer->PreallocatedChunk) {
return;
}

if (Chunk->AppOwnedBuffer) {
CxPlatPoolFree(RecvBuffer->AppBufferChunkPool, Chunk);
} else {
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
}
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QuicRecvBufferInitialize(
_Inout_ QUIC_RECV_BUFFER* RecvBuffer,
_In_ uint32_t AllocBufferLength,
_In_ uint32_t VirtualBufferLength,
_In_ QUIC_RECV_BUF_MODE RecvMode,
_In_ CXPLAT_POOL* AppBufferChunkPool,
_In_opt_ QUIC_RECV_CHUNK* PreallocatedChunk
)
{
Expand All @@ -81,6 +104,7 @@ QuicRecvBufferInitialize(
RecvBuffer->ReadPendingLength = 0;
RecvBuffer->ReadLength = 0;
RecvBuffer->RecvMode = RecvMode;
RecvBuffer->AppBufferChunkPool = AppBufferChunkPool;
QuicRangeInitialize(QUIC_MAX_RANGE_ALLOC_SIZE, &RecvBuffer->WrittenRanges);
CxPlatListInitializeHead(&RecvBuffer->Chunks);

Expand All @@ -103,7 +127,7 @@ QuicRecvBufferInitialize(
sizeof(QUIC_RECV_CHUNK) + AllocBufferLength);
return QUIC_STATUS_OUT_OF_MEMORY;
}
QuicRecvChunkInitialize(Chunk, AllocBufferLength, (uint8_t*)(Chunk + 1));
QuicRecvChunkInitialize(Chunk, AllocBufferLength, (uint8_t*)(Chunk + 1), FALSE);
}
CxPlatListInsertHead(&RecvBuffer->Chunks, &Chunk->Link);
RecvBuffer->Capacity = AllocBufferLength;
Expand All @@ -130,9 +154,7 @@ QuicRecvBufferUninitialize(
CxPlatListRemoveHead(&RecvBuffer->Chunks),
QUIC_RECV_CHUNK,
Link);
if (Chunk != RecvBuffer->PreallocatedChunk) {
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
}
QuicRecvChunkFree(RecvBuffer, Chunk);
}
}

Expand Down Expand Up @@ -277,7 +299,7 @@ QuicRecvBufferResize(
return FALSE;
}

QuicRecvChunkInitialize(NewChunk, TargetBufferLength, (uint8_t*)(NewChunk + 1));
QuicRecvChunkInitialize(NewChunk, TargetBufferLength, (uint8_t*)(NewChunk + 1), FALSE);
CxPlatListInsertTail(&RecvBuffer->Chunks, &NewChunk->Link);

if (!LastChunk->ExternalReference) {
Expand Down Expand Up @@ -325,9 +347,7 @@ QuicRecvBufferResize(
}

CxPlatListEntryRemove(&LastChunk->Link);
if (LastChunk != RecvBuffer->PreallocatedChunk) {
CXPLAT_FREE(LastChunk, QUIC_POOL_RECVBUF);
}
QuicRecvChunkFree(RecvBuffer, LastChunk);

return TRUE;
}
Expand Down Expand Up @@ -1012,7 +1032,6 @@ QuicRecvBufferPartialDrain(
RecvBuffer->Chunks.Flink,
QUIC_RECV_CHUNK,
Link);
CXPLAT_DBG_ASSERT(Chunk->ExternalReference);

if (Chunk->Link.Flink != &RecvBuffer->Chunks &&
(RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_SINGLE ||
Expand All @@ -1024,17 +1043,14 @@ QuicRecvBufferPartialDrain(
// operating on the next one.
//
CxPlatListEntryRemove(&Chunk->Link);
if (Chunk != RecvBuffer->PreallocatedChunk) {
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
}
QuicRecvChunkFree(RecvBuffer, Chunk);

CXPLAT_DBG_ASSERT(!CxPlatListIsEmpty(&RecvBuffer->Chunks));
Chunk =
CXPLAT_CONTAINING_RECORD(
RecvBuffer->Chunks.Flink,
QUIC_RECV_CHUNK,
Link);
CXPLAT_DBG_ASSERT(!Chunk->ExternalReference);
RecvBuffer->ReadStart = 0;
}

Expand Down Expand Up @@ -1074,15 +1090,9 @@ QuicRecvBufferPartialDrain(
RecvBuffer->ReadLength -= (uint32_t)DrainLength;
}

if (RecvBuffer->RecvMode != QUIC_RECV_BUF_MODE_MULTIPLE) {
//
// Unless we are in multiple mode, a partial drain means the app isn't
// referencing any chunks anymore.
//
Chunk->ExternalReference = FALSE;
} else {
if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE) {
//
// If all ReadPending data is drained, then we can release the external reference
// If all ReadPending data is drained, then we can release the external reference.
//
Chunk->ExternalReference = RecvBuffer->ReadPendingLength != DrainLength;
CXPLAT_DBG_ASSERT(DrainLength <= RecvBuffer->ReadPendingLength);
Expand Down Expand Up @@ -1118,13 +1128,12 @@ QuicRecvBufferFullDrain(
RecvBuffer->Chunks.Flink,
QUIC_RECV_CHUNK,
Link);
CXPLAT_DBG_ASSERT(Chunk->ExternalReference);

Chunk->ExternalReference = FALSE;
DrainLength -= RecvBuffer->ReadLength;
RecvBuffer->ReadStart = 0;
RecvBuffer->BaseOffset += RecvBuffer->ReadLength;
if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_MULTIPLE) {
Chunk->ExternalReference = FALSE;
RecvBuffer->ReadPendingLength -= RecvBuffer->ReadLength;
}
if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_EXTERNAL) {
Expand All @@ -1151,9 +1160,7 @@ QuicRecvBufferFullDrain(
// free the last chunk.
//
CxPlatListEntryRemove(&Chunk->Link);
if (Chunk != RecvBuffer->PreallocatedChunk) {
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
}
QuicRecvChunkFree(RecvBuffer, Chunk);
RecvBuffer->Capacity = 0;
}

Expand All @@ -1165,9 +1172,7 @@ QuicRecvBufferFullDrain(
// going to re-use this one. Free it.
//
CxPlatListEntryRemove(&Chunk->Link);
if (Chunk != RecvBuffer->PreallocatedChunk) {
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
}
QuicRecvChunkFree(RecvBuffer, Chunk);

//
// The rest of the contiguous data might not fit in just the next chunk
Expand Down Expand Up @@ -1197,9 +1202,22 @@ QuicRecvBufferDrain(
)
{
CXPLAT_DBG_ASSERT(DrainLength <= RecvBuffer->ReadPendingLength);

//
// Mark chunks as no longer externally referenced reset the read-pending data length.
// For Multiple mode, this is done when each chunk is drained.
//
if (RecvBuffer->RecvMode != QUIC_RECV_BUF_MODE_MULTIPLE) {
for (CXPLAT_LIST_ENTRY* Link = RecvBuffer->Chunks.Flink;
Link != &RecvBuffer->Chunks;
Link = Link->Flink) {
QUIC_RECV_CHUNK* Chunk =
CXPLAT_CONTAINING_RECORD(Link, QUIC_RECV_CHUNK, Link);
Chunk->ExternalReference = FALSE;
}
RecvBuffer->ReadPendingLength = 0;
}

QUIC_SUBRANGE* FirstRange = QuicRangeGet(&RecvBuffer->WrittenRanges, 0);
CXPLAT_DBG_ASSERT(FirstRange);
CXPLAT_DBG_ASSERT(FirstRange->Low == 0);
Expand All @@ -1226,18 +1244,18 @@ QuicRecvBufferDrain(
PartialDrain &= (uint64_t)RecvBuffer->Capacity > DrainLength;
} else if (RecvBuffer->RecvMode == QUIC_RECV_BUF_MODE_EXTERNAL) {
//
// In external mode, the chunk must be full drained only if its capacity reaches 0.
// In external mode, the chunk must be fully drained only if its capacity reaches 0.
// Otherwise, we either have more bytes to read, or more space to write.
// Contrary to other modes, we can reset ReadStart to the start of the buffer whenever
// we drained all written data.
// Contrary to other modes, we cannot reset ReadStart to the start of the buffer
// whenever we drained all written data.
//
PartialDrain = (uint64_t)RecvBuffer->Capacity > DrainLength;
}

if (PartialDrain) {
//
// In single/circular mode, a full drain must be done only all the data
// written to the buffer got read.
// In single/circular mode, a full drain must be done only if all the data
// written to the buffer was read.
// A partial drain is done if not all the readily readable data was read
// or if the read is limited by a gap in the data.
//
Expand Down
30 changes: 22 additions & 8 deletions src/core/recv_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,22 @@ typedef enum QUIC_RECV_BUF_MODE {
// Represents a single contiguous range of bytes.
//
typedef struct QUIC_RECV_CHUNK {
CXPLAT_LIST_ENTRY Link; // Link in the list of chunks.
uint32_t AllocLength : 31; // Allocation size of Buffer
uint32_t ExternalReference : 1; // Indicates the buffer is being used externally.
_Field_size_(AllocLength)
uint8_t *Buffer; // Pointer to the buffer itself. Doesn't need to be freed independently:
// - for internally allocated buffers, points in the same allocation.
// - for exteral buffers, the buffer isn't owned
CXPLAT_LIST_ENTRY Link; // Link in the list of chunks.
uint32_t AllocLength : 31; // Allocation size of Buffer
uint32_t ExternalReference : 1; // Indicates the buffer is being used externally.
uint8_t AppOwnedBuffer : 1; // Indicates the buffer is managed by the app.
uint8_t *Buffer; // Pointer to the buffer itself. Doesn't need to be freed independently:
// - for internally allocated buffers, points in the same allocation.
// - for exteral buffers, the buffer isn't owned
} QUIC_RECV_CHUNK;

_IRQL_requires_max_(DISPATCH_LEVEL)
void
QuicRecvChunkInitialize(
_Inout_ QUIC_RECV_CHUNK* Chunk,
_In_ uint32_t AllocLength,
_Inout_updates_(AllocLength) uint8_t* Buffer
_Inout_updates_(AllocLength) uint8_t* Buffer,
_In_ BOOLEAN AppOwnedBuffer
);

typedef struct QUIC_RECV_BUFFER {
Expand All @@ -45,6 +46,12 @@ typedef struct QUIC_RECV_BUFFER {
//
CXPLAT_LIST_ENTRY Chunks;

//
// Pool for the chunks managing app provided buffers.
// See QUIC_RECV_CHUNK::AppOwnedBuffer
//
CXPLAT_POOL *AppBufferChunkPool;

//
// Optional, preallocated initial chunk.
//
Expand Down Expand Up @@ -101,13 +108,20 @@ typedef struct QUIC_RECV_BUFFER {

} QUIC_RECV_BUFFER;

//
// Initialize a QUIC_RECV_BUFFER.
// Can only fail if PreallocatedChunk == NULL && RecvMode != QUIC_RECV_BUF_MODE_EXTERNAL.
// PreallocatedChunk is owned by the caller and must be freed afte the buffer is uninitialized.
// AppBufferChunkPool is used to allocate and free the chunk managing app-provided buffers.
//
_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QuicRecvBufferInitialize(
_Inout_ QUIC_RECV_BUFFER* RecvBuffer,
_In_ uint32_t AllocBufferLength,
_In_ uint32_t VirtualBufferLength,
_In_ QUIC_RECV_BUF_MODE RecvMode,
_In_ CXPLAT_POOL* AppBufferChunkPool,
_In_opt_ QUIC_RECV_CHUNK* PreallocatedChunk
);

Expand Down
15 changes: 13 additions & 2 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ QuicStreamInitialize(
QuicRecvChunkInitialize(
PreallocatedRecvChunk,
InitialRecvBufferLength,
(uint8_t *)(PreallocatedRecvChunk + 1));
(uint8_t *)(PreallocatedRecvChunk + 1),
FALSE);
}

const uint32_t FlowControlWindowSize = Stream->Flags.Unidirectional
Expand All @@ -146,6 +147,7 @@ QuicStreamInitialize(
InitialRecvBufferLength,
FlowControlWindowSize,
RecvBufferMode,
&Connection->Worker->AppBufferChunkPool,
PreallocatedRecvChunk);
if (QUIC_FAILED(Status)) {
goto Exit;
Expand Down Expand Up @@ -977,7 +979,16 @@ QuicStreamSwitchToExternalBuffers(
)
{
QuicRecvBufferUninitialize(&Stream->RecvBuffer);
(void)QuicRecvBufferInitialize(&Stream->RecvBuffer, 0, 0, QUIC_RECV_BUF_MODE_EXTERNAL, NULL);
//
// Rq: Can't fail when initializing in external mode.
//
(void)QuicRecvBufferInitialize(
&Stream->RecvBuffer,
0,
0,
QUIC_RECV_BUF_MODE_EXTERNAL,
&Stream->Connection->Worker->AppBufferChunkPool,
NULL);
Stream->Flags.UseExternalRecvBuffers = TRUE;
}

Expand Down
Loading

0 comments on commit 6078320

Please sign in to comment.