Skip to content

Commit

Permalink
feat: thread list feature
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalnarkhede committed Jan 20, 2024
1 parent 78600ed commit d0a6309
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 64 deletions.
60 changes: 2 additions & 58 deletions src/channel_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
UserResponse,
PendingMessageResponse,
} from './types';
import { addToMessageList } from './utils';

type ChannelReadStatus<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = Record<
string,
Expand Down Expand Up @@ -441,64 +442,7 @@ export class ChannelState<StreamChatGenerics extends ExtendableGenerics = Defaul
sortBy: 'pinned_at' | 'created_at' = 'created_at',
addIfDoesNotExist = true,
) {
const addMessageToList = addIfDoesNotExist || timestampChanged;
let messageArr = messages;

// if created_at has changed, message should be filtered and re-inserted in correct order
// slow op but usually this only happens for a message inserted to state before actual response with correct timestamp
if (timestampChanged) {
messageArr = messageArr.filter((msg) => !(msg.id && message.id === msg.id));
}

// Get array length after filtering
const messageArrayLength = messageArr.length;

// for empty list just concat and return unless it's an update or deletion
if (messageArrayLength === 0 && addMessageToList) {
return messageArr.concat(message);
} else if (messageArrayLength === 0) {
return [...messageArr];
}

const messageTime = (message[sortBy] as Date).getTime();
const messageIsNewest = (messageArr[messageArrayLength - 1][sortBy] as Date).getTime() < messageTime;

// if message is newer than last item in the list concat and return unless it's an update or deletion
if (messageIsNewest && addMessageToList) {
return messageArr.concat(message);
} else if (messageIsNewest) {
return [...messageArr];
}

// find the closest index to push the new message
let left = 0;
let middle = 0;
let right = messageArrayLength - 1;
while (left <= right) {
middle = Math.floor((right + left) / 2);
if ((messageArr[middle][sortBy] as Date).getTime() <= messageTime) left = middle + 1;
else right = middle - 1;
}

// message already exists and not filtered due to timestampChanged, update and return
if (!timestampChanged && message.id) {
if (messageArr[left] && message.id === messageArr[left].id) {
messageArr[left] = message;
return [...messageArr];
}

if (messageArr[left - 1] && message.id === messageArr[left - 1].id) {
messageArr[left - 1] = message;
return [...messageArr];
}
}

// Do not add updated or deleted messages to the list if they do not already exist
// or have a timestamp change.
if (addMessageToList) {
messageArr.splice(left, 0, message);
}
return [...messageArr];
return addToMessageList(messages, message, timestampChanged, sortBy, addIfDoesNotExist);
}

