Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/wet-dryers-matter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-server-sdk": patch
---

feat: auto failover APIs with LK Cloud
51 changes: 51 additions & 0 deletions .github/workflows/test-api.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# SPDX-FileCopyrightText: 2026 LiveKit, Inc.
#
# SPDX-License-Identifier: Apache-2.0

name: Test API

permissions:
contents: read

on:
workflow_dispatch:
push:
branches: [main]
pull_request:
branches: [main]

jobs:
failover:
runs-on: ubuntu-latest
services:
mock-server:
image: livekit/test-server:latest
ports:
- 9999:9999
- 10000:10000
- 10001:10001
- 10002:10002
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

- uses: pnpm/action-setup@b906affcce14559ad1aafd4ab0e942779e9f58b1 # v4.3.0

- name: Setup Node.js
uses: actions/setup-node@53b83947a5a98c8d113130e565377fae1a50d02f # v6.3.0
with:
node-version: 24
cache: pnpm

- name: Install dependencies
run: pnpm install

- name: Wait for mock server
run: |
for i in $(seq 1 30); do
curl -sf http://127.0.0.1:9999/settings/regions >/dev/null && exit 0
sleep 1
done
echo "mock server did not become ready" && exit 1

- name: Run API tests
run: pnpm --filter="livekit-server-sdk" exec vitest --environment node run test/api
1 change: 0 additions & 1 deletion packages/livekit-server-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
"dependencies": {
"@bufbuild/protobuf": "^1.10.1",
"@livekit/protocol": "^1.46.6",
"camelcase-keys": "^9.0.0",
"jose": "^5.1.2"
},
"devDependencies": {
Expand Down
8 changes: 4 additions & 4 deletions packages/livekit-server-sdk/src/AgentDispatchClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ export class AgentDispatchClient extends ServiceBase {
*/
constructor(host: string, apiKey?: string, secret?: string, options?: ClientOptions) {
super(apiKey, secret);
const rpcOptions = options?.requestTimeout
? { requestTimeout: options.requestTimeout }
: undefined;
this.rpc = new TwirpRpc(host, livekitPackage, rpcOptions);
this.rpc = new TwirpRpc(host, livekitPackage, {
requestTimeout: options?.requestTimeout,
failover: options?.failover,
});
}

/**
Expand Down
5 changes: 5 additions & 0 deletions packages/livekit-server-sdk/src/ClientOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ export type ClientOptions = {
* Optional timeout, in seconds, for all server requests
*/
requestTimeout?: number;
/**
* Whether to fail over to alternative regions on retryable errors (LiveKit
* Cloud hosts only). Defaults to true; set to false to disable.
*/
failover?: boolean;
};
8 changes: 4 additions & 4 deletions packages/livekit-server-sdk/src/ConnectorClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ export class ConnectorClient extends ServiceBase {
*/
constructor(host: string, apiKey?: string, secret?: string, options?: ClientOptions) {
super(apiKey, secret);
const rpcOptions = options?.requestTimeout
? { requestTimeout: options.requestTimeout }
: undefined;
this.rpc = new TwirpRpc(host, livekitPackage, rpcOptions);
this.rpc = new TwirpRpc(host, livekitPackage, {
requestTimeout: options?.requestTimeout,
failover: options?.failover,
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/livekit-server-sdk/src/EgressClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ export class EgressClient extends ServiceBase {
*/
constructor(host: string, apiKey?: string, secret?: string, options?: ClientOptions) {
super(apiKey, secret);
const rpcOptions = options?.requestTimeout
? { requestTimeout: options.requestTimeout }
: undefined;
this.rpc = new TwirpRpc(host, livekitPackage, rpcOptions);
this.rpc = new TwirpRpc(host, livekitPackage, {
requestTimeout: options?.requestTimeout,
failover: options?.failover,
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/livekit-server-sdk/src/IngressClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ export class IngressClient extends ServiceBase {
*/
constructor(host: string, apiKey?: string, secret?: string, options?: ClientOptions) {
super(apiKey, secret);
const rpcOptions = options?.requestTimeout
? { requestTimeout: options.requestTimeout }
: undefined;
this.rpc = new TwirpRpc(host, livekitPackage, rpcOptions);
this.rpc = new TwirpRpc(host, livekitPackage, {
requestTimeout: options?.requestTimeout,
failover: options?.failover,
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/livekit-server-sdk/src/RoomServiceClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ export class RoomServiceClient extends ServiceBase {
*/
constructor(host: string, apiKey?: string, secret?: string, options?: ClientOptions) {
super(apiKey, secret);
const rpcOptions = options?.requestTimeout
? { requestTimeout: options.requestTimeout }
: undefined;
this.rpc = new TwirpRpc(host, livekitPackage, rpcOptions);
this.rpc = new TwirpRpc(host, livekitPackage, {
requestTimeout: options?.requestTimeout,
failover: options?.failover,
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions packages/livekit-server-sdk/src/SipClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ export class SipClient extends ServiceBase {
*/
constructor(host: string, apiKey?: string, secret?: string, options?: ClientOptions) {
super(apiKey, secret);
const rpcOptions = options?.requestTimeout
? { requestTimeout: options.requestTimeout }
: undefined;
this.rpc = new TwirpRpc(host, livekitPackage, rpcOptions);
this.rpc = new TwirpRpc(host, livekitPackage, {
requestTimeout: options?.requestTimeout,
failover: options?.failover,
});
}

/**
Expand Down
151 changes: 113 additions & 38 deletions packages/livekit-server-sdk/src/TwirpRPC.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
//
// SPDX-License-Identifier: Apache-2.0
import type { JsonValue } from '@bufbuild/protobuf';
import {
FAILOVER_BACKOFF_BASE_MS,
failoverAttempts,
hostKey,
pickNext,
regionOrigins,
sleep,
} from './failover.js';

// twirp RPC adapter for client implementation

Expand All @@ -10,6 +18,12 @@ type Options = {
prefix?: string;
/** Timeout for fetch requests, in seconds. Must be within the valid range for abort signal timeouts. */
requestTimeout?: number;
/** Whether region failover is enabled (LiveKit Cloud hosts only). Defaults to true. */
failover?: boolean;
/** @internal test-only: force failover regardless of host. */
failoverForce?: boolean;
/** @internal test-only: base retry backoff in ms. */
failoverBackoffMs?: number;
};

const defaultPrefix = '/twirp';
Expand Down Expand Up @@ -58,6 +72,12 @@ export class TwirpRpc {

requestTimeout: number;

failover: boolean;

private failoverForce: boolean;

private failoverBackoffMs: number;

constructor(host: string, pkg: string, options?: Options) {
if (host.startsWith('ws')) {
host = host.replace('ws', 'http');
Expand All @@ -66,8 +86,18 @@ export class TwirpRpc {
this.pkg = pkg;
this.requestTimeout = options?.requestTimeout ?? defaultTimeoutSeconds;
this.prefix = options?.prefix || defaultPrefix;
this.failover = options?.failover ?? true;
this.failoverForce = options?.failoverForce ?? false;
this.failoverBackoffMs = options?.failoverBackoffMs ?? FAILOVER_BACKOFF_BASE_MS;
}

/**
* Issues a Twirp request, failing over to alternative regions on retryable
* errors. On any transport error or HTTP 5xx it discovers regions via
* /settings/regions and replays the request — body and headers intact —
* against the next untried region, with exponential backoff. A 4xx is
* returned immediately.
*/
async request(
service: string,
method: string,
Expand All @@ -77,52 +107,97 @@ export class TwirpRpc {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): Promise<any> {
const path = `${this.prefix}/${this.pkg}.${service}/${method}`;
const url = new URL(path, this.host);
const init: RequestInit = {
method: 'POST',
headers: {
'Content-Type': 'application/json;charset=UTF-8',
...headers,
},
body: JSON.stringify(data),
const body = JSON.stringify(data);
const requestHeaders = {
'Content-Type': 'application/json;charset=UTF-8',
...headers,
};

if (timeout) {
init.signal = AbortSignal.timeout(timeout * 1000);
}

const response = await fetch(url, init);
const origin = new URL(this.host);
const maxAttempts = failoverAttempts(this.failover, origin.hostname, this.failoverForce);
const attempted = new Set([hostKey(origin)]);
let regions: string[] | undefined;
let current = this.host;

for (let attempt = 0; attempt < maxAttempts; attempt += 1) {
const isLast = attempt + 1 >= maxAttempts;
const init: RequestInit = { method: 'POST', headers: requestHeaders, body };
if (timeout) {
init.signal = AbortSignal.timeout(timeout * 1000);
}

if (!response.ok) {
const isJson = response.headers.get('content-type') === 'application/json';
let errorMessage = 'Unknown internal error';
let errorCode: string | undefined = undefined;
let metadata: Record<string, string> | undefined = undefined;
let response: Response | undefined;
let transportError: unknown;
try {
if (isJson) {
const parsedError = (await response.json()) as Record<string, unknown>;
if ('msg' in parsedError) {
errorMessage = <string>parsedError.msg;
}
if ('code' in parsedError) {
errorCode = <string>parsedError.code;
}
if ('meta' in parsedError) {
metadata = <Record<string, string>>parsedError.meta;
}
} else {
errorMessage = await response.text();
}
response = await fetch(new URL(path, current), init);
} catch (e) {
// parsing went wrong, no op and we keep default error message
console.debug(`Error when trying to parse error message, using defaults`, e);
transportError = e;
}

if (response?.ok) {
// Return the raw JSON. Every caller parses it with protobuf-es
// fromJson(), which per the proto3 JSON spec accepts both the proto
// field names (snake_case) and their json_name (camelCase), so no key
// conversion is needed. Converting keys would also corrupt map<string,…>
// entries (e.g. participant attributes), whose keys are user data.
return (await response.json()) as Record<string, unknown>;
}

// Only retryable failures (a transport error or HTTP 5xx) continue;
// a 4xx is terminal.
const retryable = transportError !== undefined || (!!response && response.status >= 500);
Comment thread
davidzhao marked this conversation as resolved.
let next: string | undefined;
if (retryable && !isLast) {
if (!regions) {
regions = await regionOrigins(origin, headers);
}
next = pickNext(regions, attempted);
}

if (!retryable || next === undefined) {
if (response) {
throw await toTwirpError(response);
}
throw transportError;
}

throw new TwirpError(response.statusText, errorMessage, response.status, errorCode, metadata);
const reason = response ? `status ${response.status}` : transportError;
console.warn(
`livekit API request to ${new URL(current).host} failed (${reason}), retrying with fallback url ${next}`,
);
await sleep(this.failoverBackoffMs * 2 ** attempt);
attempted.add(hostKey(new URL(next)));
current = next;
}
const parsedResp = (await response.json()) as Record<string, unknown>;

const camelcaseKeys = await import('camelcase-keys').then((mod) => mod.default);
return camelcaseKeys(parsedResp, { deep: true });
throw new Error('failover loop exited without returning'); // unreachable
}
}

/** Builds a TwirpError from a non-2xx response, mirroring Twirp's JSON error shape. */
async function toTwirpError(response: Response): Promise<TwirpError> {
const isJson = response.headers.get('content-type') === 'application/json';
let errorMessage = 'Unknown internal error';
let errorCode: string | undefined = undefined;
let metadata: Record<string, string> | undefined = undefined;
try {
if (isJson) {
const parsedError = (await response.json()) as Record<string, unknown>;
if ('msg' in parsedError) {
errorMessage = <string>parsedError.msg;
}
if ('code' in parsedError) {
errorCode = <string>parsedError.code;
}
if ('meta' in parsedError) {
metadata = <Record<string, string>>parsedError.meta;
}
} else {
errorMessage = await response.text();
}
} catch (e) {
// parsing went wrong, no op and we keep default error message
console.debug(`Error when trying to parse error message, using defaults`, e);
}
return new TwirpError(response.statusText, errorMessage, response.status, errorCode, metadata);
}
Loading
Loading