Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion dist/web/pubnub.js
Original file line number Diff line number Diff line change
Expand Up @@ -9566,6 +9566,11 @@
return subscriptionTimetokenFromReference(currentTimetoken, referenceTimetoken !== null && referenceTimetoken !== void 0 ? referenceTimetoken : '0');
}
subscribe({ channels, channelGroups, timetoken, withPresence, }) {
var _a;
// check if the channels and groups are already subscribed
const hasNewChannels = channels === null || channels === void 0 ? void 0 : channels.some((channel) => !this.channels.includes(channel));
const hasNewGroups = channelGroups === null || channelGroups === void 0 ? void 0 : channelGroups.some((group) => !this.groups.includes(group));
const hasNewSubscriptions = hasNewChannels || hasNewGroups;
this.channels = [...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])];
this.groups = [...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])];
if (withPresence) {
Expand All @@ -9576,7 +9581,29 @@
this.engine.transition(restore(Array.from(new Set([...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])])), Array.from(new Set([...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])])), timetoken));
}
else {
this.engine.transition(subscriptionChange(Array.from(new Set([...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])])), Array.from(new Set([...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])]))));
if (hasNewSubscriptions) {
this.engine.transition(subscriptionChange(Array.from(new Set([...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])])), Array.from(new Set([...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])]))));
}
else {
this.dependencies.config
.logger()
.debug('EventEngine', 'Skipping state transition - all channels/groups already subscribed. Emitting SubscriptionChanged event.');
// Get current timetoken from state context
const currentState = this.engine.currentState;
const currentContext = this.engine.currentContext;
let currentTimetoken = '0';
if ((currentState === null || currentState === void 0 ? void 0 : currentState.label) === ReceivingState.label && currentContext) {
const receivingContext = currentContext;
currentTimetoken = (_a = receivingContext.cursor) === null || _a === void 0 ? void 0 : _a.timetoken;
}
// Manually emit SubscriptionChanged status event
this.dependencies.emitStatus({
category: StatusCategory$1.PNSubscriptionChangedCategory,
affectedChannels: Array.from(new Set(this.channels.filter((c) => !c.endsWith('-pnpres')))),
affectedChannelGroups: Array.from(new Set(this.groups.filter((g) => !g.endsWith('-pnpres')))),
currentTimetoken,
});
}
}
if (this.dependencies.join) {
this.dependencies.join({
Expand Down
4 changes: 2 additions & 2 deletions dist/web/pubnub.min.js

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion lib/event-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ var __importStar = (this && this.__importStar) || (function () {
return result;
};
})();
var __importDefault = (this && this.__importDefault) || function (mod) {
return (mod && mod.__esModule) ? mod : { "default": mod };
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.EventEngine = void 0;
const receiving_1 = require("./states/receiving");
const dispatcher_1 = require("./dispatcher");
const utils_1 = require("../core/utils");
const unsubscribed_1 = require("./states/unsubscribed");
const core_1 = require("./core");
const categories_1 = __importDefault(require("../core/constants/categories"));
const utils = __importStar(require("../core/utils"));
const events = __importStar(require("./events"));
/**
Expand Down Expand Up @@ -88,6 +92,11 @@ class EventEngine {
return (0, utils_1.subscriptionTimetokenFromReference)(currentTimetoken, referenceTimetoken !== null && referenceTimetoken !== void 0 ? referenceTimetoken : '0');
}
subscribe({ channels, channelGroups, timetoken, withPresence, }) {
var _a;
// check if the channels and groups are already subscribed
const hasNewChannels = channels === null || channels === void 0 ? void 0 : channels.some((channel) => !this.channels.includes(channel));
const hasNewGroups = channelGroups === null || channelGroups === void 0 ? void 0 : channelGroups.some((group) => !this.groups.includes(group));
const hasNewSubscriptions = hasNewChannels || hasNewGroups;
this.channels = [...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])];
this.groups = [...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])];
if (withPresence) {
Expand All @@ -98,7 +107,29 @@ class EventEngine {
this.engine.transition(events.restore(Array.from(new Set([...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])])), Array.from(new Set([...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])])), timetoken));
}
else {
this.engine.transition(events.subscriptionChange(Array.from(new Set([...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])])), Array.from(new Set([...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])]))));
if (hasNewSubscriptions) {
this.engine.transition(events.subscriptionChange(Array.from(new Set([...this.channels, ...(channels !== null && channels !== void 0 ? channels : [])])), Array.from(new Set([...this.groups, ...(channelGroups !== null && channelGroups !== void 0 ? channelGroups : [])]))));
}
else {
this.dependencies.config
.logger()
.debug('EventEngine', 'Skipping state transition - all channels/groups already subscribed. Emitting SubscriptionChanged event.');
// Get current timetoken from state context
const currentState = this.engine.currentState;
const currentContext = this.engine.currentContext;
let currentTimetoken = '0';
if ((currentState === null || currentState === void 0 ? void 0 : currentState.label) === receiving_1.ReceivingState.label && currentContext) {
const receivingContext = currentContext;
currentTimetoken = (_a = receivingContext.cursor) === null || _a === void 0 ? void 0 : _a.timetoken;
}
// Manually emit SubscriptionChanged status event
this.dependencies.emitStatus({
category: categories_1.default.PNSubscriptionChangedCategory,
affectedChannels: Array.from(new Set(this.channels.filter((c) => !c.endsWith('-pnpres')))),
affectedChannelGroups: Array.from(new Set(this.groups.filter((g) => !g.endsWith('-pnpres')))),
currentTimetoken,
});
}
}
if (this.dependencies.join) {
this.dependencies.join({
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 38 additions & 6 deletions src/event-engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Dependencies, EventEngineDispatcher } from './dispatcher';
import { subscriptionTimetokenFromReference } from '../core/utils';
import { UnsubscribedState } from './states/unsubscribed';
import { Dispatcher, Engine } from './core';
import categoryConstants from '../core/constants/categories';
import * as utils from '../core/utils';
import * as effects from './effects';
import * as events from './events';
Expand Down Expand Up @@ -79,6 +80,11 @@ export class EventEngine {
timetoken?: string | number;
withPresence?: boolean;
}): void {
// check if the channels and groups are already subscribed
const hasNewChannels = channels?.some((channel) => !this.channels.includes(channel));
const hasNewGroups = channelGroups?.some((group) => !this.groups.includes(group));
const hasNewSubscriptions = hasNewChannels || hasNewGroups;

this.channels = [...this.channels, ...(channels ?? [])];
this.groups = [...this.groups, ...(channelGroups ?? [])];

Expand All @@ -95,12 +101,38 @@ export class EventEngine {
),
);
} else {
this.engine.transition(
events.subscriptionChange(
Array.from(new Set([...this.channels, ...(channels ?? [])])),
Array.from(new Set([...this.groups, ...(channelGroups ?? [])])),
),
);
if (hasNewSubscriptions) {
this.engine.transition(
events.subscriptionChange(
Array.from(new Set([...this.channels, ...(channels ?? [])])),
Array.from(new Set([...this.groups, ...(channelGroups ?? [])])),
),
);
} else {
this.dependencies.config
.logger()
.debug(
'EventEngine',
'Skipping state transition - all channels/groups already subscribed. Emitting SubscriptionChanged event.',
);
// Get current timetoken from state context
const currentState = this.engine.currentState;
const currentContext = this.engine.currentContext;
let currentTimetoken: string | undefined = '0';

if (currentState?.label === ReceivingState.label && currentContext) {
const receivingContext = currentContext as ReceivingStateContext;
currentTimetoken = receivingContext.cursor?.timetoken;
}

// Manually emit SubscriptionChanged status event
this.dependencies.emitStatus({
category: categoryConstants.PNSubscriptionChangedCategory,
affectedChannels: Array.from(new Set(this.channels.filter((c) => !c.endsWith('-pnpres')))),
affectedChannelGroups: Array.from(new Set(this.groups.filter((g) => !g.endsWith('-pnpres')))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mohitpubnub is there a reason to filter out presence channels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think filter is not needed. I tested it now, we do not filter out presence channels from status event.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating.

currentTimetoken,
});
}
}
if (this.dependencies.join) {
this.dependencies.join({
Expand Down
129 changes: 129 additions & 0 deletions test/unit/event_engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,135 @@ describe('EventEngine', () => {
await forState('UNSUBSCRIBED', 1000);
});

it('should not trigger state transition when subscribing to already subscribed channels', async () => {
utils.createPresenceMockScopes({
subKey: 'demo',
presenceType: 'heartbeat',
requests: [{ channels: ['ch1'] }],
});
utils.createPresenceMockScopes({
subKey: 'demo',
presenceType: 'leave',
requests: [{ channels: ['ch1'] }],
});
utils.createSubscribeMockScopes({
subKey: 'demo',
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
userId: 'test-js',
eventEngine: true,
requests: [
{ channels: ['ch1'], messages: [] },
{ channels: ['ch1'], messages: [], replyDelay: 500 }, // Long poll with shorter delay
],
});

let subscriptionChangeTransitionCount = 0;
let subscriptionChangedCount = 0;

engine.subscribe((change: { type: string; toState: { label: string }; event?: { type: string } }) => {
if (change.type === 'transitionDone' && change.event?.type === 'SUBSCRIPTION_CHANGED') {
subscriptionChangeTransitionCount++;
stateChanges.push(change);
}
});

pubnub.addListener({
status: (statusEvent) => {
receivedStatuses.push(statusEvent);
if (statusEvent.category === StatusCategory.PNSubscriptionChangedCategory) {
subscriptionChangedCount++;
}
},
});

// First subscribe to 'ch1'
pubnub.subscribe({ channels: ['ch1'] });

await forState('RECEIVING', 1000);

const subscriptionChangeTransitionsBeforeResubscribe = subscriptionChangeTransitionCount;

// Subscribe to 'ch1' again (already subscribed)
pubnub.subscribe({ channels: ['ch1'] });

// Wait a bit to ensure no state transition occurs
await new Promise((resolve) => setTimeout(resolve, 200));

// Verify no additional subscriptionChange state transition occurred
if (subscriptionChangeTransitionCount !== subscriptionChangeTransitionsBeforeResubscribe) {
throw new Error(
`Expected no subscriptionChange transition, but got ${subscriptionChangeTransitionCount - subscriptionChangeTransitionsBeforeResubscribe} transitions`,
);
}

// Verify SubscriptionChanged event was emitted for the re-subscribe
if (subscriptionChangedCount < 1) {
throw new Error(`Expected at least 1 SubscriptionChanged event from re-subscribe, but got ${subscriptionChangedCount}`);
}

// Verify we're still in RECEIVING state (long poll not aborted)
if (engine.currentState?.label !== 'RECEIVING') {
throw new Error(`Expected state to be RECEIVING, but got ${engine.currentState?.label}`);
}

// Test passed! Clean up (let afterEach handle full cleanup)
pubnub.unsubscribe({ channels: ['ch1'] });
});

it('should trigger state transition when subscribing to new channel alongside existing channel', async () => {
utils.createPresenceMockScopes({
subKey: 'demo',
presenceType: 'heartbeat',
requests: [{ channels: ['ch1'] }, { channels: ['ch1', 'ch2'] }],
});
utils.createPresenceMockScopes({
subKey: 'demo',
presenceType: 'leave',
requests: [{ channels: ['ch1', 'ch2'] }],
});
utils.createSubscribeMockScopes({
subKey: 'demo',
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
userId: 'test-js',
eventEngine: true,
requests: [
{ channels: ['ch1'], messages: [] },
{ channels: ['ch1', 'ch2'], messages: [] },
{ channels: ['ch1', 'ch2'], messages: [], replyDelay: 1000 },
],
});

let receivingStateCount = 0;

engine.subscribe((change: { type: string; toState: { label: string } }) => {
if (change.type === 'transitionDone' && change.toState.label === 'RECEIVING') {
receivingStateCount++;
}
stateChanges.push(change);
});

pubnub.addListener({ status: (statusEvent) => receivedStatuses.push(statusEvent) });

// First subscribe to 'ch1'
pubnub.subscribe({ channels: ['ch1'] });

await forState('RECEIVING', 1000);

// Subscribe to 'ch2' (new channel) - should trigger state transition
pubnub.subscribe({ channels: ['ch2'] });

await forStatus(StatusCategory.PNSubscriptionChangedCategory, 1000);

// Verify state transition occurred (should have at least 2 RECEIVING states)
if (receivingStateCount < 2) {
throw new Error(`Expected at least 2 RECEIVING state transitions, but got ${receivingStateCount}`);
}

// Clean up
pubnub.unsubscribe({ channels: ['ch1', 'ch2'] });
await forState('UNSUBSCRIBED', 1000);
});

// TODO: retry with configuration
// it('should retry correctly', async () => {
// utils.createNock().get('/v2/subscribe/demo/test/0').query(true).reply(200, '{"t":{"t":"12345","r":1}, "m": []}');
Expand Down