Skip to content

Commit ad99f5d

Browse files
authored
Add app-level startup hooks (#577)
* add startup and teardown hook support * add initial types * move hook data handling to wokrer channel * add app startup hooks to worker init handler * run new script file on functionEnvironmentReloadRequest * add logger to base hook * refactor * update comment * create copies of hook data for invocation contexts * don't copy for app startup hook data + add function app directory * function environment reload logic * implement logger interface in hook context * add host version to startup hooks * fix tests * rename app startup function * remove teardown hooks * proper message category and functionInvocationId in hook logs * make sure it compiles * remove logger * refactor app startup code * don't reset the hook data * simplify copying hook data * add specialization test for environment reload handler * add test for loading entry point in specialization scenario * add test for running app startup hook in non-specialization scenario * assert called with right context * add test for specialization scenario * add test for persisting hook data from startup hooks * add comment * allow functionAppDirectory to be undefined in workerInitRequest * rename to appStart * one more file rename * add appHookData and hookData distinction + tests * fix tests * remove outdated comment * non null hostVersion and remove log warning * fix tests * old hookData behavior * address PR comments
1 parent ed6224f commit ad99f5d

10 files changed

+597
-21
lines changed

src/WorkerChannel.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the MIT License.
33

4-
import { HookCallback, HookContext } from '@azure/functions-core';
4+
import { HookCallback, HookContext, HookData } from '@azure/functions-core';
55
import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-worker-protobuf/src/rpc';
66
import { Disposable } from './Disposable';
77
import { IFunctionLoader } from './FunctionLoader';
@@ -15,8 +15,21 @@ export class WorkerChannel {
1515
eventStream: IEventStream;
1616
functionLoader: IFunctionLoader;
1717
packageJson: PackageJson;
18+
/**
19+
* This will only be set after worker init request is received
20+
*/
21+
hostVersion?: string;
22+
/**
23+
* this hook data will be passed to (and set by) all hooks in all scopes
24+
*/
25+
appHookData: HookData = {};
26+
/**
27+
* this hook data is limited to the app-level scope and persisted only for app-level hooks
28+
*/
29+
appLevelOnlyHookData: HookData = {};
1830
#preInvocationHooks: HookCallback[] = [];
1931
#postInvocationHooks: HookCallback[] = [];
32+
#appStartHooks: HookCallback[] = [];
2033

2134
constructor(eventStream: IEventStream, functionLoader: IFunctionLoader) {
2235
this.eventStream = eventStream;
@@ -80,6 +93,8 @@ export class WorkerChannel {
8093
return this.#preInvocationHooks;
8194
case 'postInvocation':
8295
return this.#postInvocationHooks;
96+
case 'appStart':
97+
return this.#appStartHooks;
8398
default:
8499
throw new RangeError(`Unrecognized hook "${hookName}"`);
85100
}

src/eventHandlers/FunctionEnvironmentReloadHandler.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33

44
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
5+
import { startApp } from '../startApp';
56
import { WorkerChannel } from '../WorkerChannel';
67
import { EventHandler } from './EventHandler';
78
import LogCategory = rpc.RpcLog.RpcLogCategory;
@@ -35,6 +36,7 @@ export class FunctionEnvironmentReloadHandler extends EventHandler<
3536
});
3637

3738
process.env = Object.assign({}, msg.environmentVariables);
39+
3840
// Change current working directory
3941
if (msg.functionAppDirectory) {
4042
channel.log({
@@ -43,7 +45,7 @@ export class FunctionEnvironmentReloadHandler extends EventHandler<
4345
logCategory: LogCategory.System,
4446
});
4547
process.chdir(msg.functionAppDirectory);
46-
await channel.updatePackageJson(msg.functionAppDirectory);
48+
await startApp(msg.functionAppDirectory, channel);
4749
}
4850

4951
return response;

src/eventHandlers/InvocationHandler.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,18 @@ export class InvocationHandler extends EventHandler<'invocationRequest', 'invoca
9090
});
9191
});
9292

93-
const hookData: HookData = {};
9493
let userFunction = channel.functionLoader.getFunc(functionId);
94+
95+
const invocationHookData: HookData = {};
96+
9597
const preInvocContext: PreInvocationContext = {
96-
hookData,
98+
hookData: invocationHookData,
99+
appHookData: channel.appHookData,
97100
invocationContext: context,
98101
functionCallback: <AzureFunction>userFunction,
99102
inputs,
100103
};
101-
102-
await channel.executeHooks('preInvocation', preInvocContext, msg.invocationId, msgCategory);
104+
await channel.executeHooks('preInvocation', preInvocContext, invocationId, msgCategory);
103105
inputs = preInvocContext.inputs;
104106
userFunction = preInvocContext.functionCallback;
105107

@@ -117,7 +119,8 @@ export class InvocationHandler extends EventHandler<'invocationRequest', 'invoca
117119
}
118120

