Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 26 additions & 33 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { invariant } from '../jsutils/invariant.js';
import type { ObjMap } from '../jsutils/ObjMap.js';
import { pathToArray } from '../jsutils/Path.js';

Expand All @@ -7,7 +6,6 @@ import type { GraphQLError } from '../error/GraphQLError.js';
import type { AbortSignalListener } from './AbortSignalListener.js';
import { IncrementalGraph } from './IncrementalGraph.js';
import type {
CancellableStreamRecord,
CompletedExecutionGroup,
CompletedResult,
DeferredFragmentRecord,
Expand All @@ -21,23 +19,25 @@ import type {
InitialIncrementalExecutionResult,
PendingResult,
StreamItemsResult,
StreamRecord,
SubsequentIncrementalExecutionResult,
} from './types.js';
import {
isCancellableStreamRecord,
isCompletedExecutionGroup,
isFailedExecutionGroup,
} from './types.js';
import { isCompletedExecutionGroup, isFailedExecutionGroup } from './types.js';
import { withCleanup } from './withCleanup.js';

// eslint-disable-next-line max-params
export function buildIncrementalResponse(
context: IncrementalPublisherContext,
result: ObjMap<unknown>,
errors: ReadonlyArray<GraphQLError>,
newDeferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord> | undefined,
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
abortSignalListener: AbortSignalListener | undefined,
): ExperimentalIncrementalExecutionResults {
const incrementalPublisher = new IncrementalPublisher(context);
const incrementalPublisher = new IncrementalPublisher(
earlyReturns,
abortSignalListener,
);
return incrementalPublisher.buildResponse(
result,
errors,
Expand All @@ -46,11 +46,6 @@ export function buildIncrementalResponse(
);
}

interface IncrementalPublisherContext {
abortSignalListener: AbortSignalListener | undefined;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
}

interface SubsequentIncrementalExecutionResultContext {
pending: Array<PendingResult>;
incremental: Array<IncrementalResult>;
Expand All @@ -64,12 +59,17 @@ interface SubsequentIncrementalExecutionResultContext {
* @internal
*/
class IncrementalPublisher {
private _context: IncrementalPublisherContext;
private _earlyReturns: Map<StreamRecord, () => Promise<unknown>>;
private _abortSignalListener: AbortSignalListener | undefined;
private _nextId: number;
private _incrementalGraph: IncrementalGraph;

constructor(context: IncrementalPublisherContext) {
this._context = context;
constructor(
earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
abortSignalListener: AbortSignalListener | undefined,
) {
this._earlyReturns = earlyReturns;
this._abortSignalListener = abortSignalListener;
this._nextId = 0;
this._incrementalGraph = new IncrementalGraph();
}
Expand Down Expand Up @@ -100,7 +100,7 @@ class IncrementalPublisher {
return {
initialResult,
subsequentResults: withCleanup(subsequentResults, async () => {
this._context.abortSignalListener?.disconnect();
this._abortSignalListener?.disconnect();
await this._returnAsyncIteratorsIgnoringErrors();
}),
};
Expand Down Expand Up @@ -241,21 +241,18 @@ class IncrementalPublisher {
errors: streamItemsResult.errors,
});
this._incrementalGraph.removeStream(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
invariant(this._context.cancellableStreams !== undefined);
this._context.cancellableStreams.delete(streamRecord);
streamRecord.earlyReturn().catch(() => {
const earlyReturn = this._earlyReturns.get(streamRecord);
if (earlyReturn !== undefined) {
earlyReturn().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
this._earlyReturns.delete(streamRecord);
}
} else if (streamItemsResult.result === undefined) {
context.completed.push({ id });
this._incrementalGraph.removeStream(streamRecord);
if (isCancellableStreamRecord(streamRecord)) {
invariant(this._context.cancellableStreams !== undefined);
this._context.cancellableStreams.delete(streamRecord);
}
this._earlyReturns.delete(streamRecord);
} else {
const incrementalEntry: IncrementalStreamResult = {
id,
Expand Down Expand Up @@ -310,14 +307,10 @@ class IncrementalPublisher {
}

private async _returnAsyncIterators(): Promise<void> {
const cancellableStreams = this._context.cancellableStreams;
if (cancellableStreams === undefined) {
return;
}
const promises: Array<Promise<unknown>> = [];
for (const streamRecord of cancellableStreams) {
if (streamRecord.earlyReturn !== undefined) {
promises.push(streamRecord.earlyReturn());
for (const earlyReturn of this._earlyReturns.values()) {
if (earlyReturn !== undefined) {
promises.push(earlyReturn());
}
}
await Promise.all(promises);
Expand Down
49 changes: 19 additions & 30 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import { getVariableSignature } from './getVariableSignature.js';
import { buildIncrementalResponse } from './IncrementalPublisher.js';
import { mapAsyncIterable } from './mapAsyncIterable.js';
import type {
CancellableStreamRecord,
CompletedExecutionGroup,
ExecutionResult,
ExperimentalIncrementalExecutionResults,
Expand Down Expand Up @@ -172,7 +171,7 @@ export interface ExecutionContext {
errors: Array<GraphQLError>;
abortSignalListener: AbortSignalListener | undefined;
completed: boolean;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
earlyReturns: Map<StreamRecord, () => Promise<unknown>> | undefined;
errorPropagation: boolean;
}

Expand Down Expand Up @@ -344,7 +343,7 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent(
? new AbortSignalListener(abortSignal)
: undefined,
completed: false,
cancellableStreams: undefined,
earlyReturns: undefined,
errorPropagation: errorPropagation(validatedExecutionArgs.operation),
};
try {
Expand Down Expand Up @@ -428,11 +427,12 @@ function buildDataResponse(
}

return buildIncrementalResponse(
exeContext,
data,
errors,
newDeferredFragmentRecords,
incrementalDataRecords,
(exeContext.earlyReturns ??= new Map()),
exeContext.abortSignalListener,
);
}

Expand Down Expand Up @@ -1351,10 +1351,6 @@ async function completeAsyncIteratorValue(
fieldDetailsList,
path,
);
const earlyReturn =
asyncIterator.return === undefined
? undefined
: asyncIterator.return.bind(asyncIterator);
try {
while (true) {
if (streamUsage && index >= streamUsage.initialCount) {
Expand All @@ -1368,22 +1364,17 @@ async function completeAsyncIteratorValue(
itemType,
);

let streamRecord: StreamRecord | CancellableStreamRecord;
if (earlyReturn === undefined) {
streamRecord = {
label: streamUsage.label,
path,
streamItemQueue,
};
} else {
streamRecord = {
label: streamUsage.label,
path,
earlyReturn,
streamItemQueue,
};
exeContext.cancellableStreams ??= new Set();
exeContext.cancellableStreams.add(streamRecord);
const streamRecord: StreamRecord = {
label: streamUsage.label,
path,
streamItemQueue,
};
if (asyncIterator.return !== undefined) {
exeContext.earlyReturns ??= new Map();
exeContext.earlyReturns.set(
streamRecord,
asyncIterator.return.bind(asyncIterator),
);
}

addIncrementalDataRecords(graphqlWrappedResult, [streamRecord]);
Expand Down Expand Up @@ -1450,12 +1441,10 @@ async function completeAsyncIteratorValue(
index++;
}
} catch (error) {
if (earlyReturn !== undefined) {
earlyReturn().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
}
asyncIterator.return?.().catch(() => {
/* c8 ignore next 1 */
// ignore error
});
throw error;
}

Expand Down
10 changes: 0 additions & 10 deletions src/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,6 @@ export interface StreamItemsResult {
incrementalDataRecords?: ReadonlyArray<IncrementalDataRecord> | undefined;
}

export interface CancellableStreamRecord extends StreamRecord {
earlyReturn: () => Promise<unknown>;
}

export function isCancellableStreamRecord(
deliveryGroup: DeliveryGroup,
): deliveryGroup is CancellableStreamRecord {
return 'earlyReturn' in deliveryGroup;
}

export type IncrementalDataRecord = PendingExecutionGroup | StreamRecord;

export type IncrementalDataRecordResult =
Expand Down
Loading