Skip to content

Commit 325dace

Browse files
toger5robintownhughns
authored
Fix local echo in embedded mode (#4498)
* fix local echo * dont use custome event emitter anymore * move logic into updateTxId * temp testing * use generic eventEmtitter names * add tests --------- Co-authored-by: Robin <[email protected]> Co-authored-by: Hugh Nimmo-Smith <[email protected]>
1 parent 5c894b3 commit 325dace

File tree

3 files changed

+213
-3
lines changed

3 files changed

+213
-3
lines changed

spec/unit/embedded.spec.ts

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
ITurnServer,
3131
IRoomEvent,
3232
IOpenIDCredentials,
33+
ISendEventFromWidgetResponseData,
3334
WidgetApiResponseError,
3435
} from "matrix-widget-api";
3536

@@ -40,6 +41,7 @@ import { ICapabilities, RoomWidgetClient } from "../../src/embedded";
4041
import { MatrixEvent } from "../../src/models/event";
4142
import { ToDeviceBatch } from "../../src/models/ToDeviceMessage";
4243
import { DeviceInfo } from "../../src/crypto/deviceinfo";
44+
import { sleep } from "../../src/utils";
4345

4446
const testOIDCToken = {
4547
access_token: "12345678",
@@ -127,9 +129,16 @@ describe("RoomWidgetClient", () => {
127129
const makeClient = async (
128130
capabilities: ICapabilities,
129131
sendContentLoaded: boolean | undefined = undefined,
132+
userId?: string,
130133
): Promise<void> => {
131134
const baseUrl = "https://example.org";
132-
client = createRoomWidgetClient(widgetApi, capabilities, "!1:example.org", { baseUrl }, sendContentLoaded);
135+
client = createRoomWidgetClient(
136+
widgetApi,
137+
capabilities,
138+
"!1:example.org",
139+
{ baseUrl, userId },
140+
sendContentLoaded,
141+
);
133142
expect(widgetApi.start).toHaveBeenCalled(); // needs to have been called early in order to not miss messages
134143
widgetApi.emit("ready");
135144
await client.startClient();
@@ -192,6 +201,142 @@ describe("RoomWidgetClient", () => {
192201
.map((e) => e.getEffectiveEvent()),
193202
).toEqual([event]);
194203
});
204+
describe("local echos", () => {
205+
const setupRemoteEcho = () => {
206+
makeClient(
207+
{
208+
receiveEvent: ["org.matrix.rageshake_request"],
209+
sendEvent: ["org.matrix.rageshake_request"],
210+
},
211+
undefined,
212+
"@me:example.org",
213+
);
214+
expect(widgetApi.requestCapabilityForRoomTimeline).toHaveBeenCalledWith("!1:example.org");
215+
expect(widgetApi.requestCapabilityToReceiveEvent).toHaveBeenCalledWith("org.matrix.rageshake_request");
216+
const injectSpy = jest.spyOn((client as any).syncApi, "injectRoomEvents");
217+
const widgetSendEmitter = new EventEmitter();
218+
const widgetSendPromise = new Promise<void>((resolve) =>
219+
widgetSendEmitter.once("send", () => resolve()),
220+
);
221+
const resolveWidgetSend = () => widgetSendEmitter.emit("send");
222+
widgetApi.sendRoomEvent.mockImplementation(
223+
async (eType, content, roomId): Promise<ISendEventFromWidgetResponseData> => {
224+
await widgetSendPromise;
225+
return { room_id: "!1:example.org", event_id: "event_id" };
226+
},
227+
);
228+
return { injectSpy, resolveWidgetSend };
229+
};
230+
const remoteEchoEvent = new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, {
231+
detail: {
232+
data: {
233+
type: "org.matrix.rageshake_request",
234+
235+
room_id: "!1:example.org",
236+
event_id: "event_id",
237+
sender: "@me:example.org",
238+
state_key: "bar",
239+
content: { hello: "world" },
240+
unsigned: { transaction_id: "1234" },
241+
},
242+
},
243+
cancelable: true,
244+
});
245+
it("get response then local echo", async () => {
246+
await sleep(600);
247+
const { injectSpy, resolveWidgetSend } = await setupRemoteEcho();
248+
249+
// Begin by sending an event:
250+
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
251+
// we do not expect it to be send -- until we call `resolveWidgetSend`
252+
expect(injectSpy).not.toHaveBeenCalled();
253+
254+
// We first get the response from the widget
255+
resolveWidgetSend();
256+
// We then get the remote echo from the widget
257+
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent);
258+
259+
// gets emitted after the event got injected
260+
await new Promise<void>((resolve) => client.once(ClientEvent.Event, () => resolve()));
261+
expect(injectSpy).toHaveBeenCalled();
262+
263+
const call = injectSpy.mock.calls[0] as any;
264+
const injectedEv = call[2][0];
265+
expect(injectedEv.getType()).toBe("org.matrix.rageshake_request");
266+
expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId");
267+
});
268+
269+
it("get local echo then response", async () => {
270+
await sleep(600);
271+
const { injectSpy, resolveWidgetSend } = await setupRemoteEcho();
272+
273+
// Begin by sending an event:
274+
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
275+
// we do not expect it to be send -- until we call `resolveWidgetSend`
276+
expect(injectSpy).not.toHaveBeenCalled();
277+
278+
// We first get the remote echo from the widget
279+
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent);
280+
expect(injectSpy).not.toHaveBeenCalled();
281+
282+
// We then get the response from the widget
283+
resolveWidgetSend();
284+
285+
// Gets emitted after the event got injected
286+
await new Promise<void>((resolve) => client.once(ClientEvent.Event, () => resolve()));
287+
expect(injectSpy).toHaveBeenCalled();
288+
289+
const call = injectSpy.mock.calls[0] as any;
290+
const injectedEv = call[2][0];
291+
expect(injectedEv.getType()).toBe("org.matrix.rageshake_request");
292+
expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId");
293+
});
294+
it("__ local echo then response", async () => {
295+
await sleep(600);
296+
const { injectSpy, resolveWidgetSend } = await setupRemoteEcho();
297+
298+
// Begin by sending an event:
299+
client.sendEvent("!1:example.org", "org.matrix.rageshake_request", { request_id: 12 }, "widgetTxId");
300+
// we do not expect it to be send -- until we call `resolveWidgetSend`
301+
expect(injectSpy).not.toHaveBeenCalled();
302+
303+
// We first get the remote echo from the widget
304+
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, remoteEchoEvent);
305+
const otherRemoteEcho = new CustomEvent(`action:${WidgetApiToWidgetAction.SendEvent}`, {
306+
detail: { data: { ...remoteEchoEvent.detail.data } },
307+
});
308+
otherRemoteEcho.detail.data.unsigned.transaction_id = "4567";
309+
otherRemoteEcho.detail.data.event_id = "other_id";
310+
widgetApi.emit(`action:${WidgetApiToWidgetAction.SendEvent}`, otherRemoteEcho);
311+
312+
// Simulate the wait time while the widget is waiting for a response
313+
// after we already received the remote echo
314+
await sleep(20);
315+
// even after the wait we do not want any event to be injected.
316+
// we do not know their event_id and cannot know if they are the remote echo
317+
// where we need to update the txId because they are send by this client
318+
expect(injectSpy).not.toHaveBeenCalled();
319+
// We then get the response from the widget
320+
resolveWidgetSend();
321+
322+
// Gets emitted after the event got injected
323+
await new Promise<void>((resolve) => client.once(ClientEvent.Event, () => resolve()));
324+
// Now we want both events to be injected since we know the txId - event_id match
325+
expect(injectSpy).toHaveBeenCalled();
326+
327+
// it has been called with the event sent by ourselves
328+
const call = injectSpy.mock.calls[0] as any;
329+
const injectedEv = call[2][0];
330+
expect(injectedEv.getType()).toBe("org.matrix.rageshake_request");
331+
expect(injectedEv.getUnsigned().transaction_id).toBe("widgetTxId");
332+
333+
// It has been called by the event we blocked because of our send right afterwards
334+
const call2 = injectSpy.mock.calls[1] as any;
335+
const injectedEv2 = call2[2][0];
336+
expect(injectedEv2.getType()).toBe("org.matrix.rageshake_request");
337+
expect(injectedEv2.getUnsigned().transaction_id).toBe("4567");
338+
});
339+
});
195340

