Skip to content

Commit

Permalink
Validate messages instead (check deleted)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnautov-anton committed Jan 9, 2025
1 parent 9f46482 commit b3cfe1f
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 45 deletions.
18 changes: 18 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2552,6 +2552,24 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
return messageId;
}

public updateLiveLocation(
message: MessageResponse<StreamChatGenerics>,
{ latitude, longitude }: { latitude: number; longitude: number },
) {
const [attachment] = message.attachments ?? [];

if (!attachment || attachment.type !== 'live_location') {
throw new Error(
'Supplied message either has no attachments to update or attachment is not of type "live_location"',
);
}

return this.partialUpdateMessage(message.id, {
// @ts-expect-error valid update
set: { attachments: [{ ...attachment, latitude, longitude }] },
});
}

/**
* pinMessage - pins the message
* @param {string | { id: string }} messageOrMessageId message object or message id
Expand Down
90 changes: 45 additions & 45 deletions src/live_location_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@

import { withCancellation } from './concurrency';
import { StateStore } from './store';
import type { MessageResponse, Attachment, EventTypes, ExtendableGenerics } from './types';
import type { MessageResponse, Attachment, EventTypes, ExtendableGenerics, UpdateMessageAPIResponse } from './types';
import type { StreamChat } from './client';
import type { Unsubscribe } from './store';

// type Unsubscribe = () => void;
type WatchLocation = (handler: (value: { latitude: number; longitude: number }) => void) => Unsubscribe;
type SerializeAndStore = (state: MessageResponse[], userId: string) => void;
type RetrieveAndDeserialize = (userId: string) => MessageResponse[];
type SerializeAndStore<SCG extends ExtendableGenerics> = (state: MessageResponse<SCG>[], userId: string) => void;
type RetrieveAndDeserialize<SCG extends ExtendableGenerics> = (userId: string) => MessageResponse<SCG>[];

export type LiveLocationManagerState = {
export type LiveLocationManagerState<SCG extends ExtendableGenerics> = {
ready: boolean;
targetMessages: MessageResponse[];
targetMessages: MessageResponse<SCG>[];
};

// if (message.cid && this.messagesByChannelConfId[message.cid]) {
Expand Down Expand Up @@ -87,39 +87,55 @@ export type LiveLocationManagerState = {
// }

function isValidLiveLocationAttachment(attachment?: Attachment) {
if (!attachment || typeof attachment.end_time !== 'string' || attachment.stopped_sharing) return false;
if (!attachment || attachment.type !== 'live_location' || attachment.stopped_sharing) {
return false;
}

// If end_time has been defined, consider it
if (typeof attachment.end_time === 'string') {
const endTimeTimestamp = new Date(attachment.end_time).getTime();

const endTimeTimestamp = new Date(attachment.end_time).getTime();
if (Number.isNaN(endTimeTimestamp)) return false;

if (Number.isNaN(endTimeTimestamp)) return false;
const nowTimestamp = Date.now();

const nowTimestamp = Date.now();
return nowTimestamp < endTimeTimestamp;
}

return attachment && attachment.type === 'live_location' && endTimeTimestamp > nowTimestamp;
return true;
}

function isValidLiveLocationMessage(message?: MessageResponse) {
if (!message || message.type === 'deleted') return false;

const [attachment] = message.attachments ?? [];

return isValidLiveLocationAttachment(attachment);
}

export type LiveLocationManagerConstructorParameters<SCG extends ExtendableGenerics> = {
client: StreamChat<SCG>;
watchLocation: WatchLocation;
retrieveAndDeserialize?: RetrieveAndDeserialize;
serializeAndStore?: SerializeAndStore;
retrieveAndDeserialize?: RetrieveAndDeserialize<SCG>;
serializeAndStore?: SerializeAndStore<SCG>;
};

const MIN_THROTTLE_TIMEOUT = 1000;
// Hard-coded minimal throttle timeout
const MIN_THROTTLE_TIMEOUT = 3000;

export class LiveLocationManager<SCG extends ExtendableGenerics> {
public state: StateStore<LiveLocationManagerState>;
public state: StateStore<LiveLocationManagerState<SCG>>;
private client: StreamChat<SCG>;
private unsubscribeFunctions: Set<() => void> = new Set();
private serializeAndStore: SerializeAndStore;
private serializeAndStore: SerializeAndStore<SCG>;
private watchLocation: WatchLocation;
private messagesByChannelConfIdGetterCache: {
calculated: { [key: string]: [MessageResponse, number] };
targetMessages: LiveLocationManagerState['targetMessages'];
targetMessages: LiveLocationManagerState<SCG>['targetMessages'];
};
private messagesByIdGetterCache: {
calculated: { [key: string]: [MessageResponse, number] };
targetMessages: LiveLocationManagerState['targetMessages'];
targetMessages: LiveLocationManagerState<SCG>['targetMessages'];
};

static symbol = Symbol(LiveLocationManager.name);
Expand All @@ -144,7 +160,7 @@ export class LiveLocationManager<SCG extends ExtendableGenerics> {

const retreivedTargetMessages = retrieveAndDeserialize(client.userID!);

this.state = new StateStore<LiveLocationManagerState>({
this.state = new StateStore<LiveLocationManagerState<SCG>>({
targetMessages: retreivedTargetMessages,
// If there are no messages to validate, the manager is considered "ready"
ready: retreivedTargetMessages.length === 0,
Expand Down Expand Up @@ -204,7 +220,7 @@ export class LiveLocationManager<SCG extends ExtendableGenerics> {
let unsubscribeWatchLocation: null | (() => void) = null;

// Subscribe to location updates only if there are relevant messages to
// update, no need for the location watcher to active/instantiated otherwise
// update, no need for the location watcher to be active/instantiated otherwise
const unsubscribe = this.state.subscribeWithSelector(
({ targetMessages }) => ({ targetMessages }),
({ targetMessages }) => {
Expand Down Expand Up @@ -239,39 +255,29 @@ export class LiveLocationManager<SCG extends ExtendableGenerics> {
nextWatcherCallTimestamp = Date.now() + MIN_THROTTLE_TIMEOUT;

withCancellation(LiveLocationManager.symbol, async () => {
const promises: Promise<void>[] = [];
const promises: Promise<UpdateMessageAPIResponse<SCG>>[] = [];
const { ready } = this.state.getLatestValue();

if (!ready) {
await this.recoverAndValidateMessages();
}

const { targetMessages } = this.state.getLatestValue();
// if validator removes messages, we need to check
// If validator removes messages, we need to check
if (!targetMessages.length) return;

for (const message of targetMessages) {
const [attachment] = message.attachments ?? [];

if (!isValidLiveLocationAttachment(attachment)) {
if (!isValidLiveLocationMessage(message)) {
this.unregisterMessage(message);
continue;
}

// TODO: client.updateLiveLocation instead
const promise = this.client
.partialUpdateMessage(message.id, {
// @ts-expect-error valid update
set: { attachments: [{ ...attachment, latitude, longitude }] },
})
// TODO: change this this
.then((v) => console.log(v));
const promise = this.client.updateLiveLocation(message, { latitude, longitude });

promises.push(promise);
}

const values = await Promise.allSettled(promises);
console.log(values);
await Promise.allSettled(promises);
// TODO: handle values (remove failed - based on specific error code), keep re-trying others
});
});
Expand All @@ -298,29 +304,25 @@ export class LiveLocationManager<SCG extends ExtendableGenerics> {
for (const result of response.results) {
const { message } = result;

const [attachment] = message.attachments ?? [];

if (isValidLiveLocationAttachment(attachment)) {
if (isValidLiveLocationMessage(message)) {
newTargetMessages.push(message);
}
}

this.state.partialNext({ ready: true, targetMessages: newTargetMessages });
}

private registerMessage(message: MessageResponse) {
private registerMessage(message: MessageResponse<SCG>) {
if (!this.client.userID || message?.user?.id !== this.client.userID) return;

const [attachment] = message.attachments ?? [];

if (!isValidLiveLocationAttachment(attachment)) {
if (!isValidLiveLocationMessage(message)) {
return;
}

this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] }));
}

private updateRegisteredMessage(message: MessageResponse) {
private updateRegisteredMessage(message: MessageResponse<SCG>) {
if (!this.client.userID || message?.user?.id !== this.client.userID) return;

const [, targetMessageIndex] = this.messagesById[message.id];
Expand Down Expand Up @@ -381,9 +383,7 @@ export class LiveLocationManager<SCG extends ExtendableGenerics> {

if (!localMessage) return;

const [attachment] = event.message.attachments ?? [];

if (!isValidLiveLocationAttachment(attachment)) {
if (!isValidLiveLocationMessage(event.message)) {
this.unregisterMessage(event.message);
} else {
this.updateRegisteredMessage(event.message);
Expand Down

0 comments on commit b3cfe1f

Please sign in to comment.