Skip to content

Commit 35db835

Browse files
authored
Add app termination hooks (#623)
1 parent 7d8f850 commit 35db835

File tree

8 files changed

+353
-1
lines changed

8 files changed

+353
-1
lines changed

src/WorkerChannel.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export class WorkerChannel {
4545
#preInvocationHooks: HookCallback[] = [];
4646
#postInvocationHooks: HookCallback[] = [];
4747
#appStartHooks: HookCallback[] = [];
48+
#appTerminateHooks: HookCallback[] = [];
4849
functions: { [id: string]: RegisteredFunction } = {};
4950
hasIndexedFunctions = false;
5051

@@ -112,6 +113,8 @@ export class WorkerChannel {
112113
return this.#postInvocationHooks;
113114
case 'appStart':
114115
return this.#appStartHooks;
116+
case 'appTerminate':
117+
return this.#appTerminateHooks;
115118
default:
116119
throw new RangeError(`Unrecognized hook "${hookName}"`);
117120
}

src/eventHandlers/WorkerInitHandler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ export class WorkerInitHandler extends EventHandler<'workerInitRequest', 'worker
5555
UseNullableValueDictionaryForHttp: 'true',
5656
WorkerStatus: 'true',
5757
TypedDataCollection: 'true',
58+
HandlesWorkerTerminateMessage: 'true',
5859
};
5960

6061
return response;

src/eventHandlers/terminateWorker.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import { AppTerminateContext } from '@azure/functions-core';
5+
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
6+
import { ReadOnlyError } from '../utils/ReadOnlyError';
7+
import { WorkerChannel } from '../WorkerChannel';
8+
import LogCategory = rpc.RpcLog.RpcLogCategory;
9+
import LogLevel = rpc.RpcLog.Level;
10+
11+
export async function terminateWorker(channel: WorkerChannel, _msg: rpc.IWorkerTerminate) {
12+
channel.log({
13+
message: 'Received workerTerminate message; gracefully shutting down worker',
14+
level: LogLevel.Debug,
15+
logCategory: LogCategory.System,
16+
});
17+
18+
const appTerminateContext: AppTerminateContext = {
19+
get hookData() {
20+
return channel.appLevelOnlyHookData;
21+
},
22+
set hookData(_obj) {
23+
throw new ReadOnlyError('hookData');
24+
},
25+
get appHookData() {
26+
return channel.appHookData;
27+
},
28+
set appHookData(_obj) {
29+
throw new ReadOnlyError('appHookData');
30+
},
31+
};
32+
33+
await channel.executeHooks('appTerminate', appTerminateContext);
34+
35+
channel.eventStream.end();
36+
process.exit(0);
37+
}

src/setupEventStream.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { FunctionEnvironmentReloadHandler } from './eventHandlers/FunctionEnviro
77
import { FunctionLoadHandler } from './eventHandlers/FunctionLoadHandler';
88
import { FunctionsMetadataHandler } from './eventHandlers/FunctionsMetadataHandler';
99
import { InvocationHandler } from './eventHandlers/InvocationHandler';
10+
import { terminateWorker } from './eventHandlers/terminateWorker';
1011
import { WorkerInitHandler } from './eventHandlers/WorkerInitHandler';
1112
import { ensureErrorType } from './utils/ensureErrorType';
1213
import { InternalException } from './utils/InternalException';
@@ -66,6 +67,11 @@ async function handleMessage(workerId: string, channel: WorkerChannel, inMsg: rp
6667
case 'workerInitRequest':
6768
eventHandler = new WorkerInitHandler();
6869
break;
70+
case 'workerTerminate':
71+
// Worker terminate request is a special request which gracefully shuts down worker
72+
// It doesn't have a response so we don't have an EventHandler class for it
73+
await terminateWorker(channel, nonNullProp(inMsg, eventName));
74+
return;
6975
case 'workerStatusRequest':
7076
// Worker sends the host empty response to evaluate the worker's latency
7177
// The response doesn't even allow a `result` property, which is why we don't implement an EventHandler class
@@ -81,7 +87,6 @@ async function handleMessage(workerId: string, channel: WorkerChannel, inMsg: rp
8187
case 'invocationCancel':
8288
case 'startStream':
8389
case 'workerHeartbeat':
84-
case 'workerTerminate':
8590
// Not yet implemented
8691
return;
8792
default:

test/eventHandlers/InvocationHandler.test.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { LegacyFunctionLoader } from '../../src/LegacyFunctionLoader';
1313
import { WorkerChannel } from '../../src/WorkerChannel';
1414
import { Msg as AppStartMsg } from '../startApp.test';
1515
import { beforeEventHandlerSuite } from './beforeEventHandlerSuite';
16+
import { Msg as WorkerTerminateMsg } from './terminateWorker.test';
1617
import { TestEventStream } from './TestEventStream';
1718
import { Msg as WorkerInitMsg } from './WorkerInitHandler.test';
1819
import LogCategory = rpc.RpcLog.RpcLogCategory;
@@ -333,13 +334,15 @@ describe('InvocationHandler', () => {
333334
let channel: WorkerChannel;
334335
let coreApi: typeof coreTypes;
335336
let testDisposables: coreTypes.Disposable[] = [];
337+
let processExitStub: sinon.SinonStub;
336338

337339
before(async () => {
338340
const result = beforeEventHandlerSuite();
339341
stream = result.stream;
340342
loader = <TestFunctionLoader>result.loader;
341343
channel = result.channel;
342344
coreApi = await import('@azure/functions-core');
345+
processExitStub = sinon.stub(process, 'exit');
343346
});
344347

345348
beforeEach(async () => {
@@ -354,6 +357,10 @@ describe('InvocationHandler', () => {
354357
testDisposables = [];
355358
});
356359

360+
after(() => {
361+
processExitStub.restore();
362+
});
363+
357364
function sendInvokeMessage(inputData?: rpc.IParameterBinding[] | null): void {
358365
stream.addTestMessage({
359366
requestId: 'testReqId',
@@ -924,6 +931,57 @@ describe('InvocationHandler', () => {
924931
expect(hookData).to.equal('appStartpreInvocpostInvoc');
925932
});
926933

934+
it('appHookData changes from invocation hooks are persisted in app terminate hook contexts', async () => {
935+
const expectedAppHookData = {
936+
hello: 'world',
937+
test: {
938+
test2: 3,
939+
},
940+
};
941+
942+
loader.getFunction.returns({ callback: async () => {}, metadata: Binding.queue });
943+
944+
testDisposables.push(
945+
coreApi.registerHook('preInvocation', (context: coreTypes.PreInvocationContext) => {
946+
Object.assign(context.appHookData, expectedAppHookData);
947+
hookData += 'preInvoc';
948+
})
949+
);
950+
951+
testDisposables.push(
952+
coreApi.registerHook('postInvocation', (context: coreTypes.PostInvocationContext) => {
953+
expect(context.appHookData).to.deep.equal(expectedAppHookData);
954+
hookData += 'postInvoc';
955+
})
956+
);
957+
958+
sendInvokeMessage([InputData.http]);
959+
await stream.assertCalledWith(
960+
Msg.receivedInvocLog(),
961+
Msg.executingHooksLog(1, 'preInvocation'),
962+
Msg.executedHooksLog('preInvocation'),
963+
Msg.executingHooksLog(1, 'postInvocation'),
964+
Msg.executedHooksLog('postInvocation'),
965+
Msg.invocResponse([])
966+
);
967+
968+
const terminateFunc = sinon.spy((context: coreTypes.AppTerminateContext) => {
969+
expect(context.appHookData).to.deep.equal(expectedAppHookData);
970+
hookData += 'appTerminate';
971+
});
972+
testDisposables.push(coreApi.registerHook('appTerminate', terminateFunc));
973+
974+
stream.addTestMessage(WorkerTerminateMsg.workerTerminate());
975+
976+
await stream.assertCalledWith(
977+
WorkerTerminateMsg.receivedWorkerTerminateLog,
978+
AppStartMsg.executingHooksLog(1, 'appTerminate'),
979+
AppStartMsg.executedHooksLog('appTerminate')
980+
);
981+
expect(terminateFunc.callCount).to.be.equal(1);
982+
expect(hookData).to.equal('preInvocpostInvocappTerminate');
983+
});
984+
927985
it('hookData changes from appStart hooks are not persisted in invocation hook contexts', async () => {
928986
const functionAppDirectory = __dirname;
929987
const startFunc = sinon.spy((context: coreTypes.AppStartContext) => {
@@ -982,6 +1040,61 @@ describe('InvocationHandler', () => {
9821040
expect(hookData).to.equal('appStartpreInvocpostInvoc');
9831041
});
9841042

1043+
it('hookData changes from invocation hooks are not persisted in app terminate contexts', async () => {
1044+
const expectedAppHookData = {
1045+
hello: 'world',
1046+
test: {
1047+
test2: 3,
1048+
},
1049+
};
1050+
1051+
loader.getFunction.returns({ callback: async () => {}, metadata: Binding.queue });
1052+
1053+
testDisposables.push(
1054+
coreApi.registerHook('preInvocation', (context: coreTypes.PreInvocationContext) => {
1055+
Object.assign(context.hookData, expectedAppHookData);
1056+
expect(context.appHookData).to.be.empty;
1057+
hookData += 'preInvoc';
1058+
})
1059+
);
1060+
1061+
testDisposables.push(
1062+
coreApi.registerHook('postInvocation', (context: coreTypes.PostInvocationContext) => {
1063+
expect(context.hookData).to.deep.equal(expectedAppHookData);
1064+
expect(context.appHookData).to.be.empty;
1065+
hookData += 'postInvoc';
1066+
})
1067+
);
1068+
1069+
sendInvokeMessage([InputData.http]);
1070+
await stream.assertCalledWith(
1071+
Msg.receivedInvocLog(),
1072+
Msg.executingHooksLog(1, 'preInvocation'),
1073+
Msg.executedHooksLog('preInvocation'),
1074+
Msg.executingHooksLog(1, 'postInvocation'),
1075+
Msg.executedHooksLog('postInvocation'),
1076+
Msg.invocResponse([])
1077+
);
1078+
1079+
const terminateFunc = sinon.spy((context: coreTypes.AppTerminateContext) => {
1080+
expect(context.appHookData).to.be.empty;
1081+
expect(context.hookData).to.be.empty;
1082+
hookData += 'appTerminate';
1083+
});
1084+
testDisposables.push(coreApi.registerHook('appTerminate', terminateFunc));
1085+
1086+
stream.addTestMessage(WorkerTerminateMsg.workerTerminate());
1087+
1088+
await stream.assertCalledWith(
1089+
WorkerTerminateMsg.receivedWorkerTerminateLog,
1090+
AppStartMsg.executingHooksLog(1, 'appTerminate'),
1091+
AppStartMsg.executedHooksLog('appTerminate')
1092+
);
1093+
1094+
expect(terminateFunc.callCount).to.be.equal(1);
1095+
expect(hookData).to.equal('preInvocpostInvocappTerminate');
1096+
});
1097+
9851098
it('appHookData changes are persisted between invocation-level hooks', async () => {
9861099
const expectedAppHookData = {
9871100
hello: 'world',

test/eventHandlers/WorkerInitHandler.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export namespace Msg {
4545
UseNullableValueDictionaryForHttp: 'true',
4646
WorkerStatus: 'true',
4747
TypedDataCollection: 'true',
48+
HandlesWorkerTerminateMessage: 'true',
4849
},
4950
result: {
5051
status: rpc.StatusResult.Status.Success,

0 commit comments

Comments
 (0)