196341
it("handles widget errors with generic error data", async () => {
197342
const error = new Error("failed to send");

src/embedded.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import { ToDeviceBatch, ToDevicePayload } from "./models/ToDeviceMessage.ts";
5757
import { DeviceInfo } from "./crypto/deviceinfo.ts";
5858
import { IOlmDevice } from "./crypto/algorithms/megolm.ts";
5959
import { MapWithDefault, recursiveMapToObject } from "./utils.ts";
60+
import { TypedEventEmitter } from "./matrix.ts";
6061

6162
interface IStateEventRequest {
6263
eventType: string;
@@ -123,6 +124,10 @@ export interface ICapabilities {
123124
updateDelayedEvents?: boolean;
124125
}
125126

127+
export enum RoomWidgetClientEvent {
128+
PendingEventsChanged = "PendingEvent.pendingEventsChanged",
129+
}
130+
export type EventHandlerMap = { [RoomWidgetClientEvent.PendingEventsChanged]: () => void };
126131
/**
127132
* A MatrixClient that routes its requests through the widget API instead of the
128133
* real CS API.
@@ -134,6 +139,9 @@ export class RoomWidgetClient extends MatrixClient {
134139
private lifecycle?: AbortController;
135140
private syncState: SyncState | null = null;
136141

142+
private pendingSendingEventsTxId: { type: string; id: string | undefined; txId: string }[] = [];
143+
private eventEmitter = new TypedEventEmitter<keyof EventHandlerMap, EventHandlerMap>();
144+
137145
/**
138146
*
139147
* @param widgetApi - The widget api to use for communication.
@@ -330,6 +338,8 @@ export class RoomWidgetClient extends MatrixClient {
330338
const content = event.event.redacts
331339
? { ...event.getContent(), redacts: event.event.redacts }
332340
: event.getContent();
341+
342+
// Delayed event special case.
333343
if (delayOpts) {
334344
// TODO: updatePendingEvent for delayed events?
335345
const response = await this.widgetApi.sendRoomEvent(
@@ -342,16 +352,26 @@ export class RoomWidgetClient extends MatrixClient {
342352
return this.validateSendDelayedEventResponse(response);
343353
}
344354

355+
const txId = event.getTxnId();
356+
// Add the txnId to the pending list (still with unknown evID)
357+
if (txId) this.pendingSendingEventsTxId.push({ type: event.getType(), id: undefined, txId });
358+
345359
let response: ISendEventFromWidgetResponseData;
346360
try {
347361
response = await this.widgetApi.sendRoomEvent(event.getType(), content, room.roomId);
348362
} catch (e) {
349363
this.updatePendingEventStatus(room, event, EventStatus.NOT_SENT);
350364
throw e;
351365
}
352-
353366
// This also checks for an event id on the response
354367
room.updatePendingEvent(event, EventStatus.SENT, response.event_id);
368+
369+
// Update the pending events list with the eventId
370+
this.pendingSendingEventsTxId.forEach((p) => {
371+
if (p.txId === txId) p.id = response.event_id;
372+
});
373+
this.eventEmitter.emit(RoomWidgetClientEvent.PendingEventsChanged);
374+
355375
return { event_id: response.event_id! };
356376
}
357377

@@ -495,13 +515,58 @@ export class RoomWidgetClient extends MatrixClient {
495515
await this.widgetApi.transport.reply<IWidgetApiAcknowledgeResponseData>(ev.detail, {});
496516
}
497517

518+
private updateTxId = async (event: MatrixEvent): Promise<void> => {
519+
// We update the txId for remote echos that originate from this client.
520+
// This happens with the help of `pendingSendingEventsTxId` where we store all events that are currently sending
521+
// with their widget txId and once ready the final evId.
522+
if (
523+
// This could theoretically be an event send by this device
524+
// In that case we need to update the txId of the event because the embedded client/widget
525+
// knows this event with a different transaction Id than what was used by the host client.
526+
event.getSender() === this.getUserId() &&
527+
// We optimize by not blocking events from types that we have not send
528+
// with this client.
529+
this.pendingSendingEventsTxId.some((p) => event.getType() === p.type)
530+
) {
531+
// Compare by event Id if we have a matching pending event where we know the txId.
532+
let matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId;
533+
// Block any further processing of this event until we have received the sending response.
534+
// -> until we know the event id.
535+
// -> until we have not pending events anymore.
536+
while (!matchingTxId && this.pendingSendingEventsTxId.length > 0) {
537+
// Recheck whenever the PendingEventsChanged
538+
await new Promise<void>((resolve) =>
539+
this.eventEmitter.once(RoomWidgetClientEvent.PendingEventsChanged, () => resolve()),
540+
);
541+
matchingTxId = this.pendingSendingEventsTxId.find((p) => p.id === event.getId())?.txId;
542+
}
543+
544+
// We found the correct txId: we update the event and delete the entry of the pending events.
545+
if (matchingTxId) {
546+
event.setTxnId(matchingTxId);
547+
event.setUnsigned({ ...event.getUnsigned(), transaction_id: matchingTxId });
548+
}
549+
this.pendingSendingEventsTxId = this.pendingSendingEventsTxId.filter((p) => p.id !== event.getId());
550+
551+
// Emit once there are no pending events anymore to release all other events that got
552+
// awaited in the `while (!matchingTxId && this.pendingSendingEventsTxId.length > 0)` loop
553+
// but are not send by this client.
554+
if (this.pendingSendingEventsTxId.length === 0) {
555+
this.eventEmitter.emit(RoomWidgetClientEvent.PendingEventsChanged);
556+
}
557+
}
558+
};
559+
498560
private onEvent = async (ev: CustomEvent<ISendEventToWidgetActionRequest>): Promise<void> => {
499561
ev.preventDefault();
500562

501563
// Verify the room ID matches, since it's possible for the client to
502564
// send us events from other rooms if this widget is always on screen
503565
if (ev.detail.data.room_id === this.roomId) {
504566
const event = new MatrixEvent(ev.detail.data as Partial<IEvent>);
567+
568+
// Only inject once we have update the txId
569+
await this.updateTxId(event);
505570
await this.syncApi!.injectRoomEvents(this.room!, [], [event]);
506571
this.emit(ClientEvent.Event, event);
507572
this.setSyncState(SyncState.Syncing);

src/matrixrtc/MatrixRTCSessionManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter<MatrixRTCSessionM
5656

5757
public start(): void {
5858
// We shouldn't need to null-check here, but matrix-client.spec.ts mocks getRooms
59-
// returing nothing, and breaks tests if you change it to return an empty array :'(
59+
// returning nothing, and breaks tests if you change it to return an empty array :'(
6060
for (const room of this.client.getRooms() ?? []) {
6161
const session = MatrixRTCSession.roomSessionForRoom(this.client, room);
6262
if (session.memberships.length > 0) {

0 commit comments

Comments
 (0)