Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix presence leave call on used channels #425

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions dist/web/pubnub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2916,7 +2916,10 @@
message = 'Network issues';
}
else if (errorName === 'TypeError') {
category = StatusCategory$1.PNBadRequestCategory;
if (message.indexOf('Load failed') !== -1 || message.indexOf('Failed to fetch') != -1)
category = StatusCategory$1.PNNetworkIssuesCategory;
else
category = StatusCategory$1.PNBadRequestCategory;
}
else if (errorName === 'FetchError') {
const errorCode = error.code;
Expand Down Expand Up @@ -9892,15 +9895,23 @@
*/
subscribe(subscribeParameters) {
const timetoken = subscribeParameters === null || subscribeParameters === void 0 ? void 0 : subscribeParameters.timetoken;
this.pubnub.registerSubscribeCapable(this);
this.pubnub.subscribe(Object.assign({ channels: this.channelNames, channelGroups: this.groupNames }, (timetoken !== null && timetoken !== '' && { timetoken: timetoken })));
}
/**
* Stop real-time events processing.
*/
unsubscribe() {
this.pubnub.unregisterSubscribeCapable(this);
const { channels, channelGroups } = this.pubnub.getSubscribeCapableEntities();
// Identify channels and groups from which PubNub client can safely unsubscribe.
const filteredChannelGroups = this.groupNames.filter((cg) => !channelGroups.includes(cg));
const filteredChannels = this.channelNames.filter((ch) => !channels.includes(ch));
if (filteredChannels.length === 0 && filteredChannelGroups.length === 0)
return;
this.pubnub.unsubscribe({
channels: this.channelNames,
channelGroups: this.groupNames,
channels: filteredChannels,
channelGroups: filteredChannelGroups,
});
}
/**
Expand Down Expand Up @@ -12566,6 +12577,7 @@
// Prepare for real-time events announcement.
this.listenerManager = new ListenerManager();
this.eventEmitter = new EventEmitter(this.listenerManager);
this.subscribeCapable = new Set();
if (this._configuration.enableEventEngine) {
{
let heartbeatInterval = this._configuration.getHeartbeatInterval();
Expand Down Expand Up @@ -12998,7 +13010,9 @@
* @param [isOffline] - Whether `offline` presence should be notified or not.
*/
destroy(isOffline) {
var _a;
{
(_a = this.subscribeCapable) === null || _a === void 0 ? void 0 : _a.clear();
if (this.subscriptionManager) {
this.subscriptionManager.unsubscribeAll(isOffline);
this.subscriptionManager.disconnect();
Expand Down Expand Up @@ -13127,6 +13141,53 @@
}
return [];
}
/**
* Register subscribe capable object with active subscription.
*
* @param subscribeCapable - {@link Subscription} or {@link SubscriptionSet} object.
*
* @internal
*/
registerSubscribeCapable(subscribeCapable) {
{
if (!this.subscribeCapable || this.subscribeCapable.has(subscribeCapable))
return;
this.subscribeCapable.add(subscribeCapable);
}
}
/**
* Unregister subscribe capable object with inactive subscription.
*
* @param subscribeCapable - {@link Subscription} or {@link SubscriptionSet} object.
*
* @internal
*/
unregisterSubscribeCapable(subscribeCapable) {
{
if (!this.subscribeCapable || !this.subscribeCapable.has(subscribeCapable))
return;
this.subscribeCapable.delete(subscribeCapable);
}
}
/**
* Retrieve list of subscribe capable entities currently used in subscription.
*
* @returns Channels and channel groups currently used in subscription.
*
* @internal
*/
getSubscribeCapableEntities() {
{
const entities = { channels: [], channelGroups: [] };
if (!this.subscribeCapable)
return entities;
for (const subscribeCapable of this.subscribeCapable) {
entities.channelGroups.push(...subscribeCapable.channelGroups);
entities.channels.push(...subscribeCapable.channels);
}
return entities;
}
}
/**
* Subscribe to specified channels and groups real-time events.
*
Expand Down Expand Up @@ -13205,7 +13266,9 @@
* Unsubscribe from all channels and groups.
*/
unsubscribeAll() {
var _a;
{
(_a = this.subscribeCapable) === null || _a === void 0 ? void 0 : _a.clear();
if (this.subscriptionManager)
this.subscriptionManager.unsubscribeAll();
else if (this.eventEngine)
Expand Down
4 changes: 2 additions & 2 deletions dist/web/pubnub.min.js

Large diffs are not rendered by default.

60 changes: 60 additions & 0 deletions lib/core/pubnub-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ class PubNubCore {
// Prepare for real-time events announcement.
this.listenerManager = new listener_manager_1.ListenerManager();
this.eventEmitter = new eventEmitter_1.default(this.listenerManager);
this.subscribeCapable = new Set();
if (this._configuration.enableEventEngine) {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
let heartbeatInterval = this._configuration.getHeartbeatInterval();
Expand Down Expand Up @@ -609,7 +610,10 @@ class PubNubCore {
* @param [isOffline] - Whether `offline` presence should be notified or not.
*/
destroy(isOffline) {
var _a;
if (process.env.SUBSCRIBE_MODULE !== 'disabled') {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled')
(_a = this.subscribeCapable) === null || _a === void 0 ? void 0 : _a.clear();
if (this.subscriptionManager) {
this.subscriptionManager.unsubscribeAll(isOffline);
this.subscriptionManager.disconnect();
Expand Down Expand Up @@ -755,6 +759,59 @@ class PubNubCore {
throw new Error('Subscription error: subscription module disabled');
return [];
}
/**
* Register subscribe capable object with active subscription.
*
* @param subscribeCapable - {@link Subscription} or {@link SubscriptionSet} object.
*
* @internal
*/
registerSubscribeCapable(subscribeCapable) {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
if (!this.subscribeCapable || this.subscribeCapable.has(subscribeCapable))
return;
this.subscribeCapable.add(subscribeCapable);
}
else
throw new Error('Subscription error: subscription event engine module disabled');
}
/**
* Unregister subscribe capable object with inactive subscription.
*
* @param subscribeCapable - {@link Subscription} or {@link SubscriptionSet} object.
*
* @internal
*/
unregisterSubscribeCapable(subscribeCapable) {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
if (!this.subscribeCapable || !this.subscribeCapable.has(subscribeCapable))
return;
this.subscribeCapable.delete(subscribeCapable);
}
else
throw new Error('Subscription error: subscription event engine module disabled');
}
/**
* Retrieve list of subscribe capable entities currently used in subscription.
*
* @returns Channels and channel groups currently used in subscription.
*
* @internal
*/
getSubscribeCapableEntities() {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
const entities = { channels: [], channelGroups: [] };
if (!this.subscribeCapable)
return entities;
for (const subscribeCapable of this.subscribeCapable) {
entities.channelGroups.push(...subscribeCapable.channelGroups);
entities.channels.push(...subscribeCapable.channels);
}
return entities;
}
else
throw new Error('Subscription error: subscription event engine module disabled');
}
/**
* Subscribe to specified channels and groups real-time events.
*
Expand Down Expand Up @@ -841,7 +898,10 @@ class PubNubCore {
* Unsubscribe from all channels and groups.
*/
unsubscribeAll() {
var _a;
if (process.env.SUBSCRIBE_MODULE !== 'disabled') {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled')
(_a = this.subscribeCapable) === null || _a === void 0 ? void 0 : _a.clear();
if (this.subscriptionManager)
this.subscriptionManager.unsubscribeAll();
else if (this.eventEngine)
Expand Down
12 changes: 10 additions & 2 deletions lib/entities/SubscribeCapable.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,23 @@ class SubscribeCapable {
*/
subscribe(subscribeParameters) {
const timetoken = subscribeParameters === null || subscribeParameters === void 0 ? void 0 : subscribeParameters.timetoken;
this.pubnub.registerSubscribeCapable(this);
this.pubnub.subscribe(Object.assign({ channels: this.channelNames, channelGroups: this.groupNames }, (timetoken !== null && timetoken !== '' && { timetoken: timetoken })));
}
/**
* Stop real-time events processing.
*/
unsubscribe() {
this.pubnub.unregisterSubscribeCapable(this);
const { channels, channelGroups } = this.pubnub.getSubscribeCapableEntities();
// Identify channels and groups from which PubNub client can safely unsubscribe.
const filteredChannelGroups = this.groupNames.filter((cg) => !channelGroups.includes(cg));
const filteredChannels = this.channelNames.filter((ch) => !channels.includes(ch));
if (filteredChannels.length === 0 && filteredChannelGroups.length === 0)
return;
this.pubnub.unsubscribe({
channels: this.channelNames,
channelGroups: this.groupNames,
channels: filteredChannels,
channelGroups: filteredChannelGroups,
});
}
/**
Expand Down
5 changes: 4 additions & 1 deletion lib/errors/pubnub-api-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ class PubNubAPIError extends Error {
message = 'Network issues';
}
else if (errorName === 'TypeError') {
category = categories_1.default.PNBadRequestCategory;
if (message.indexOf('Load failed') !== -1 || message.indexOf('Failed to fetch') != -1)
category = categories_1.default.PNNetworkIssuesCategory;
else
category = categories_1.default.PNBadRequestCategory;
}
else if (errorName === 'FetchError') {
const errorCode = error.code;
Expand Down
66 changes: 66 additions & 0 deletions src/core/pubnub-common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ import { AuditRequest } from './endpoints/access_manager/audit';
import * as PAM from './types/api/access-manager';
// endregion
// region Entities
import { SubscribeCapable } from '../entities/SubscribeCapable';
import { SubscriptionOptions } from '../entities/commonTypes';
import { ChannelMetadata } from '../entities/ChannelMetadata';
import { SubscriptionSet } from '../entities/SubscriptionSet';
Expand Down Expand Up @@ -216,6 +217,16 @@ export class PubNubCore<
*/
private presenceEventEngine?: PresenceEventEngine;

/**
* List of subscribe capable objects with active subscriptions.
*
* Track list of {@link Subscription} and {@link SubscriptionSet} objects with active
* subscription.
*
* @internal
*/
private readonly subscribeCapable?: Set<SubscribeCapable>;

/**
* Subscription event engine.
*
Expand Down Expand Up @@ -345,6 +356,7 @@ export class PubNubCore<
// Prepare for real-time events announcement.
this.listenerManager = new ListenerManager();
this.eventEmitter = new EventEmitter(this.listenerManager);
this.subscribeCapable = new Set<SubscribeCapable>();

if (this._configuration.enableEventEngine) {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
Expand Down Expand Up @@ -863,6 +875,8 @@ export class PubNubCore<
*/
public destroy(isOffline?: boolean): void {
if (process.env.SUBSCRIBE_MODULE !== 'disabled') {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') this.subscribeCapable?.clear();

if (this.subscriptionManager) {
this.subscriptionManager.unsubscribeAll(isOffline);
this.subscriptionManager.disconnect();
Expand Down Expand Up @@ -1086,6 +1100,56 @@ export class PubNubCore<
return [];
}

/**
* Register subscribe capable object with active subscription.
*
* @param subscribeCapable - {@link Subscription} or {@link SubscriptionSet} object.
*
* @internal
*/
public registerSubscribeCapable(subscribeCapable: SubscribeCapable) {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
if (!this.subscribeCapable || this.subscribeCapable.has(subscribeCapable)) return;

this.subscribeCapable.add(subscribeCapable);
} else throw new Error('Subscription error: subscription event engine module disabled');
}

/**
* Unregister subscribe capable object with inactive subscription.
*
* @param subscribeCapable - {@link Subscription} or {@link SubscriptionSet} object.
*
* @internal
*/
public unregisterSubscribeCapable(subscribeCapable: SubscribeCapable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻

if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
if (!this.subscribeCapable || !this.subscribeCapable.has(subscribeCapable)) return;

this.subscribeCapable.delete(subscribeCapable);
} else throw new Error('Subscription error: subscription event engine module disabled');
}

/**
* Retrieve list of subscribe capable entities currently used in subscription.
*
* @returns Channels and channel groups currently used in subscription.
*
* @internal
*/
public getSubscribeCapableEntities(): { channels: string[]; channelGroups: string[] } {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
const entities: { channels: string[]; channelGroups: string[] } = { channels: [], channelGroups: [] };
if (!this.subscribeCapable) return entities;

for (const subscribeCapable of this.subscribeCapable) {
entities.channelGroups.push(...subscribeCapable.channelGroups);
entities.channels.push(...subscribeCapable.channels);
}
return entities;
} else throw new Error('Subscription error: subscription event engine module disabled');
}

/**
* Subscribe to specified channels and groups real-time events.
*
Expand Down Expand Up @@ -1182,6 +1246,8 @@ export class PubNubCore<
*/
public unsubscribeAll() {
if (process.env.SUBSCRIBE_MODULE !== 'disabled') {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') this.subscribeCapable?.clear();

if (this.subscriptionManager) this.subscriptionManager.unsubscribeAll();
else if (this.eventEngine) this.eventEngine.unsubscribeAll();
} else throw new Error('Unsubscription error: subscription module disabled');
Expand Down
14 changes: 12 additions & 2 deletions src/entities/SubscribeCapable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export abstract class SubscribeCapable {
*/
subscribe(subscribeParameters?: { timetoken?: string }) {
const timetoken = subscribeParameters?.timetoken;
this.pubnub.registerSubscribeCapable(this);
this.pubnub.subscribe({
channels: this.channelNames,
channelGroups: this.groupNames,
Expand All @@ -68,9 +69,18 @@ export abstract class SubscribeCapable {
* Stop real-time events processing.
*/
unsubscribe() {
this.pubnub.unregisterSubscribeCapable(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏻

const { channels, channelGroups } = this.pubnub.getSubscribeCapableEntities();

// Identify channels and groups from which PubNub client can safely unsubscribe.
const filteredChannelGroups = this.groupNames.filter((cg) => !channelGroups.includes(cg));
const filteredChannels = this.channelNames.filter((ch) => !channels.includes(ch));

if (filteredChannels.length === 0 && filteredChannelGroups.length === 0) return;

this.pubnub.unsubscribe({
channels: this.channelNames,
channelGroups: this.groupNames,
channels: filteredChannels,
channelGroups: filteredChannelGroups,
});
}

Expand Down
4 changes: 3 additions & 1 deletion src/errors/pubnub-api-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ export class PubNubAPIError extends Error {
category = StatusCategory.PNNetworkIssuesCategory;
message = 'Network issues';
} else if (errorName === 'TypeError') {
category = StatusCategory.PNBadRequestCategory;
if (message.indexOf('Load failed') !== -1 || message.indexOf('Failed to fetch') != -1)
category = StatusCategory.PNNetworkIssuesCategory;
else category = StatusCategory.PNBadRequestCategory;
} else if (errorName === 'FetchError') {
const errorCode = (error as Record<string, string>).code;

Expand Down
Loading
Loading