/**
Expand Down
102 changes: 99 additions & 3 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ import {
GetImportResponse,
GetMessageAPIResponse,
GetRateLimitsResponse,
GetThreadsAPIResponse,
QueryThreadsAPIResponse,
GetUnreadCountAPIResponse,
ListChannelResponse,
ListCommandsResponse,
Expand Down Expand Up @@ -165,8 +165,11 @@ import {
UserOptions,
UserResponse,
UserSort,
GetThreadAPIResponse,
PartialThreadUpdate,
} from './types';
import { InsightMetrics, postInsights } from './insights';
import { Thread } from './thread';

function isString(x: unknown): x is string {
return typeof x === 'string' || x instanceof String;
Expand Down Expand Up @@ -2579,8 +2582,101 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
);
}

async getThreads() {
return await this.get<GetThreadsAPIResponse>(this.baseURL + `/threads`);
async queryThreads(options?: {
limit?: number;
next?: string;
participant_limit?: number;
prev?: string;
reply_limit?: number;
watch?: boolean;
}) {
const opts = {
limit: 2,
participant_limit: 100,
reply_limit: 3,
watch: true,
...(options || {}),
};

const res = await this.post<QueryThreadsAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads`, opts);
const threads: Thread<StreamChatGenerics>[] = [];

for (const t of res.threads) {
const thread = new Thread<StreamChatGenerics>(this, t);
threads.push(thread);
}

// TODO: Currently we are handling watch on client level. We should move this to server side.
const cids = threads.map((thread) => thread.channel.cid);
if (options?.watch && cids.length > 0) {
await this.queryChannels(
{
cid: { $in: cids },
} as ChannelFilters<StreamChatGenerics>,
{},
{
limit: 30,
message_limit: 0,
watch: true,
},
);
}

return {
threads,
next: res.next,
};
}

async getThread(
messageId: string,
options: { participant_limit?: number; reply_limit?: number; watch?: boolean } = {},
) {
const opts = {
participant_limit: 100,
reply_limit: 3,
...options,
};

const res = await this.get<GetThreadAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads/${messageId}`, opts);

if (options?.watch) {
const channel = this.channel(res.thread.channel.type, res.thread.channel.id);
await channel.watch();
}

return new Thread<StreamChatGenerics>(this, res.thread);
}
async partialUpdateThread(messageId: string, partialThreadObject: PartialThreadUpdate) {
if (!messageId) {
throw Error('Please specify the message id when calling updateThread');
}

// check for reserved fields from ThreadResponse type within partialThreadObject's set and unset.
// Throw error if any of the reserved field is found.
const reservedThreadFields = [
'created_at',
'id',
'last_message_at',
'type',
'updated_at',
'user',
'reply_count',
'participants',
'channel',
];

for (const key in { ...partialThreadObject.set, ...partialThreadObject.unset }) {
if (reservedThreadFields.includes(key)) {
throw Error(
`You cannot set ${key} field. ${key} is reserved for server-side use. Please omit ${key} from your set object.`,
);
}
}

return await this.patch<GetThreadAPIResponse<StreamChatGenerics>>(this.baseURL + `/threads/${messageId}`, {
...partialThreadObject,
});
}

getUserAgent() {
Expand Down
1 change: 1 addition & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export const EVENT_MAP = {
'notification.message_new': true,
'notification.mutes_updated': true,
'notification.removed_from_channel': true,
'notification.thread_message_new': true,
'reaction.deleted': true,
'reaction.new': true,
'reaction.updated': true,
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ export * from './client';
export * from './client_state';
export * from './channel';
export * from './channel_state';
export * from './thread';
export * from './connection';
export * from './events';
export * from './permissions';
export * from './signing';
export * from './token_manager';
export * from './insights';
export * from './types';
export { isOwnUser, chatCodes, logChatPromiseExecution } from './utils';
export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils';
65 changes: 65 additions & 0 deletions src/thread.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { StreamChat } from './client';
import {
DefaultGenerics,
ExtendableGenerics,
MessageResponse,
ThreadResponse,
ChannelResponse,
FormatMessageResponse,
} from './types';
import { addToMessageList, formatMessage } from './utils';

export class Thread<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
id: string;
latestReplies: FormatMessageResponse<StreamChatGenerics>[] = [];
participants: ThreadResponse['thread_participants'] = [];
message: FormatMessageResponse<StreamChatGenerics>;
channel: ChannelResponse<StreamChatGenerics>;
replyCount = 0;
_client: StreamChat<StreamChatGenerics>;

constructor(client: StreamChat<StreamChatGenerics>, t: ThreadResponse<StreamChatGenerics>) {
this.id = t.parent_message.id;
this.message = formatMessage(t.parent_message);
this.latestReplies = t.latest_replies.map(formatMessage);
this.participants = t.thread_participants;
this.replyCount = t.reply_count;
this.channel = t.channel;
this._client = client;
}

getClient(): StreamChat<StreamChatGenerics> {
return this._client;
}

addReply(message: MessageResponse<StreamChatGenerics>) {
this.latestReplies = addToMessageList(this.latestReplies, formatMessage(message));
}

updateReply(message: MessageResponse<StreamChatGenerics>) {
this.latestReplies = this.latestReplies.map((m) => {
if (m.id === message.id) {
return formatMessage(message);
}
return m;
});
}

updateMessageOrReplyIfExists(message: MessageResponse<StreamChatGenerics>) {
if (!message.parent_id && message.id !== this.message.id) {
return;
}

if (message.parent_id && message.parent_id !== this.message.id) {
return;
}

if (message.parent_id && message.parent_id === this.message.id) {
this.updateReply(message);
}

if (!message.parent_id && message.id === this.message.id) {
this.message = formatMessage(message);
}
}
}
32 changes: 31 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,36 @@ export type GetMessageAPIResponse<
StreamChatGenerics extends ExtendableGenerics = DefaultGenerics
> = SendMessageAPIResponse<StreamChatGenerics>;

export type GetThreadsAPIResponse = APIResponse;
export type ThreadResponse<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = {
channel: ChannelResponse<StreamChatGenerics>;
channel_cid: string;
created_at: string;
deleted_at: string;
latest_replies: MessageResponse<StreamChatGenerics>[];
parent_message: MessageResponse<StreamChatGenerics>;
parent_message_id: string;
reply_count: number;
thread_participants: {
created_at: string;
user: UserResponse<StreamChatGenerics>;
}[];
title: string;
updated_at: string;
};

// TODO: Figure out a way to strongly type set and unset.
export type PartialThreadUpdate = {
set?: Partial<Record<string, unknown>>;
unset?: Partial<Record<string, unknown>>;
};

export type QueryThreadsAPIResponse<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = APIResponse & {
threads: ThreadResponse<StreamChatGenerics>[];
next?: string;
};
export type GetThreadAPIResponse<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> = APIResponse & {
thread: ThreadResponse<StreamChatGenerics>;
};

export type GetMultipleMessagesAPIResponse<
StreamChatGenerics extends ExtendableGenerics = DefaultGenerics
Expand Down Expand Up @@ -1079,6 +1108,7 @@ export type Event<StreamChatGenerics extends ExtendableGenerics = DefaultGeneric
reaction?: ReactionResponse<StreamChatGenerics>;
received_at?: string | Date;
team?: string;
thread?: ThreadResponse<StreamChatGenerics>;
// @deprecated number of all unread messages across all current user's unread channels, equals unread_count
total_unread_count?: number;
// number of all current user's channels with at least one unread message including the channel in this event
Expand Down
Loading

0 comments on commit d0a6309

Please sign in to comment.