
Commit

Merge branch 'conditional-style-performance' of https://github.com/nasa/openmct into conditional-style-performance

merging master
jvigliotta committed Oct 1, 2024
2 parents 3fd6d78 + 74f59e1 commit 47e7ed7
Showing 4 changed files with 308 additions and 199 deletions.
@@ -0,0 +1,72 @@
/*****************************************************************************
* Open MCT, Copyright (c) 2014-2023, United States Government
* as represented by the Administrator of the National Aeronautics and Space
* Administration. All rights reserved.
*
* Open MCT is licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* Open MCT includes source code licensed under additional open source
* licenses. See the Open Source Licenses file (LICENSES.md) included with
* this source code distribution or the Licensing information page available
* at runtime from the About dialog for additional information.
*****************************************************************************/
import { expect, test } from '../../../../pluginFixtures.js';

test.describe('The performance indicator', () => {
test.beforeEach(async ({ page }) => {
await page.goto('./', { waitUntil: 'domcontentloaded' });
await page.evaluate(() => {
const openmct = window.openmct;
openmct.install(openmct.plugins.PerformanceIndicator());
});
});

test('can be installed', async ({ page }) => {
    const performanceIndicator = page.getByTitle('Performance Indicator');
    await expect(performanceIndicator).toBeVisible();
  });

test('Shows a numerical FPS value', async ({ page }) => {
// Frames Per Second. We need to wait at least 1 second to get a value.
// eslint-disable-next-line playwright/no-wait-for-timeout
await page.waitForTimeout(1000);
await expect(page.getByTitle('Performance Indicator')).toHaveText(/\d\d? fps/);
});

test('Supports showing optional extended performance information in an overlay for debugging', async ({
page
}) => {
const performanceMeasurementLabel = 'Some measurement';
const performanceMeasurementValue = 'Some value';

await page.evaluate(
({ performanceMeasurementLabel: label, performanceMeasurementValue: value }) => {
const openmct = window.openmct;
openmct.performance.measurements.set(label, value);
},
{ performanceMeasurementLabel, performanceMeasurementValue }
);
const performanceIndicator = page.getByTitle('Performance Indicator');
await performanceIndicator.click();
// The performance overlay is a crude debugging tool; it's evaluated once per second.
// eslint-disable-next-line playwright/no-wait-for-timeout
await page.waitForTimeout(1000);
const performanceOverlay = page.getByTitle('Performance Overlay');
await expect(performanceOverlay).toBeVisible();
await expect(performanceOverlay).toHaveText(new RegExp(`${performanceMeasurementLabel}.*`));
await expect(performanceOverlay).toHaveText(new RegExp(`.*${performanceMeasurementValue}`));

// Confirm that it disappears if we click on it again.
await performanceIndicator.click();
await expect(performanceOverlay).toBeHidden();
});
});
193 changes: 107 additions & 86 deletions src/api/telemetry/BatchingWebSocket.js
@@ -20,25 +20,46 @@
* at runtime from the About dialog for additional information.
*****************************************************************************/
import installWorker from './WebSocketWorker.js';

/**
 * Describes the strategy to be used when batching WebSocket messages
 *
 * @typedef BatchingStrategy
 * @property {Function} shouldBatchMessage a function that accepts a single
 * argument - the raw message received from the websocket. Every message
 * received will be evaluated against this function so it should be performant.
 * Note also that this function is executed in a worker, so it must be
 * completely self-contained with no external dependencies. The function
 * should return `true` if the message should be batched, and `false` if not.
 * @property {Function} getBatchIdFromMessage a function that accepts a
 * single argument - the raw message received from the websocket. Only messages
 * where `shouldBatchMessage` has evaluated to true will be passed into this
 * function. The function should return a unique value on which to batch the
 * messages. For example a telemetry, channel, or parameter identifier.
 */

/**
 * @typedef RequestIdleCallbackOptions
 * @prop {Number} timeout If the number of milliseconds represented by this
 * parameter has elapsed and the callback has not already been called, invoke
 * the callback.
 * @see https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
 */

/**
 * Mocks requestIdleCallback for Safari using setTimeout. Functionality will be
 * identical to setTimeout in Safari, which is to fire the callback function
 * after the provided timeout period.
 *
 * In browsers that support requestIdleCallback, this const is just a
 * pointer to the native function.
 *
 * @param {Function} callback a callback to be invoked during the next idle period, or
 * after the specified timeout
 * @param {RequestIdleCallbackOptions} options
 * @see https://developer.mozilla.org/en-US/docs/Web/API/Window/requestIdleCallback
 */
function requestIdleCallbackPolyfill() {
  return (
    // eslint-disable-next-line compat/compat
    window.requestIdleCallback ??
    // Fallback for browsers without requestIdleCallback (Safari): behaves like
    // setTimeout, and always reports `didTimeout: false` to the callback.
    ((fn, { timeout }) =>
      setTimeout(() => {
        fn({ didTimeout: false });
      }, timeout))
  );
}
const requestIdleCallback = requestIdleCallbackPolyfill();

const ONE_SECOND = 1000;
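// Illustrative sketch only, not part of this commit: a BatchingStrategy for a
// hypothetical JSON message format. The `type` and `parameterId` field names
// are assumptions. Both functions run inside the worker, so they are written
// to be fully self-contained.
const exampleBatchingStrategy = {
  // Evaluated against every raw message received, so it should stay cheap.
  shouldBatchMessage: (rawMessage) => {
    const message = JSON.parse(rawMessage);
    return message.type === 'parameter-update';
  },
  // Only called when shouldBatchMessage returned true; the returned id
  // determines which batch the message is grouped into.
  getBatchIdFromMessage: (rawMessage) => {
    const message = JSON.parse(rawMessage);
    return message.parameterId;
  }
};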

/**
 * Provides a WebSocket abstraction layer that handles a lot of boilerplate common
 * to managing WebSocket connections such as:
* - Establishing a WebSocket connection to a server
* - Reconnecting on error, with a fallback strategy
* - Queuing messages so that clients can send messages without concern for the current
@@ -49,22 +70,19 @@ import installWorker from './WebSocketWorker.js';
* and batching of messages without blocking either the UI or server.
*
*/
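// Construction sketch, not from this diff (assumptions: an initialized
// `openmct` instance is in scope, and the connection is opened with a
// `connect(url)` method as implied by the connection-management description
// above — that method name is not shown in this hunk):
//
//   const socket = new BatchingWebSocket(openmct);
//   socket.connect('wss://example.com/realtime');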

class BatchingWebSocket extends EventTarget {
#worker;
#openmct;
#showingRateLimitNotification;
#maxBufferSize;
#throttleRate;
#firstBatchReceived;
#lastBatchReceived;
#peakBufferSize = Number.NEGATIVE_INFINITY;

/**
* @param {import('openmct.js').OpenMCT} openmct
*/
constructor(openmct) {
super();
// Install worker, register listeners etc.
@@ -74,9 +92,8 @@ class BatchingWebSocket extends EventTarget {
this.#worker = new Worker(workerUrl);
this.#openmct = openmct;
this.#showingRateLimitNotification = false;
this.#maxBufferSize = Number.POSITIVE_INFINITY;
this.#throttleRate = ONE_SECOND;
this.#firstBatchReceived = false;

const routeMessageToHandler = this.#routeMessageToHandler.bind(this);
@@ -89,20 +106,6 @@
},
{ once: true }
);

}

/**
@@ -137,57 +140,48 @@
}

/**
* Set the strategy used to both decide which raw messages to batch, and how to group
* them.
* @param {BatchingStrategy} strategy The batching strategy to use when evaluating
* raw messages from the WebSocket.
*/
setBatchingStrategy(strategy) {
const serializedStrategy = {
shouldBatchMessage: strategy.shouldBatchMessage.toString(),
getBatchIdFromMessage: strategy.getBatchIdFromMessage.toString()
};

this.#worker.postMessage({
type: 'setBatchingStrategy',
serializedStrategy
});
}
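// Usage sketch (assuming `socket` is a BatchingWebSocket instance and
// `exampleBatchingStrategy` matches the BatchingStrategy shape sketched near
// the top of this file). Because both functions are stringified with
// toString() and rehydrated inside the worker, they must not close over any
// variables outside their own bodies:
//
//   socket.setBatchingStrategy(exampleBatchingStrategy);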

/**
 * @param {number} maxBufferSize the maximum length of the receive buffer in characters.
 * Note that this is a fail-safe that is only invoked if performance drops to the
 * point where Open MCT cannot keep up with the amount of telemetry it is receiving.
 * In this event it will sacrifice the oldest telemetry in the batch in favor of the
 * most recent telemetry. The user will be informed that telemetry has been dropped.
 *
 * This should be set appropriately for the expected data rate, e.g. if typical usage
 * sees 2000 messages arriving at a client per second, with an average message size
 * of 500 bytes, then 2000 * 500 = 1,000,000 characters will be right on the limit.
 * In this scenario, a buffer size of 1,500,000 characters might be more appropriate
 * to allow some overhead for bursty telemetry, and temporary UI load during page
 * load.
 *
 * The PerformanceIndicator plugin (openmct.plugins.PerformanceIndicator) gives
 * statistics on buffer utilization. It can be used to scale the buffer appropriately.
 */
setMaxBufferSize(maxBufferSize) {
    this.#maxBufferSize = maxBufferSize;
    this.#sendMaxBufferSizeToWorker(this.#maxBufferSize);
  }

  /**
   * @param {number} throttleRate the minimum time, in milliseconds, between
   * batch deliveries from the worker. Defaults to one second.
   */
  setThrottleRate(throttleRate) {
    this.#throttleRate = throttleRate;
    this.#sendThrottleRateToWorker(this.#throttleRate);
  }
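// Sizing sketch using the example rates from the comment above (an assumed
  // workload, not a recommendation; `socket` is assumed to be a
  // BatchingWebSocket instance): 2000 messages/s * 500 characters is
  // 1,000,000 characters per second at steady state, so ~50% headroom gives:
  //
  //   socket.setMaxBufferSize(2000 * 500 * 1.5); // 1,500,000 characters
  //   socket.setThrottleRate(1000); // deliver at most one batch per second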
setThrottleMessagePattern(throttleMessagePattern) {
    this.#worker.postMessage({
      type: 'setThrottleMessagePattern',
      throttleMessagePattern
    });
  }

  #sendMaxBufferSizeToWorker(maxBufferSize) {
    this.#worker.postMessage({
      type: 'setMaxBufferSize',
      maxBufferSize
    });
  }

  #sendThrottleRateToWorker(throttleRate) {
    this.#worker.postMessage({
      type: 'setThrottleRate',
      throttleRate
    });
  }

@@ -203,9 +197,38 @@

#routeMessageToHandler(message) {
if (message.data.type === 'batch') {
const batch = message.data.batch;
const now = performance.now();

let currentBufferLength = message.data.currentBufferLength;
let maxBufferSize = message.data.maxBufferSize;
let parameterCount = batch.length;
if (this.#peakBufferSize < currentBufferLength) {
this.#peakBufferSize = currentBufferLength;
}

if (this.#openmct.performance !== undefined) {
if (!isNaN(this.#lastBatchReceived)) {
        const elapsed = (now - this.#lastBatchReceived) / 1000;
        this.#openmct.performance.measurements.set(
          'Parameters/s',
          Math.floor(parameterCount / elapsed)
        );
      }
      // Record the arrival time outside the guard above; otherwise the first
      // batch would never seed #lastBatchReceived and no rate would ever be reported.
      this.#lastBatchReceived = now;
this.#openmct.performance.measurements.set(
'Buff. Util. (bytes)',
`${currentBufferLength} / ${maxBufferSize}`
);
this.#openmct.performance.measurements.set(
'Peak Buff. Util. (bytes)',
`${this.#peakBufferSize} / ${maxBufferSize}`
);
}
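// Worked example with hypothetical numbers: a batch of 500 parameters
      // arriving 0.5 s after the previous batch reports Math.floor(500 / 0.5)
      // = 1000 Parameters/s, and a 750,000-character buffer against a
      // 1,500,000-character maximum reports a 'Buff. Util. (bytes)' value of
      // '750000 / 1500000'.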

this.start = Date.now();
const dropped = message.data.dropped;
if (dropped === true && !this.#showingRateLimitNotification) {
const notification = this.#openmct.notifications.alert(
'Telemetry dropped due to client rate limiting.',
{ hint: 'Refresh individual telemetry views to retrieve dropped telemetry if needed.' }
@@ -240,18 +263,16 @@
console.warn(`Event loop is too busy to process batch.`);
this.#waitUntilIdleAndRequestNextBatch(batch);
} else {
// After ingesting a telemetry batch, wait until the event loop is idle again before
// informing the worker we are ready for another batch.
this.#readyForNextBatch();
}
} else {
if (waitedFor > this.#throttleRate) {
console.warn(`Warning, batch processing took ${waitedFor}ms`);
}
this.#readyForNextBatch();
}
},
{ timeout: this.#throttleRate }
);
}
}