Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: premature channels state consumption in ChannelManager #1466

Merged
merged 7 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions src/channel_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
promoteChannel,
shouldConsiderArchivedChannels,
shouldConsiderPinnedChannels,
uniqBy,
} from './utils';

export type ChannelManagerPagination<SCG extends ExtendableGenerics = DefaultGenerics> = {
Expand Down Expand Up @@ -275,7 +276,7 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
};

public loadNext = async () => {
const { pagination, channels, initialized } = this.state.getLatestValue();
const { pagination, initialized } = this.state.getLatestValue();
const { filters, sort, options, isLoadingNext, hasNext } = pagination;

if (!initialized || isLoadingNext || !hasNext) {
Expand All @@ -288,11 +289,12 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
pagination: { ...pagination, isLoading: false, isLoadingNext: true },
});
const nextChannels = await this.client.queryChannels(filters, sort, options, this.stateOptions);
const { channels } = this.state.getLatestValue();
const newOffset = offset + (nextChannels?.length ?? 0);
const newOptions = { ...options, offset: newOffset };

this.state.partialNext({
channels: [...(channels || []), ...nextChannels],
channels: uniqBy<Channel<SCG>>([...(channels || []), ...nextChannels], 'cid'),
pagination: {
...pagination,
hasNext: (nextChannels?.length ?? 0) >= limit,
Expand All @@ -313,9 +315,11 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {

private notificationAddedToChannelHandler = async (event: Event<SCG>) => {
const { id, type, members } = event?.channel ?? {};

if (!type || !this.options.allowNotLoadedChannelPromotionForEvent?.['notification.added_to_channel']) {
return;
}

const channel = await getAndWatchChannel({
client: this.client,
id,
Expand All @@ -328,6 +332,7 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
}, []),
type,
});

const { pagination, channels } = this.state.getLatestValue();
if (!channels) {
return;
Expand Down Expand Up @@ -415,10 +420,7 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
private notificationNewMessageHandler = async (event: Event<SCG>) => {
const { id, type } = event?.channel ?? {};

const { channels, pagination } = this.state.getLatestValue();
const { filters, sort } = pagination ?? {};

if (!channels || !id || !type) {
if (!id || !type) {
return;
}

Expand All @@ -428,10 +430,14 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
type,
});

const { channels, pagination } = this.state.getLatestValue();
const { filters, sort } = pagination ?? {};

const considerArchivedChannels = shouldConsiderArchivedChannels(filters);
const isTargetChannelArchived = isChannelArchived(channel);

if (
!channels ||
(considerArchivedChannels && isTargetChannelArchived && !filters.archived) ||
(considerArchivedChannels && !isTargetChannelArchived && filters.archived) ||
!this.options.allowNotLoadedChannelPromotionForEvent?.['notification.message_new']
Expand All @@ -449,11 +455,9 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
};

private channelVisibleHandler = async (event: Event<SCG>) => {
const { channels, pagination } = this.state.getLatestValue();
const { sort, filters } = pagination ?? {};
const { channel_type: channelType, channel_id: channelId } = event;

if (!channels || !channelType || !channelId) {
if (!channelType || !channelId) {
return;
}

Expand All @@ -463,10 +467,14 @@ export class ChannelManager<SCG extends ExtendableGenerics = DefaultGenerics> {
type: event.channel_type,
});

const { channels, pagination } = this.state.getLatestValue();
const { sort, filters } = pagination ?? {};

const considerArchivedChannels = shouldConsiderArchivedChannels(filters);
const isTargetChannelArchived = isChannelArchived(channel);

if (
!channels ||
(considerArchivedChannels && isTargetChannelArchived && !filters.archived) ||
(considerArchivedChannels && !isTargetChannelArchived && filters.archived) ||
!this.options.allowNotLoadedChannelPromotionForEvent?.['channel.visible']
Expand Down
21 changes: 21 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,27 @@ export const throttle = <T extends (...args: unknown[]) => unknown>(
};
};

const get = <T>(obj: T, path: string): unknown =>
path.split('.').reduce<unknown>((acc, key) => {
if (acc && typeof acc === 'object' && key in acc) {
return (acc as Record<string, unknown>)[key];
}
return undefined;
}, obj);

// works exactly the same as lodash.uniqBy
export const uniqBy = <T>(array: T[] | unknown, iteratee: ((item: T) => unknown) | keyof T): T[] => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the same implementation as in lodash obviously and it does not use their fast SetCache, but close enough for such small lists. Still ~ O(n).

if (!Array.isArray(array)) return [];

const seen = new Set<unknown>();
return array.filter((item) => {
const key = typeof iteratee === 'function' ? iteratee(item) : get(item, iteratee as string);
if (seen.has(key)) return false;
seen.add(key);
return true;
});
};

type MessagePaginationUpdatedParams<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = {
parentSet: MessageSet;
requestedPageSize: number;
Expand Down
151 changes: 149 additions & 2 deletions test/unit/channel_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ describe('ChannelManager', () => {
});
clientQueryChannelsStub = sinon.stub(client, 'queryChannels').callsFake((_filters, _sort, options) => {
const offset = options?.offset ?? 0;
return Promise.resolve(mockChannelPages[offset / 10]);
return Promise.resolve(mockChannelPages[Math.floor(offset / 10)]);
});
});

Expand Down Expand Up @@ -563,6 +563,153 @@ describe('ChannelManager', () => {
expect(channels.length).to.equal(20);
});

it('should properly paginate even if state.channels gets modified in the meantime', async () => {
await channelManager.queryChannels({ filterA: true }, { asc: 1 }, { limit: 10, offset: 0 });
channelManager.state.next((prevState) => ({
...prevState,
channels: [...mockChannelPages[2].slice(0, 5), ...prevState.channels],
}));

const stateChangeSpy = sinon.spy();
channelManager.state.subscribeWithSelector(
(nextValue) => ({ pagination: nextValue.pagination }),
stateChangeSpy,
);

stateChangeSpy.resetHistory();

await channelManager.loadNext();

const { channels } = channelManager.state.getLatestValue();

expect(clientQueryChannelsStub.callCount).to.equal(2);
expect(stateChangeSpy.callCount).to.equal(2);
expect(stateChangeSpy.args[0][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: true,
options: { limit: 10, offset: 10 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[1][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 20 },
sort: { asc: 1 },
},
});
expect(channels.length).to.equal(25);
});

it('should properly deduplicate when paginating if channels from the next page have been promoted', async () => {
await channelManager.queryChannels({ filterA: true }, { asc: 1 }, { limit: 10, offset: 0 });
channelManager.state.next((prevState) => ({
...prevState,
channels: [...mockChannelPages[1].slice(0, 5), ...prevState.channels],
}));

const stateChangeSpy = sinon.spy();
channelManager.state.subscribeWithSelector(
(nextValue) => ({ pagination: nextValue.pagination }),
stateChangeSpy,
);

stateChangeSpy.resetHistory();

await channelManager.loadNext();

const { channels } = channelManager.state.getLatestValue();

expect(clientQueryChannelsStub.callCount).to.equal(2);
expect(stateChangeSpy.callCount).to.equal(2);
expect(stateChangeSpy.args[0][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: true,
options: { limit: 10, offset: 10 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[1][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 20 },
sort: { asc: 1 },
},
});
expect(channels.length).to.equal(20);
});

it('should properly deduplicate when paginating if channels latter pages have been promoted and reached', async () => {
await channelManager.queryChannels({ filterA: true }, { asc: 1 }, { limit: 10, offset: 0 });
channelManager.state.next((prevState) => ({
...prevState,
channels: [...mockChannelPages[2].slice(0, 3), ...prevState.channels],
}));

const stateChangeSpy = sinon.spy();
channelManager.state.subscribeWithSelector(
(nextValue) => ({ pagination: nextValue.pagination }),
stateChangeSpy,
);

stateChangeSpy.resetHistory();

await channelManager.loadNext();

const { channels: channelsAfterFirstPagination } = channelManager.state.getLatestValue();
expect(channelsAfterFirstPagination.length).to.equal(23);

await channelManager.loadNext();

const { channels } = channelManager.state.getLatestValue();

expect(clientQueryChannelsStub.callCount).to.equal(3);
expect(stateChangeSpy.callCount).to.equal(4);
expect(stateChangeSpy.args[0][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: true,
options: { limit: 10, offset: 10 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[1][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: true,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 20 },
sort: { asc: 1 },
},
});
expect(stateChangeSpy.args[3][0]).to.deep.equal({
pagination: {
filters: { filterA: true },
hasNext: false,
isLoading: false,
isLoadingNext: false,
options: { limit: 10, offset: 25 },
sort: { asc: 1 },
},
});
expect(channels.length).to.equal(25);
});

it('should correctly update hasNext and offset if the last page has been reached', async () => {
const { channels: initialChannels } = channelManager.state.getLatestValue();
expect(initialChannels.length).to.equal(0);
Expand Down Expand Up @@ -999,7 +1146,7 @@ describe('ChannelManager', () => {

await clock.runAllAsync();

expect(getAndWatchChannelStub.called).to.be.false;
expect(getAndWatchChannelStub.called).to.be.true;
expect(setChannelsStub.called).to.be.false;
});

Expand Down
Loading