Skip to content

Commit

Permalink
fix: premature channels state consumption in ChannelManager (#1466)
Browse files Browse the repository at this point in the history
  • Loading branch information
isekovanic authored Feb 13, 2025
1 parent e6b2b81 commit ab24bf8
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 18 deletions.
26 changes: 17 additions & 9 deletions src/channel_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
promoteChannel,
shouldConsiderArchivedChannels,
shouldConsiderPinnedChannels,
uniqBy,
} from './utils';

export type ChannelManagerPagination<SCG extends ExtendableGenerics = DefaultGenerics> = {
Expand Down Expand Up @@ -275,7 +276,7 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
};

public loadNext = async () => {
const { pagination, channels, initialized } = this.state.getLatestValue();
const { pagination, initialized } = this.state.getLatestValue();
const { filters, sort, options, isLoadingNext, hasNext } = pagination;

if (!initialized || isLoadingNext || !hasNext) {
Expand All @@ -288,11 +289,12 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
pagination: { ...pagination, isLoading: false, isLoadingNext: true },
});
const nextChannels = await this.client.queryChannels(filters, sort, options, this.stateOptions);
const { channels } = this.state.getLatestValue();
const newOffset = offset + (nextChannels?.length ?? 0);
const newOptions = { ...options, offset: newOffset };

