Skip to content

Commit

Permalink
SCG - First pass
Browse files Browse the repository at this point in the history
  • Loading branch information
arnautov-anton committed Jan 30, 2025
1 parent f0d90e4 commit d17fc82
Show file tree
Hide file tree
Showing 18 changed files with 1,138 additions and 1,557 deletions.
8 changes: 4 additions & 4 deletions src/campaign.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { StreamChat } from './client';
import { CampaignData, DefaultGenerics, ExtendableGenerics } from './types';
import { CampaignData } from './types';

export class Campaign<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
export class Campaign {
id: string | null;
data?: CampaignData;
client: StreamChat<StreamChatGenerics>;
client: StreamChat;

constructor(client: StreamChat<StreamChatGenerics>, id: string | null, data?: CampaignData) {
constructor(client: StreamChat, id: string | null, data?: CampaignData) {
this.client = client;
this.id = id;
this.data = data;
Expand Down
409 changes: 188 additions & 221 deletions src/channel.ts

Large diffs are not rendered by default.

130 changes: 64 additions & 66 deletions src/channel_state.ts

Large diffs are not rendered by default.

587 changes: 262 additions & 325 deletions src/client.ts

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions src/client_state.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { UserResponse, ExtendableGenerics, DefaultGenerics } from './types';
import { UserResponse } from './types';
import { StreamChat } from './client';

/**
* ClientState - A container class for the client state.
*/
export class ClientState<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
private client: StreamChat<StreamChatGenerics>;
export class ClientState {
private client: StreamChat;
users: {
[key: string]: UserResponse<StreamChatGenerics>;
[key: string]: UserResponse;
};
userChannelReferences: { [key: string]: { [key: string]: boolean } };
constructor({ client }: { client: StreamChat<StreamChatGenerics> }) {
constructor({ client }: { client: StreamChat }) {
// show the status for a certain user...
// ie online, offline etc
this.client = client;
Expand All @@ -19,19 +19,19 @@ export class ClientState<StreamChatGenerics extends ExtendableGenerics = Default
this.userChannelReferences = {};
}

updateUsers(users: UserResponse<StreamChatGenerics>[]) {
updateUsers(users: UserResponse[]) {
for (const user of users) {
this.updateUser(user);
}
}

updateUser(user?: UserResponse<StreamChatGenerics>) {
updateUser(user?: UserResponse) {
if (user != null && this.client._cacheEnabled()) {
this.users[user.id] = user;
}
}

updateUserReference(user: UserResponse<StreamChatGenerics>, channelID: string) {
updateUserReference(user: UserResponse, channelID: string) {
if (user == null || !this.client._cacheEnabled()) {
return;
}
Expand Down
55 changes: 31 additions & 24 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import {
addConnectionEventListeners,
} from './utils';
import { buildWsFatalInsight, buildWsSuccessAfterFailureInsight, postInsights } from './insights';
import { ConnectAPIResponse, ConnectionOpen, ExtendableGenerics, DefaultGenerics, UR, LogLevel } from './types';
import { ConnectAPIResponse, ConnectionOpen, UR, LogLevel } from './types';
import { StreamChat } from './client';
import { APIError } from 'errors';

// Type guards to check WebSocket error type
const isCloseEvent = (res: WebSocket.CloseEvent | WebSocket.Data | WebSocket.ErrorEvent): res is WebSocket.CloseEvent =>
Expand All @@ -36,13 +37,13 @@ const isErrorEvent = (res: WebSocket.CloseEvent | WebSocket.Data | WebSocket.Err
* - state can be recovered by querying the channel again
* - if the servers fails to publish a message to the client, the WS connection is destroyed
*/
export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
export class StableWSConnection {
// global from constructor
client: StreamChat<StreamChatGenerics>;
client: StreamChat;

// local vars
connectionID?: string;
connectionOpen?: ConnectAPIResponse<StreamChatGenerics>;
connectionOpen?: ConnectAPIResponse;
consecutiveFailures: number;
pingInterval: number;
healthCheckTimeoutRef?: NodeJS.Timeout;
Expand All @@ -57,12 +58,12 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
reason?: Error & { code?: string | number; isWSFailure?: boolean; StatusCode?: string | number },
) => void;
requestID: string | undefined;
resolvePromise?: (value: ConnectionOpen<StreamChatGenerics>) => void;
resolvePromise?: (value: ConnectionOpen) => void;
totalFailures: number;
ws?: WebSocket;
wsID: number;

constructor({ client }: { client: StreamChat<StreamChatGenerics> }) {
constructor({ client }: { client: StreamChat }) {
/** StreamChat client */
this.client = client;
/** consecutive failures influence the duration of the timeout */
Expand Down Expand Up @@ -92,7 +93,7 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
this.client.logger(level, 'connection:' + msg, { tags: ['connection'], ...extra });
}

setClient(client: StreamChat<StreamChatGenerics>) {
setClient(client: StreamChat) {
this.client = client;
}

Expand All @@ -117,17 +118,19 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
this.isHealthy = false;
this.consecutiveFailures += 1;

if (error.code === chatCodes.TOKEN_EXPIRED && !this.client.tokenManager.isStatic()) {
const e = error as APIError;

if (e.code === chatCodes.TOKEN_EXPIRED && !this.client.tokenManager.isStatic()) {
this._log('connect() - WS failure due to expired token, so going to try to reload token and reconnect');
this._reconnect({ refreshToken: true });
} else if (!error.isWSFailure) {
} else if (!e.isWSFailure) {
// API rejected the connection and we should not retry
throw new Error(
JSON.stringify({
code: error.code,
StatusCode: error.StatusCode,
message: error.message,
isWSFailure: error.isWSFailure,
code: e.code,
StatusCode: e.StatusCode,
message: e.message,
isWSFailure: e.isWSFailure,
}),
);
}
Expand All @@ -149,13 +152,15 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
try {
return await this.connectionOpen;
} catch (error) {
const e = error as APIError;

if (i === timeout) {
throw new Error(
JSON.stringify({
code: error.code,
StatusCode: error.StatusCode,
message: error.message,
isWSFailure: error.isWSFailure,
code: e.code,
StatusCode: e.StatusCode,
message: e.message,
isWSFailure: e.isWSFailure,
}),
);
}
Expand Down Expand Up @@ -298,17 +303,17 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
}
return response;
}
} catch (err) {
} catch (error) {
this.isConnecting = false;
this._log(`_connect() - Error - `, err);
this._log(`_connect() - Error - `, error);
if (this.client.options.enableInsights) {
this.client.insightMetrics.wsConsecutiveFailures++;
this.client.insightMetrics.wsTotalFailures++;

const insights = buildWsFatalInsight((this as unknown) as StableWSConnection, convertErrorToJson(err as Error));
const insights = buildWsFatalInsight((this as unknown) as StableWSConnection, convertErrorToJson(error as Error));
postInsights?.('ws_fatal', insights);
}
throw err;
throw error;
}
}

Expand Down Expand Up @@ -367,16 +372,18 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =

this.consecutiveFailures = 0;
} catch (error) {
const e = error as APIError;

this.isHealthy = false;
this.consecutiveFailures += 1;
if (error.code === chatCodes.TOKEN_EXPIRED && !this.client.tokenManager.isStatic()) {
if (e.code === chatCodes.TOKEN_EXPIRED && !this.client.tokenManager.isStatic()) {
this._log('_reconnect() - WS failure due to expired token, so going to try to reload token and reconnect');

return this._reconnect({ refreshToken: true });
}

// reconnect on WS failures, don't reconnect if there is a code bug
if (error.isWSFailure) {
if (e.isWSFailure) {
this._log('_reconnect() - WS failure, so going to try to reconnect');

this._reconnect();
Expand Down Expand Up @@ -576,7 +583,7 @@ export class StableWSConnection<StreamChatGenerics extends ExtendableGenerics =
_setupConnectionPromise = () => {
this.isResolved = false;
/** a promise that is resolved once ws.open is called */
this.connectionOpen = new Promise<ConnectionOpen<StreamChatGenerics>>((resolve, reject) => {
this.connectionOpen = new Promise<ConnectionOpen>((resolve, reject) => {
this.resolvePromise = resolve;
this.rejectPromise = reject;
});
Expand Down
20 changes: 10 additions & 10 deletions src/connection_fallback.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import axios, { AxiosRequestConfig, CancelTokenSource } from 'axios';
import { StreamChat } from './client';
import { addConnectionEventListeners, removeConnectionEventListeners, retryInterval, sleep } from './utils';
import { isAPIError, isConnectionIDError, isErrorRetryable } from './errors';
import { ConnectionOpen, Event, UR, ExtendableGenerics, DefaultGenerics, LogLevel } from './types';
import { APIError, isAPIError, isConnectionIDError, isErrorRetryable } from './errors';
import { ConnectionOpen, Event, UR, LogLevel } from './types';

export enum ConnectionState {
Closed = 'CLOSED',
Expand All @@ -12,14 +12,14 @@ export enum ConnectionState {
Init = 'INIT',
}

export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
client: StreamChat<StreamChatGenerics>;
export class WSConnectionFallback {
client: StreamChat;
state: ConnectionState;
consecutiveFailures: number;
connectionID?: string;
cancelToken?: CancelTokenSource;

constructor({ client }: { client: StreamChat<StreamChatGenerics> }) {
constructor({ client }: { client: StreamChat }) {
this.client = client;
this.state = ConnectionState.Init;
this.consecutiveFailures = 0;
Expand Down Expand Up @@ -84,7 +84,7 @@ export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics
} catch (err) {
this.consecutiveFailures += 1;

if (retry && isErrorRetryable(err)) {
if (retry && isErrorRetryable(err as APIError)) {
this._log(`_req() - Retryable error, retrying request`);
await sleep(retryInterval(this.consecutiveFailures));
return this._req<T>(params, config, retry);
Expand All @@ -99,7 +99,7 @@ export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics
while (this.state === ConnectionState.Connected) {
try {
const data = await this._req<{
events: Event<StreamChatGenerics>[];
events: Event[];
}>({}, { timeout: 30000 }, true); // 30s => API responds in 20s if there is no event

if (data.events?.length) {
Expand All @@ -115,14 +115,14 @@ export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics

/** client.doAxiosRequest will take care of TOKEN_EXPIRED error */

if (isConnectionIDError(err)) {
if (isConnectionIDError(err as APIError)) {
this._log(`_poll() - ConnectionID error, connecting without ID...`);
this._setState(ConnectionState.Disconnected);
this.connect(true);
return;
}

if (isAPIError(err) && !isErrorRetryable(err)) {
if (isAPIError(err as APIError) && !isErrorRetryable(err as APIError)) {
this._setState(ConnectionState.Closed);
return;
}
Expand All @@ -149,7 +149,7 @@ export class WSConnectionFallback<StreamChatGenerics extends ExtendableGenerics
this._setState(ConnectionState.Connecting);
this.connectionID = undefined; // connect should be sent with empty connection_id so API creates one
try {
const { event } = await this._req<{ event: ConnectionOpen<StreamChatGenerics> }>(
const { event } = await this._req<{ event: ConnectionOpen }>(
{ json: this.client._buildWSPayload() },
{ timeout: 8000 }, // 8s
reconnect,
Expand Down
2 changes: 1 addition & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const APIErrorCodes: Record<string, { name: string; retryable: boolean }>
'99': { name: 'AppSuspendedError', retryable: false },
};

type APIError = Error & { code: number; isWSFailure?: boolean };
export type APIError = Error & { code: number; isWSFailure?: boolean; StatusCode?: number };

export function isAPIError(error: Error): error is APIError {
return (error as APIError).code !== undefined;
Expand Down
12 changes: 5 additions & 7 deletions src/moderation.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import {
APIResponse,
ModerationConfig,
DefaultGenerics,
ExtendableGenerics,
GetConfigResponse,
GetUserModerationReportResponse,
MuteUserResponse,
Expand Down Expand Up @@ -31,10 +29,10 @@ export const MODERATION_ENTITY_TYPES = {
};

// Moderation class provides all the endpoints related to moderation v2.
export class Moderation<StreamChatGenerics extends ExtendableGenerics = DefaultGenerics> {
client: StreamChat<StreamChatGenerics>;
export class Moderation {
client: StreamChat;

constructor(client: StreamChat<StreamChatGenerics>) {
constructor(client: StreamChat) {
this.client = client;
}

Expand Down Expand Up @@ -104,7 +102,7 @@ export class Moderation<StreamChatGenerics extends ExtendableGenerics = DefaultG
* @returns
*/
async muteUser(targetID: string, options: ModerationMuteOptions = {}) {
return await this.client.post<MuteUserResponse<StreamChatGenerics> & APIResponse>(
return await this.client.post<MuteUserResponse & APIResponse>(
this.client.baseURL + '/api/v2/moderation/mute',
{
target_ids: [targetID],
Expand Down Expand Up @@ -144,7 +142,7 @@ export class Moderation<StreamChatGenerics extends ExtendableGenerics = DefaultG
* @param {boolean} options.include_user_mutes Include user mutes
*/
async getUserModerationReport(userID: string, options: GetUserModerationReportOptions = {}) {
return await this.client.get<GetUserModerationReportResponse<StreamChatGenerics>>(
return await this.client.get<GetUserModerationReportResponse>(
this.client.baseURL + `/api/v2/moderation/user_report`,
{
user_id: userID,
Expand Down
Loading

0 comments on commit d17fc82

Please sign in to comment.