119121
const postInvocContext: PostInvocationContext = {
120-
hookData,
122+
hookData: invocationHookData,
123+
appHookData: channel.appHookData,
121124
invocationContext: context,
122125
inputs,
123126
result: null,

src/eventHandlers/WorkerInitHandler.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import { access, constants } from 'fs';
55
import * as path from 'path';
66
import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language-worker-protobuf/src/rpc';
7+
import { startApp } from '../startApp';
78
import { isError } from '../utils/ensureErrorType';
9+
import { nonNullProp } from '../utils/nonNull';
810
import { WorkerChannel } from '../WorkerChannel';
911
import { EventHandler } from './EventHandler';
1012
import LogCategory = rpc.RpcLog.RpcLogCategory;
@@ -30,9 +32,11 @@ export class WorkerInitHandler extends EventHandler<'workerInitRequest', 'worker
3032
});
3133

3234
logColdStartWarning(channel);
33-
const functionAppDirectory = msg.functionAppDirectory;
34-
if (functionAppDirectory) {
35-
await channel.updatePackageJson(functionAppDirectory);
35+
36+
channel.hostVersion = nonNullProp(msg, 'hostVersion');
37+
38+
if (msg.functionAppDirectory) {
39+
await startApp(msg.functionAppDirectory, channel);
3640
}
3741

3842
response.capabilities = {

src/startApp.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
import { AppStartContext } from '@azure/functions-core';
5+
import { pathExists } from 'fs-extra';
6+
import { AzureFunctionsRpcMessages as rpc } from '../azure-functions-language-worker-protobuf/src/rpc';
7+
import { loadScriptFile } from './loadScriptFile';
8+
import { ensureErrorType } from './utils/ensureErrorType';
9+
import { nonNullProp } from './utils/nonNull';
10+
import { WorkerChannel } from './WorkerChannel';
11+
import path = require('path');
12+
import LogLevel = rpc.RpcLog.Level;
13+
import LogCategory = rpc.RpcLog.RpcLogCategory;
14+
15+
/**
16+
* Starting an app can happen in two places, depending on if the worker was specialized or not
17+
* 1. The worker can start in "normal" mode, meaning `workerInitRequest` will reference the user's app
18+
* 2. The worker can start in "placeholder" mode, meaning `workerInitRequest` will reference a dummy app to "warm up" the worker and `functionEnvironmentReloadRequest` will be sent with the user's actual app.
19+
* This process is called worker specialization and it helps with cold start times.
20+
* The dummy app should never have actual startup code, so it should be safe to call `startApp` twice in this case
21+
* Worker specialization happens only once, so we don't need to worry about cleaning up resources from previous `functionEnvironmentReloadRequest`s.
22+
*/
23+
export async function startApp(functionAppDirectory: string, channel: WorkerChannel): Promise<void> {
24+
await channel.updatePackageJson(functionAppDirectory);
25+
await loadEntryPointFile(functionAppDirectory, channel);
26+
const appStartContext: AppStartContext = {
27+
hookData: channel.appLevelOnlyHookData,
28+
appHookData: channel.appHookData,
29+
functionAppDirectory,
30+
hostVersion: nonNullProp(channel, 'hostVersion'),
31+
};
32+
await channel.executeHooks('appStart', appStartContext);
33+
}
34+
35+
async function loadEntryPointFile(functionAppDirectory: string, channel: WorkerChannel): Promise<void> {
36+
const entryPointFile = channel.packageJson.main;
37+
if (entryPointFile) {
38+
channel.log({
39+
message: `Loading entry point "${entryPointFile}"`,
40+
level: LogLevel.Debug,
41+
logCategory: LogCategory.System,
42+
});
43+
try {
44+
const entryPointFullPath = path.join(functionAppDirectory, entryPointFile);
45+
if (!(await pathExists(entryPointFullPath))) {
46+
throw new Error(`file does not exist`);
47+
}
48+
49+
await loadScriptFile(entryPointFullPath, channel.packageJson);
50+
channel.log({
51+
message: `Loaded entry point "${entryPointFile}"`,
52+
level: LogLevel.Debug,
53+
logCategory: LogCategory.System,
54+
});
55+
} catch (err) {
56+
const error = ensureErrorType(err);
57+
error.isAzureFunctionsInternalException = true;
58+
error.message = `Worker was unable to load entry point "${entryPointFile}": ${error.message}`;
59+
throw error;
60+
}
61+
}
62+
}

test/eventHandlers/FunctionEnvironmentReloadHandler.test.ts

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ import { AzureFunctionsRpcMessages as rpc } from '../../azure-functions-language
88
import { WorkerChannel } from '../../src/WorkerChannel';
99
import { beforeEventHandlerSuite } from './beforeEventHandlerSuite';
1010
import { TestEventStream } from './TestEventStream';
11+
import { Msg as WorkerInitMsg } from './WorkerInitHandler.test';
1112
import path = require('path');
1213
import LogCategory = rpc.RpcLog.RpcLogCategory;
1314
import LogLevel = rpc.RpcLog.Level;
1415

15-
namespace Msg {
16+
export namespace Msg {
1617
export function reloadEnvVarsLog(numVars: number): rpc.IStreamingMessage {
1718
return {
1819
rpcLog: {
@@ -69,11 +70,14 @@ describe('FunctionEnvironmentReloadHandler', () => {
6970
let stream: TestEventStream;
7071
let channel: WorkerChannel;
7172

72-
// Reset `process.env` after this test suite so it doesn't affect other tests
73+
// Reset `process.env` and process.cwd() after this test suite so it doesn't affect other tests
7374
let originalEnv: NodeJS.ProcessEnv;
75+
let originalCwd: string;
7476
before(() => {
77+
originalCwd = process.cwd();
7578
originalEnv = process.env;
7679
({ stream, channel } = beforeEventHandlerSuite());
80+
channel.hostVersion = '2.7.0';
7781
});
7882

7983
after(() => {
@@ -82,6 +86,7 @@ describe('FunctionEnvironmentReloadHandler', () => {
8286

8387
afterEach(async () => {
8488
mock.restore();
89+
process.chdir(originalCwd);
8590
await stream.afterEachEventHandlerTest();
8691
});
8792

@@ -229,6 +234,83 @@ describe('FunctionEnvironmentReloadHandler', () => {
229234
});
230235
await stream.assertCalledWith(Msg.reloadEnvVarsLog(0), Msg.changingCwdLog(newDirAbsolute), Msg.reloadSuccess);
231236
expect(channel.packageJson).to.deep.equal(newPackageJson);
232-
process.chdir(cwd);
233237
});
238+
239+
it('correctly loads package.json in specialization scenario', async () => {
240+
const cwd = process.cwd();
241+
const tempDir = 'temp';
242+
const appDir = 'app';
243+
const packageJson = {
244+
type: 'module',
245+
hello: 'world',
246+
};
247+
248+
mock({
249+
[tempDir]: {},
250+
[appDir]: {
251+
'package.json': JSON.stringify(packageJson),
252+
},
253+
});
254+
255+
stream.addTestMessage(WorkerInitMsg.init(path.join(cwd, tempDir)));
256+
await stream.assertCalledWith(
257+
WorkerInitMsg.receivedInitLog,
258+
WorkerInitMsg.warning(`Worker failed to load package.json: file does not exist`),
259+
WorkerInitMsg.response
260+
);
261+
expect(channel.packageJson).to.be.empty;
262+
263+
stream.addTestMessage({
264+
requestId: 'id',
265+
functionEnvironmentReloadRequest: {
266+
functionAppDirectory: path.join(cwd, appDir),
267+
},
268+
});
269+
await stream.assertCalledWith(
270+
Msg.reloadEnvVarsLog(0),
271+
Msg.changingCwdLog(path.join(cwd, appDir)),
272+
Msg.reloadSuccess
273+
);
274+
expect(channel.packageJson).to.deep.equal(packageJson);
275+
});
276+
277+
for (const extension of ['.js', '.mjs', '.cjs']) {
278+
it(`Loads entry point (${extension}) in specialization scenario`, async () => {
279+
const cwd = process.cwd();
280+
const tempDir = 'temp';
281+
const fileName = `entryPointFiles/doNothing${extension}`;
282+
const expectedPackageJson = {
283+
main: fileName,
284+
};
285+
mock({
286+
[tempDir]: {},
287+
[__dirname]: {
288+
'package.json': JSON.stringify(expectedPackageJson),
289+
// 'require' and 'mockFs' don't play well together so we need these files in both the mock and real file systems
290+
entryPointFiles: mock.load(path.join(__dirname, 'entryPointFiles')),
291+
},
292+
});
293+
294+
stream.addTestMessage(WorkerInitMsg.init(path.join(cwd, tempDir)));
295+
await stream.assertCalledWith(
296+
WorkerInitMsg.receivedInitLog,
297+
WorkerInitMsg.warning('Worker failed to load package.json: file does not exist'),
298+
WorkerInitMsg.response
299+
);
300+
301+
stream.addTestMessage({
302+
requestId: 'id',
303+
functionEnvironmentReloadRequest: {
304+
functionAppDirectory: __dirname,
305+
},
306+
});
307+
await stream.assertCalledWith(
308+
Msg.reloadEnvVarsLog(0),
309+
Msg.changingCwdLog(__dirname),
310+
WorkerInitMsg.loadingEntryPoint(fileName),
311+
WorkerInitMsg.loadedEntryPoint(fileName),
312+
Msg.reloadSuccess
313+
);
314+
});
315+
}
234316
});

0 commit comments

Comments
 (0)