Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
RecommendedOpusBitrates,
NamedMediaGroup,
} from '@webex/internal-media-core';
import {cloneDeepWith, debounce, isEmpty} from 'lodash';
import {cloneDeepWith, debounce, isEmpty, max, throttle} from 'lodash';

import LoggerProxy from '../common/logs/logger-proxy';

Expand Down Expand Up @@ -63,6 +63,7 @@ const CODEC_DEFAULTS = {
};

const DEBOUNCED_SOURCE_UPDATE_TIME = 1000;
const THROTTLED_SEND_REQUESTS_TIME = 2000; // used for sendRequests() calls triggered by Homer updates

type DegradationPreferences = {
maxMacroblocksLimit: number;
Expand Down Expand Up @@ -93,6 +94,7 @@ export class MediaRequestManager {
private sourceUpdateListener: () => void;

private debouncedSourceUpdateListener: () => void;
private throttledSendRequests: () => void;

private previousStreamRequests: Array<StreamRequest> = [];

Expand All @@ -114,6 +116,10 @@ export class MediaRequestManager {
this.sourceUpdateListener,
DEBOUNCED_SOURCE_UPDATE_TIME
);
this.throttledSendRequests = throttle(
this.sendRequests.bind(this),
THROTTLED_SEND_REQUESTS_TIME
);
}

public setDegradationPreferences(degradationPreferences: DegradationPreferences) {
Expand Down Expand Up @@ -398,7 +404,7 @@ export class MediaRequestManager {
mediaRequest.handleMaxFs = eventHandler;

mediaRequest.receiveSlots.forEach((rs) => {
rs.on(ReceiveSlotEvents.SourceUpdate, this.sourceUpdateListener);
rs.on(ReceiveSlotEvents.SourceUpdate, this.throttledSendRequests);
rs.on(ReceiveSlotEvents.MaxFsUpdate, mediaRequest.handleMaxFs);
});

Expand All @@ -413,7 +419,7 @@ export class MediaRequestManager {
const mediaRequest = this.clientRequests[requestId];

mediaRequest?.receiveSlots.forEach((rs) => {
rs.off(ReceiveSlotEvents.SourceUpdate, this.sourceUpdateListener);
rs.off(ReceiveSlotEvents.SourceUpdate, this.throttledSendRequests);
rs.off(ReceiveSlotEvents.MaxFsUpdate, mediaRequest.handleMaxFs);
});

Expand All @@ -438,6 +444,6 @@ export class MediaRequestManager {
this.numTotalSources = numTotalSources;
this.numLiveSources = numLiveSources;

this.sendRequests();
this.throttledSendRequests();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1118,21 +1118,167 @@ describe('MediaRequestManager', () => {
});
});

describe('throttling of calls to sendMediaRequests() caused by notifications from Homer', () => {
let clock;
const sourceUpdateHandlers = [];

beforeEach(() => {
clock = sinon.useFakeTimers();
mediaRequestManager = new MediaRequestManager(sendMediaRequestsCallback, {
degradationPreferences,
kind: 'video',
trimRequestsToNumOfSources: true,
});

sourceUpdateHandlers.length = 0;

fakeReceiveSlots.forEach((slot) => {
slot.on.callsFake((eventName, handler) => {
if (eventName === 'sourceUpdate') {
sourceUpdateHandlers.push({handler, slot});
}
});
});

// add some requests and commit them
addActiveSpeakerRequest(
255,
[
fakeReceiveSlots[0],
fakeReceiveSlots[1],
fakeReceiveSlots[2],
fakeReceiveSlots[3],
fakeReceiveSlots[4],
fakeReceiveSlots[4],
],
MAX_FS_1080p,
false
);

mediaRequestManager.setNumCurrentSources(5, 5);
mediaRequestManager.setDegradationPreferences({maxMacroblocksLimit: 8192});
mediaRequestManager.commit();

// advance time to reach a stable state after any initial throttling
clock.tick(9999);
sendMediaRequestsCallback.resetHistory();
});

afterEach(() => {
clock.restore();
});

it('throttles calls to sendMediaRequests() when multiple source updates happen', () => {
// simulate multiple source update changes
sourceUpdateHandlers.forEach(({handler, slot}) => {
slot.sourceState = `avatar`;
handler();
});

// The throttled sendMediaRequests should execute immediately on first call, then throttle subsequent calls
clock.tick(1);
assert.calledOnce(sendMediaRequestsCallback);

// after 1s simulate more updates -> they should not trigger any more calls to sendRequests()
clock.tick(1000);
sourceUpdateHandlers.forEach(({handler, slot}) => {
slot.sourceState = `live`;
handler();
});
clock.tick(1);
// still only 1 call due to throttling
assert.calledOnce(sendMediaRequestsCallback);

// now advance time by another 1s (past the throttle period) -> this should trigger another call to sendRequests()
clock.tick(1000);
assert.calledTwice(sendMediaRequestsCallback);

// and no more calls after that
clock.tick(9999);
assert.calledTwice(sendMediaRequestsCallback);
});

it('throttles calls to sendMediaRequests() when setNumCurrentSources() is called', () => {
// change number of available streams
mediaRequestManager.setNumCurrentSources(4, 4);

// The throttled function should execute immediately on first call, then throttle subsequent calls
clock.tick(1);
assert.calledOnce(sendMediaRequestsCallback);

// after 1s simulate more updates -> they should not trigger any more calls to sendRequests()
clock.tick(1000);
mediaRequestManager.setNumCurrentSources(3, 3);
clock.tick(1);
// still only 1 call due to throttling
assert.calledOnce(sendMediaRequestsCallback);

// now advance time by another 1s (past the throttle period) -> this should trigger another call to sendRequests()
clock.tick(1000);
assert.calledTwice(sendMediaRequestsCallback);

// and no more calls after that
clock.tick(9999);
assert.calledTwice(sendMediaRequestsCallback);
});

it('throttles calls to sendMediaRequests() when setNumCurrentSources() is called AND source updates happen', () => {
// change number of available streams and simulate source updates
mediaRequestManager.setNumCurrentSources(4, 4);
sourceUpdateHandlers.forEach(({handler, slot}) => {
slot.sourceState = `avatar`;
handler();
});

// The throttled function should execute immediately on first call, then throttle subsequent calls
clock.tick(1);
assert.calledOnce(sendMediaRequestsCallback);

// after 1s simulate more updates -> they should not trigger any more calls to sendRequests()
clock.tick(1000);
mediaRequestManager.setNumCurrentSources(3, 3);
sourceUpdateHandlers.forEach(({handler, slot}) => {
slot.sourceState = `live`;
handler();
});
clock.tick(1);
// still only 1 call due to throttling
assert.calledOnce(sendMediaRequestsCallback);

// now advance time by another 1s (past the throttle period) -> this should trigger another call to sendRequests()
clock.tick(1000);
assert.calledTwice(sendMediaRequestsCallback);

// and no more calls after that
clock.tick(9999);
assert.calledTwice(sendMediaRequestsCallback);
});
});

describe('trimming of requested receive slots', () => {
let clock;

beforeEach(() => {
clock = sinon.useFakeTimers();
mediaRequestManager = new MediaRequestManager(sendMediaRequestsCallback, {
degradationPreferences,
kind: 'video',
trimRequestsToNumOfSources: true,
});
});

afterEach(() => {
clock.restore();
});

const limitNumAvailableStreams = (preferLiveVideo, limit) => {
if (preferLiveVideo) {
mediaRequestManager.setNumCurrentSources(100, limit);
} else {
mediaRequestManager.setNumCurrentSources(limit, 1);
}
// Advance time to trigger the throttled sendRequests
clock.tick(2000);
};

[true, false].forEach((preferLiveVideo) =>
Expand Down
Loading