Skip to content

runners: Add scaleCycle lambda to reuse runners #6892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import {
ScaleUpChronMetrics,
sendMetricsAtTimeout,
sendMetricsTimeoutVars,
ScaleCycleMetrics,
} from './scale-runners/metrics';
import { getDelayWithJitterRetryCount, stochaticRunOvershoot } from './scale-runners/utils';
import { scaleDown as scaleDownR } from './scale-runners/scale-down';
import { scaleUpChron as scaleUpChronR } from './scale-runners/scale-up-chron';
import { sqsSendMessages, sqsDeleteMessageBatch } from './scale-runners/sqs';
import { scaleCycle as scaleCycleR } from './scale-runners/scale-cycle';

async function sendRetryEvents(evtFailed: Array<[SQSRecord, boolean, number]>, metrics: ScaleUpMetrics) {
console.error(`Detected ${evtFailed.length} errors when processing messages, will retry relevant messages.`);
Expand Down Expand Up @@ -202,3 +204,38 @@ export async function scaleUpChron(event: ScheduledEvent, context: Context, call
}
callback(callbackOutput);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export async function scaleCycle(event: ScheduledEvent, context: Context, callback: any) {
// we mantain open connections to redis, so the event pool is only cleaned when the SIGTERM is sent
context.callbackWaitsForEmptyEventLoop = false;

const metrics = new ScaleCycleMetrics();
const sndMetricsTimout: sendMetricsTimeoutVars = {
metrics: metrics,
};
sndMetricsTimout.setTimeout = setTimeout(
sendMetricsAtTimeout(sndMetricsTimout),
(Config.Instance.lambdaTimeout - 10) * 1000,
);

let callbackOutput: string | null = null;

try {
await scaleCycleR(metrics);
} catch (e) {
console.error(e);
callbackOutput = `Failed to scale cycle: ${e}`;
} finally {
try {
clearTimeout(sndMetricsTimout.setTimeout);
sndMetricsTimout.metrics = undefined;
sndMetricsTimout.setTimeout = undefined;
await metrics.sendMetrics();
} catch (e) {
callbackOutput = `Error sending metrics: ${e}`;
}
}

callback(callbackOutput);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1813,3 +1813,30 @@ export function sendMetricsAtTimeout(metricsTimeouts: sendMetricsTimeoutVars) {
}
};
}

export class ScaleCycleMetrics extends ScaleUpMetrics {
constructor() {
super('scaleCycle');
}

scaleCycleRunnerReuseFound(runnerType: string) {
const dimensions = new Map([['RunnerType', runnerType]]);
this.countEntry('run.scaleCycle.runnerReuse.found', 1, dimensions);
}

scaleCycleRunnerReuseFoundOrg(org: string, runnerType: string) {
const dimensions = new Map([
['Org', org],
['RunnerType', runnerType],
]);
this.countEntry('run.scaleCycle.runnerReuse.found.org', 1, dimensions);
}

scaleCycleRunnerReuseFoundRepo(repo: string, runnerType: string) {
const dimensions = new Map([
['Repo', repo],
['RunnerType', runnerType],
]);
this.countEntry('run.scaleCycle.runnerReuse.found.repo', 1, dimensions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Config } from './config';
import { listRunners, RunnerInputParameters, tryReuseRunner } from './runners';
import { getRepo, getRepoKey } from './utils';
import { ScaleCycleMetrics } from './metrics';
import { getRunnerTypes } from './gh-runners';
import { createRunnerConfigArgument } from './scale-up';

export async function scaleCycle(metrics: ScaleCycleMetrics) {
// Get runner types configuration first
const scaleConfigRepo = getRepo(Config.Instance.scaleConfigOrg, Config.Instance.scaleConfigRepo);
const runnerTypes = await getRunnerTypes(scaleConfigRepo, metrics);

// Get all valid runner type names for filtering
const validRunnerTypeNames = Array.from(runnerTypes.keys());

// Make separate calls for each runner type to filter at EC2 level
const allRunners = await Promise.all(
validRunnerTypeNames.map((runnerTypeName) =>
listRunners(metrics, {
containsTags: ['GithubRunnerID', 'EphemeralRunnerFinished', 'RunnerType'],
runnerType: runnerTypeName,
}),
),
);

// Flatten the results
const runners = allRunners.flat();

for (const runner of runners) {
// Skip if required fields are missing (org/repo still need to be checked)
if (!runner.runnerType || !runner.org || !runner.repo) {
console.warn(`Skipping runner ${runner.instanceId} due to missing required tags`);
continue;
}

// Get the RunnerType object from the string (we know it exists since we filtered by it)
const runnerType = runnerTypes.get(runner.runnerType);
if (!runnerType) {
console.warn(`Unknown runner type: ${runner.runnerType}, skipping`);
continue;
}

// Create repo object
const repo = getRepo(runner.org, runner.repo);

// For each runner send an EBS volume replacement task
const runnerInputParameters: RunnerInputParameters = {
runnerConfig: (awsRegion: string, experimentalRunner: boolean) => {
return createRunnerConfigArgument(
runnerType,
repo,
// NOTE: installationId can actually be undefined here but this may incur lower rate limits
// TODO: figure out if we need to pass an actual installationId here
undefined,
metrics,
awsRegion,
experimentalRunner,
);
},
environment: Config.Instance.environment,
runnerType: runnerType,
};

// Set orgName or repoName based on configuration
if (Config.Instance.enableOrganizationRunners) {
runnerInputParameters.orgName = runner.org;
metrics.scaleCycleRunnerReuseFoundOrg(runner.org, runner.runnerType);
console.info(`Reusing runner ${runner.instanceId} for ${runner.org}`);
} else {
runnerInputParameters.repoName = getRepoKey(repo);
metrics.scaleCycleRunnerReuseFoundRepo(getRepoKey(repo), runner.runnerType);
console.info(`Reusing runner ${runner.instanceId} for ${getRepoKey(repo)}`);
}

await tryReuseRunner(runnerInputParameters, metrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a little too high level of a function to be invoking here.

In this loop you've already identified a candidate list of runners that are probably ready to be scaled down.

tryReuseRunner however looses context on what those candidates were and on each loop it'll starts from scratch again to find a potential runner it can scale down, starting with asking EC2 to list all runners of this runner type.

What do you think about adding a refactor to do something like the following:

  1. Extract the logic the logic to determine if a runner is safe to be shut down into one function, use that to determine the exact list of runners we want to refresh here
  2. Extract the logic to do the scale down (plus validations) into a separate function that takes in an explicit runner id. Pass the runners you find here into that list

Goal is to not end up looping over the same instances over and over again during a single ScaleCycle invocation

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ export async function scaleUp(
}
}

async function createRunnerConfigArgument(
export async function createRunnerConfigArgument(
runnerType: RunnerType,
repo: Repo,
installationId: number | undefined,
Expand Down
Loading