Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve websocket error handling #1563

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
"@protobuf-ts/twirp-transport": "^2.9.4",
"@types/ws": "^8.5.7",
"axios": "^1.6.0",
"base64-js": "^1.5.1",
"isomorphic-ws": "^5.0.0",
"rxjs": "~7.8.1",
"sdp-transform": "^2.14.1",
Expand Down
52 changes: 0 additions & 52 deletions packages/client/src/coordinator/connection/base64.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,3 @@
import { fromByteArray } from 'base64-js';

function isString<T>(arrayOrString: string | T[]): arrayOrString is string {
return typeof (arrayOrString as string) === 'string';
}

type MapGenericCallback<T, U> = (value: T, index: number, array: T[]) => U;
type MapStringCallback<U> = (value: string, index: number, string: string) => U;

function isMapStringCallback<T, U>(
arrayOrString: string | T[],
callback: MapGenericCallback<T, U> | MapStringCallback<U>,
): callback is MapStringCallback<U> {
return !!callback && isString(arrayOrString);
}

// source - https://github.com/beatgammit/base64-js/blob/master/test/convert.js#L72
function map<T, U>(array: T[], callback: MapGenericCallback<T, U>): U[];
function map<U>(string: string, callback: MapStringCallback<U>): U[];
function map<T, U>(
arrayOrString: string | T[],
callback: MapGenericCallback<T, U> | MapStringCallback<U>,
): U[] {
const res = [];

if (isString(arrayOrString) && isMapStringCallback(arrayOrString, callback)) {
for (let k = 0, len = arrayOrString.length; k < len; k++) {
if (arrayOrString.charAt(k)) {
const kValue = arrayOrString.charAt(k);
const mappedValue = callback(kValue, k, arrayOrString);
res[k] = mappedValue;
}
}
} else if (
!isString(arrayOrString) &&
!isMapStringCallback(arrayOrString, callback)
) {
for (let k = 0, len = arrayOrString.length; k < len; k++) {
if (k in arrayOrString) {
const kValue = arrayOrString[k];
const mappedValue = callback(kValue, k, arrayOrString);
res[k] = mappedValue;
}
}
}

return res;
}

export const encodeBase64 = (data: string): string =>
fromByteArray(new Uint8Array(map(data, (char) => char.charCodeAt(0))));

// base-64 decoder throws exception if encoded string is not padded by '=' to make string length
// in multiples of 4. So gonna use our own method for this purpose to keep backwards compatibility
// https://github.com/beatgammit/base64-js/blob/master/index.js#L26
Expand Down
142 changes: 10 additions & 132 deletions packages/client/src/coordinator/connection/client.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
import axios, {
AxiosError,
AxiosHeaders,
AxiosInstance,
AxiosRequestConfig,
AxiosResponse,
} from 'axios';
import https from 'https';
import { StableWSConnection } from './connection';
import { DevToken } from './signing';
import { TokenManager } from './token_manager';
import { WSConnectionFallback } from './connection_fallback';
import { isErrorResponse, isWSFailure } from './errors';
import {
addConnectionEventListeners,
isErrorResponse,
isFunction,
isOnline,
KnownCodes,
randomId,
removeConnectionEventListeners,
retryInterval,
sleep,
} from './utils';

