Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,33 @@ describe('PlatformServicesClient', () => {
});
});

describe('closeConnection', () => {
it('sends closeConnection request and resolves', async () => {
const result = client.closeConnection('peer-123');
await delay(10);
await stream.receiveInput(makeNullReply('m1'));
expect(await result).toBeUndefined();
});
});

describe('reconnectPeer', () => {
it('sends reconnectPeer request with hints and resolves', async () => {
const result = client.reconnectPeer('peer-456', [
'/dns4/relay.example/tcp/443/wss/p2p/relayPeer',
]);
await delay(10);
await stream.receiveInput(makeNullReply('m1'));
expect(await result).toBeUndefined();
});

it('sends reconnectPeer request with empty hints and resolves', async () => {
const result = client.reconnectPeer('peer-789');
await delay(10);
await stream.receiveInput(makeNullReply('m1'));
expect(await result).toBeUndefined();
});
});

describe('remoteDeliver', () => {
it('throws error when handler not set', async () => {
// Client without initialized remote comms
Expand Down
87 changes: 87 additions & 0 deletions packages/kernel-browser-runtime/src/PlatformServicesClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ export class PlatformServicesClient implements PlatformServices {
return new PlatformServicesClient(stream, logger);
}

/**
* Launch a new worker with a specific vat id.
*
* @param vatId - The vat id of the worker to launch.
* @param vatConfig - The configuration for the worker.
* @returns A promise for a duplex stream connected to the worker
* which rejects if a worker with the given vat id already exists.
*/
Comment on lines +120 to +127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for adding these doc comments. It's interesting that our lint rules demand jsdoc comments on functions but not on methods. I wonder if we can change that (or did it actually change and that's what motivated these?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We follow the MetaMask eslint rule which doesn't demand jsdoc comments on methods. We certainly can enable it though

async launch(
vatId: VatId,
vatConfig: VatConfig,
Expand All @@ -138,14 +146,37 @@ export class PlatformServicesClient implements PlatformServices {
);
}

/**
* Terminate a worker identified by its vat id.
*
* @param vatId - The vat id of the worker to terminate.
* @returns A promise that resolves when the worker has terminated
* or rejects if that worker does not exist.
*/
async terminate(vatId: VatId): Promise<void> {
await this.#rpcClient.call('terminate', { vatId });
}

/**
* Terminate all workers managed by the service.
*
* @returns A promise that resolves after all workers have terminated
* or rejects if there was an error during termination.
*/
async terminateAll(): Promise<void> {
await this.#rpcClient.call('terminateAll', []);
}

/**
* Initialize network communications.
*
* @param keySeed - The seed for generating this kernel's secret key.
* @param knownRelays - Array of the peerIDs of relay nodes that can be used to listen for incoming
* connections from other kernels.
* @param remoteMessageHandler - A handler function to receive remote messages.
* @returns A promise that resolves once network access has been established
* or rejects if there is some problem doing so.
*/
async initializeRemoteComms(
keySeed: string,
knownRelays: string[],
Expand All @@ -158,10 +189,24 @@ export class PlatformServicesClient implements PlatformServices {
});
}

/**
* Stop network communications.
*
* @returns A promise that resolves when network access has been stopped
* or rejects if there is some problem doing so.
*/
async stopRemoteComms(): Promise<void> {
await this.#rpcClient.call('stopRemoteComms', []);
}

/**
* Send a remote message to a peer.
*
* @param to - The peer ID to send the message to.
* @param message - The message to send.
* @param hints - Optional hints for the message.
* @returns A promise that resolves when the message has been sent.
*/
async sendRemoteMessage(
to: string,
message: string,
Expand All @@ -170,20 +215,62 @@ export class PlatformServicesClient implements PlatformServices {
await this.#rpcClient.call('sendRemoteMessage', { to, message, hints });
}

/**
* Explicitly close a connection to a peer.
* Marks the peer as intentionally closed to prevent automatic reconnection.
*
* @param peerId - The peer ID to close the connection for.
* @returns A promise that resolves when the connection is closed.
*/
async closeConnection(peerId: string): Promise<void> {
await this.#rpcClient.call('closeConnection', { peerId });
}

/**
* Manually reconnect to a peer after intentional close.
* Clears the intentional close flag and initiates reconnection.
*
* @param peerId - The peer ID to reconnect to.
* @param hints - Optional hints for reconnection.
* @returns A promise that resolves when reconnection is initiated.
*/
async reconnectPeer(peerId: string, hints: string[] = []): Promise<void> {
await this.#rpcClient.call('reconnectPeer', { peerId, hints });
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it considered reconnection (as one would do after a network outage) or is it just a new connection to the same endpoint as before? In particular, do we expect clist entries to survive a trip through manual disconnect/manual reconnect?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is a reconnection not a new connection. It reuses the existing RemoteHandle and remote endpoint. Clist entries and other endpoint state persist across the disconnect/reconnect cycle.


/**
* Handle a remote message from a peer.
*
* @param from - The peer ID that sent the message.
* @param message - The message received.
* @returns A promise that resolves with the reply message, or an empty string if no reply is needed.
*/
async #remoteDeliver(from: string, message: string): Promise<string> {
if (this.#remoteMessageHandler) {
return await this.#remoteMessageHandler(from, message);
}
throw Error(`remote message handler not set`);
}

/**
* Send a message to the server.
*
* @param payload - The message to send.
* @returns A promise that resolves when the message has been sent.
*/
async #sendMessage(payload: JsonRpcMessage): Promise<void> {
await this.#stream.write({
payload,
transfer: [],
});
}

/**
* Handle a message from the server.
*
* @param event - The message event.
* @returns A promise that resolves when the message has been sent.
*/
async #handleMessage(event: MessageEvent<JsonRpcMessage>): Promise<void> {
if (isJsonRpcResponse(event.data)) {
const { id } = event.data;
Expand Down
119 changes: 119 additions & 0 deletions packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import type {
// Mock initNetwork from ocap-kernel
const mockSendRemoteMessage = vi.fn(async () => undefined);
const mockStop = vi.fn(async () => undefined);
const mockCloseConnection = vi.fn(async () => undefined);
const mockReconnectPeer = vi.fn(async () => undefined);

vi.mock('@metamask/ocap-kernel', () => ({
PlatformServicesCommandMethod: {
Expand All @@ -31,6 +33,8 @@ vi.mock('@metamask/ocap-kernel', () => ({
initNetwork: vi.fn(async () => ({
sendRemoteMessage: mockSendRemoteMessage,
stop: mockStop,
closeConnection: mockCloseConnection,
reconnectPeer: mockReconnectPeer,
})),
}));

Expand Down Expand Up @@ -100,6 +104,25 @@ const makeStopRemoteCommsMessageEvent = (
params: [],
});

const makeCloseConnectionMessageEvent = (
messageId: `m${number}`,
peerId: string,
): MessageEvent =>
makeMessageEvent(messageId, {
method: 'closeConnection',
params: { peerId },
});

const makeReconnectPeerMessageEvent = (
messageId: `m${number}`,
peerId: string,
hints: string[] = [],
): MessageEvent =>
makeMessageEvent(messageId, {
method: 'reconnectPeer',
params: { peerId, hints },
});

describe('PlatformServicesServer', () => {
let cleanup: (() => Promise<void>)[] = [];

Expand Down Expand Up @@ -314,6 +337,8 @@ describe('PlatformServicesServer', () => {
// Reset mocks before each test
mockSendRemoteMessage.mockClear();
mockStop.mockClear();
mockCloseConnection.mockClear();
mockReconnectPeer.mockClear();
});

describe('initializeRemoteComms', () => {
Expand Down Expand Up @@ -454,6 +479,100 @@ describe('PlatformServicesServer', () => {
);
});
});

describe('closeConnection', () => {
it('closes connection via network layer', async () => {
// First initialize remote comms
await stream.receiveInput(
makeInitializeRemoteCommsMessageEvent('m0', '0xabcd', [
'/dns4/relay.example/tcp/443/wss/p2p/relayPeer',
]),
);
await delay(10);

// Now close connection
await stream.receiveInput(
makeCloseConnectionMessageEvent('m1', 'peer-123'),
);
await delay(10);

expect(mockCloseConnection).toHaveBeenCalledWith('peer-123');
});

it('throws error if remote comms not initialized', async () => {
const errorSpy = vi.spyOn(logger, 'error');

await stream.receiveInput(
makeCloseConnectionMessageEvent('m0', 'peer-456'),
);
await delay(10);

expect(errorSpy).toHaveBeenCalledWith(
'Error handling "closeConnection" request:',
expect.objectContaining({
message: 'remote comms not initialized',
}),
);
});
});

describe('reconnectPeer', () => {
it('reconnects peer via network layer', async () => {
// First initialize remote comms
await stream.receiveInput(
makeInitializeRemoteCommsMessageEvent('m0', '0xabcd', [
'/dns4/relay.example/tcp/443/wss/p2p/relayPeer',
]),
);
await delay(10);

// Now reconnect peer
await stream.receiveInput(
makeReconnectPeerMessageEvent('m1', 'peer-456', [
'/dns4/relay.example/tcp/443/wss/p2p/relayPeer',
]),
);
await delay(10);

expect(mockReconnectPeer).toHaveBeenCalledWith('peer-456', [
'/dns4/relay.example/tcp/443/wss/p2p/relayPeer',
]);
});

it('reconnects peer with empty hints', async () => {
// First initialize remote comms
await stream.receiveInput(
makeInitializeRemoteCommsMessageEvent('m0', '0xabcd', [
'/dns4/relay.example/tcp/443/wss/p2p/relayPeer',
]),
);
await delay(10);

// Now reconnect peer with empty hints
await stream.receiveInput(
makeReconnectPeerMessageEvent('m1', 'peer-789'),
);
await delay(10);

expect(mockReconnectPeer).toHaveBeenCalledWith('peer-789', []);
});

it('throws error if remote comms not initialized', async () => {
const errorSpy = vi.spyOn(logger, 'error');

await stream.receiveInput(
makeReconnectPeerMessageEvent('m0', 'peer-999'),
);
await delay(10);

expect(errorSpy).toHaveBeenCalledWith(
'Error handling "reconnectPeer" request:',
expect.objectContaining({
message: 'remote comms not initialized',
}),
);
});
});
});
});
});
Loading
Loading