this.state.partialNext({
channels: [...(channels || []), ...nextChannels],
channels: uniqBy<Channel<SCG>>([...(channels || []), ...nextChannels], 'cid'),
pagination: {
...pagination,
hasNext: (nextChannels?.length ?? 0) >= limit,
Expand All @@ -313,9 +315,11 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {

private notificationAddedToChannelHandler = async (event: Event<SCG>) => {
const { id, type, members } = event?.channel ?? {};

if (!type || !this.options.allowNotLoadedChannelPromotionForEvent?.['notification.added_to_channel']) {
return;
}

const channel = await getAndWatchChannel({
client: this.client,
id,
Expand All @@ -328,6 +332,7 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
}, []),
type,
});

const { pagination, channels } = this.state.getLatestValue();
if (!channels) {
return;
Expand Down Expand Up @@ -415,10 +420,7 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
private notificationNewMessageHandler = async (event: Event<SCG>) => {
const { id, type } = event?.channel ?? {};

const { channels, pagination } = this.state.getLatestValue();
const { filters, sort } = pagination ?? {};

if (!channels || !id || !type) {
if (!id || !type) {
return;
}

Expand All @@ -428,10 +430,14 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
type,
});

const { channels, pagination } = this.state.getLatestValue();
const { filters, sort } = pagination ?? {};

const considerArchivedChannels = shouldConsiderArchivedChannels(filters);
const isTargetChannelArchived = isChannelArchived(channel);

if (
!channels ||
(considerArchivedChannels && isTargetChannelArchived && !filters.archived) ||
(considerArchivedChannels && !isTargetChannelArchived && filters.archived) ||
!this.options.allowNotLoadedChannelPromotionForEvent?.['notification.message_new']
Expand All @@ -449,11 +455,9 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
};

private channelVisibleHandler = async (event: Event<SCG>) => {
const { channels, pagination } = this.state.getLatestValue();
const { sort, filters } = pagination ?? {};
const { channel_type: channelType, channel_id: channelId } = event;

if (!channels || !channelType || !channelId) {
if (!channelType || !channelId) {
return;
}

Expand All @@ -463,10 +467,14 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
type: event.channel_type,
});

const { channels, pagination } = this.state.getLatestValue();
const { sort, filters } = pagination ?? {};

const considerArchivedChannels = shouldConsiderArchivedChannels(filters);
const isTargetChannelArchived = isChannelArchived(channel);

if (
!channels ||
(considerArchivedChannels && isTargetChannelArchived && !filters.archived) ||
(considerArchivedChannels && !isTargetChannelArchived && filters.archived) ||
!this.options.allowNotLoadedChannelPromotionForEvent?.['channel.visible']
Expand Down
21 changes: 21 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,27 @@ export const throttle = <T extends (...args: unknown[]) => unknown>(
};
};

const get = <T>(obj: T, path: string): unknown =>
path.split('.').reduce<unknown>((acc, key) => {
if (acc && typeof acc === 'object' && key in acc) {
return (acc as Record<string, unknown>)[key];
}
return undefined;
}, obj);

// works exactly the same as lodash.uniqBy
export const uniqBy = <T>(array: T[] | unknown, iteratee: ((item: T) => unknown) | keyof T): T[] => {
if (!Array.isArray(array)) return [];

const seen = new Set<unknown>();
return array.filter((item) => {
const key = typeof iteratee === 'function' ? iteratee(item) : get(item, iteratee as string);
if (seen.has(key)) return false;
seen.add(key);
return true;
});
};

type MessagePaginationUpdatedParams<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = {
parentSet: MessageSet;
requestedPageSize: number;
Expand Down
151 changes: 149 additions & 2 deletions test/unit/channel_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ describe('ChannelManager', () => {
});
clientQueryChannelsStub = sinon.stub(client, 'queryChannels').callsFake((_filters, _sort, options) => {
const offset = options?.offset ?? 0;
return Promise.resolve(mockChannelPages[offset / 10]);
return Promise.resolve(mockChannelPages[Math.floor(offset / 10)]);
});
});

Expand Down Expand Up @@ -563,6 +563,153 @@ describe('ChannelManager', () => {
expect(channels.length).to.equal(20);
});

it('should properly paginate even if state.channels gets modified in the meantime', async () => {
await channelManager.queryChannels({ filterA: true }, { asc: 1 }, { limit: 10, offset: 0 });
channelManager.state.next((prevState) => ({
...prevState,
channels: [...mockChannelPages[2].slice(0, 5), ...prevState.channels],
}));

const stateChangeSpy = sinon.spy();
channelManager.state.subscribeWithSelector(
(nextValue) => ({ pagination: nextValue.pagination }),
stateChangeSpy,
);

stateChangeSpy.resetHistory();

await channelManager.loadNext();

const { channels } = channelManager.state.getLatestValue();

expect(clientQueryChannelsStub.callCount).to.equal(2);
expect(stateChangeSpy.callCount).to.equal(2);
expect(stateChangeSpy.args[0][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: true,
options: { limit: 10, offset: 10 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[1][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 20 },
sort: { asc: 1 },
},
});
expect(channels.length).to.equal(25);
});

it('should properly deduplicate when paginating if channels from the next page have been promoted', async () => {
await channelManager.queryChannels({ filterA: true }, { asc: 1 }, { limit: 10, offset: 0 });
channelManager.state.next((prevState) => ({
...prevState,
channels: [...mockChannelPages[1].slice(0, 5), ...prevState.channels],
}));

const stateChangeSpy = sinon.spy();
channelManager.state.subscribeWithSelector(
(nextValue) => ({ pagination: nextValue.pagination }),
stateChangeSpy,
);

stateChangeSpy.resetHistory();

await channelManager.loadNext();

const { channels } = channelManager.state.getLatestValue();

expect(clientQueryChannelsStub.callCount).to.equal(2);
expect(stateChangeSpy.callCount).to.equal(2);
expect(stateChangeSpy.args[0][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: true,
options: { limit: 10, offset: 10 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[1][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 20 },
sort: { asc: 1 },
},
});
expect(channels.length).to.equal(20);
});

it('should properly deduplicate when paginating if channels latter pages have been promoted and reached', async () => {
await channelManager.queryChannels({ filterA: true }, { asc: 1 }, { limit: 10, offset: 0 });
channelManager.state.next((prevState) => ({
...prevState,
channels: [...mockChannelPages[2].slice(0, 3), ...prevState.channels],
}));

const stateChangeSpy = sinon.spy();
channelManager.state.subscribeWithSelector(
(nextValue) => ({ pagination: nextValue.pagination }),
stateChangeSpy,
);

stateChangeSpy.resetHistory();

await channelManager.loadNext();

const { channels: channelsAfterFirstPagination } = channelManager.state.getLatestValue();
expect(channelsAfterFirstPagination.length).to.equal(23);

await channelManager.loadNext();

const { channels } = channelManager.state.getLatestValue();

expect(clientQueryChannelsStub.callCount).to.equal(3);
expect(stateChangeSpy.callCount).to.equal(4);
expect(stateChangeSpy.args[0][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: true,
options: { limit: 10, offset: 10 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[1][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 20 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[3][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: false,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 25 },
sort: { asc: 1 },
},
});
expect(channels.length).to.equal(25);
});

it('should correctly update hasNext and offset if the last page has been reached', async () => {
const { channels: initialChannels } = channelManager.state.getLatestValue();
expect(initialChannels.length).to.equal(0);
Expand Down Expand Up @@ -999,7 +1146,7 @@ describe('ChannelManager', () => {

await clock.runAllAsync();

expect(getAndWatchChannelStub.called).to.be.false;
expect(getAndWatchChannelStub.called).to.be.true;
expect(setChannelsStub.called).to.be.false;
});

Expand Down
Loading

0 comments on commit ab24bf8

Please sign in to comment.