import {
AllClientEvents,
AllClientEventTypes,
Expand All @@ -36,7 +31,6 @@ import {
User,
UserWithId,
} from './types';
import { InsightMetrics, postInsights } from './insights';
import { getLocationHint } from './location';
import { CreateGuestRequest, CreateGuestResponse } from '../../gen/coordinator';

Expand Down Expand Up @@ -66,11 +60,8 @@ export class StreamClient {
userID?: string;
wsBaseURL?: string;
wsConnection: StableWSConnection | null;
wsFallback?: WSConnectionFallback;
wsPromise: ConnectAPIResponse | null;
consecutiveFailures: number;
insightMetrics: InsightMetrics;
defaultWSTimeoutWithFallback: number;
defaultWSTimeout: number;
resolveConnectionId?: Function;
rejectConnectionId?: Function;
Expand Down Expand Up @@ -117,7 +108,6 @@ export class StreamClient {
this.options = {
timeout: 5000,
withCredentials: false, // making sure cookies are not sent
warmUp: false,
...inputOptions,
};

Expand All @@ -132,22 +122,6 @@ export class StreamClient {
this.options.baseURL || 'https://video.stream-io-api.com/video',
);

if (
typeof process !== 'undefined' &&
'env' in process &&
process.env.STREAM_LOCAL_TEST_RUN
) {
this.setBaseURL('http://localhost:3030/video');
}

if (
typeof process !== 'undefined' &&
'env' in process &&
process.env.STREAM_LOCAL_TEST_HOST
) {
this.setBaseURL(`http://${process.env.STREAM_LOCAL_TEST_HOST}/video`);
}

this.axiosInstance = axios.create({
...this.options,
baseURL: this.baseURL,
Expand All @@ -167,20 +141,14 @@ export class StreamClient {
// generated from secret.
this.tokenManager = new TokenManager(this.secret);
this.consecutiveFailures = 0;
this.insightMetrics = new InsightMetrics();

this.defaultWSTimeoutWithFallback = 6000;
this.defaultWSTimeout = 15000;

this.logger = isFunction(inputOptions.logger)
? inputOptions.logger
: () => null;
}

devToken = (userID: string) => {
return DevToken(userID);
};

getAuthType = () => {
return this.anonymous ? 'anonymous' : 'jwt';
};
Expand All @@ -207,8 +175,7 @@ export class StreamClient {
return hint;
};

_getConnectionID = () =>
this.wsConnection?.connectionID || this.wsFallback?.connectionID;
_getConnectionID = () => this.wsConnection?.connectionID;

_hasConnectionID = () => Boolean(this._getConnectionID());

Expand Down Expand Up @@ -323,11 +290,7 @@ export class StreamClient {
* https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
*/
closeConnection = async (timeout?: number) => {
await Promise.all([
this.wsConnection?.disconnect(timeout),
this.wsFallback?.disconnect(timeout),
]);
return Promise.resolve();
await this.wsConnection?.disconnect(timeout);
};

/**
Expand All @@ -348,10 +311,7 @@ export class StreamClient {
return this.wsPromise;
}

if (
(this.wsConnection?.isHealthy || this.wsFallback?.isHealthy()) &&
this._hasConnectionID()
) {
if (this.wsConnection?.isHealthy && this._hasConnectionID()) {
this.logger(
'info',
'client:openConnection() - openConnection called twice, healthy connection already exists',
Expand Down Expand Up @@ -686,89 +646,20 @@ export class StreamClient {
'Call connectUser or connectAnonymousUser before starting the connection',
);
}
if (!this.wsBaseURL) {
throw Error('Websocket base url not set');
}
if (!this.clientID) {
throw Error('clientID is not set');
}
if (!this.wsBaseURL) throw Error('Websocket base url not set');
if (!this.clientID) throw Error('clientID is not set');

if (
!this.wsConnection &&
(this.options.warmUp || this.options.enableInsights)
) {
this._sayHi();
}
// The StableWSConnection handles all the reconnection logic.
if (this.options.wsConnection && this.node) {
// Intentionally avoiding adding ts generics on wsConnection in options since its only useful for unit test purpose.
(this.options.wsConnection as unknown as StableWSConnection).setClient(
this,
);
this.wsConnection = this.options
.wsConnection as unknown as StableWSConnection;
this.options.wsConnection.setClient(this);
this.wsConnection = this.options.wsConnection;
} else {
this.wsConnection = new StableWSConnection(this);
}

try {
// if fallback is used before, continue using it instead of waiting for WS to fail
if (this.wsFallback) {
return await this.wsFallback.connect();
}
this.logger('info', 'StreamClient.connect: this.wsConnection.connect()');
// if WSFallback is enabled, ws connect should timeout faster so fallback can try
return await this.wsConnection.connect(
this.options.enableWSFallback
? this.defaultWSTimeoutWithFallback
: this.defaultWSTimeout,
);
} catch (err) {
// run fallback only if it's WS/Network error and not a normal API error
// make sure browser is online before even trying the longpoll
if (
this.options.enableWSFallback &&
// @ts-ignore
isWSFailure(err) &&
isOnline(this.logger)
) {
this.logger(
'warn',
'client:connect() - WS failed, fallback to longpoll',
);
this.dispatchEvent({ type: 'transport.changed', mode: 'longpoll' });

this.wsConnection._destroyCurrentWSConnection();
this.wsConnection.disconnect().then(); // close WS so no retry
this.wsFallback = new WSConnectionFallback(this);
return await this.wsFallback.connect();
}

throw err;
}
};

/**
* Check the connectivity with server for warmup purpose.
*
* @private
*/
_sayHi = () => {
const client_request_id = randomId();
const opts = {
headers: AxiosHeaders.from({
'x-client-request-id': client_request_id,
}),
};
this.doAxiosRequest('get', this.baseURL + '/hi', null, opts).catch((e) => {
if (this.options.enableInsights) {
postInsights('http_hi_failed', {
api_key: this.key,
err: e,
client_request_id,
});
}
});
this.logger('info', 'StreamClient.connect: this.wsConnection.connect()');
return await this.wsConnection.connect(this.defaultWSTimeout);
};

getUserAgent = () => {
Expand Down Expand Up @@ -837,19 +728,6 @@ export class StreamClient {
return this.tokenManager.getToken();
};

/**
* encode ws url payload
* @private
* @returns json string
*/
_buildWSPayload = (client_request_id?: string) => {
return JSON.stringify({
user_id: this.userID,
user_details: this._user,
client_request_id,
});
};

updateNetworkConnectionStatus = (
event: { type: 'online' | 'offline' } | Event,
) => {
Expand Down
Loading
Loading