diff --git a/test/integration/node-specific/client_close.test.ts b/test/integration/node-specific/client_close.test.ts index 7f9a3f11a3..0743cd211d 100644 --- a/test/integration/node-specific/client_close.test.ts +++ b/test/integration/node-specific/client_close.test.ts @@ -75,7 +75,32 @@ describe.skip('MongoClient.close() Integration', () => { describe('Topology', () => { describe('Node.js resource: Server Selection Timer', () => { describe('after a Topology is created through client.connect()', () => { - it.skip('server selection timers are cleaned up by client.close()', async () => {}); + const metadata: MongoDBMetadataUI = { requires: { topology: 'replicaset' } }; + + it('server selection timers are cleaned up by client.close()', metadata, async () => { + const run = async function ({ MongoClient, uri, expect, sleep, mongodb, getTimerCount }) { + const serverSelectionTimeoutMS = 2222; + const client = new MongoClient(uri, { + minPoolSize: 1, + serverSelectionTimeoutMS, + readPreference: new mongodb.ReadPreference('secondary', [ + { something: 'that does not exist' } + ]) + }); + const insertPromise = client.db('db').collection('collection').insertOne({ x: 1 }); + + // don't allow entire server selection timer to elapse to ensure close is called mid-timeout + await sleep(serverSelectionTimeoutMS / 2); + + expect(getTimerCount()).to.not.equal(0); + await client.close(); + expect(getTimerCount()).to.equal(0); + + const err = await insertPromise.catch(e => e); + expect(err).to.be.instanceOf(mongodb.MongoTopologyClosedError); + }; + await runScriptAndGetProcessInfo('timer-server-selection', config, run); + }); }); }); @@ -91,34 +116,179 @@ describe.skip('MongoClient.close() Integration', () => { describe('MonitorInterval', () => { describe('Node.js resource: Timer', () => { describe('after a new monitor is made', () => { - it.skip('monitor interval timer is cleaned up by client.close()', async () => {}); + it( + 'monitor interval timer is cleaned up by client.close()', + metadata, + async function () { + const run = async function ({ MongoClient, uri, expect, getTimerCount, once }) { + const heartbeatFrequencyMS = 2000; + const client = new MongoClient(uri, { heartbeatFrequencyMS }); + const willBeHeartbeatSucceeded = once(client, 'serverHeartbeatSucceeded'); + await client.connect(); + await willBeHeartbeatSucceeded; + + function monitorTimersExist(servers) { + for (const [, server] of servers) { + // the current expected behavior is that timerId is set to undefined once it expires or is interrupted + if (server?.monitor.monitorId.timerId === undefined) { + return false; + } + } + return true; + } + const servers = client.topology.s.servers; + expect(monitorTimersExist(servers)).to.be.true; + await client.close(); + expect(monitorTimersExist(servers)).to.be.true; + + expect(getTimerCount()).to.equal(0); + }; + await runScriptAndGetProcessInfo('timer-monitor-interval', config, run); + } + ); }); describe('after a heartbeat fails', () => { - it.skip('the new monitor interval timer is cleaned up by client.close()', async () => {}); + it( + 'the new monitor interval timer is cleaned up by client.close()', + metadata, + async () => { + const run = async function ({ MongoClient, expect, getTimerCount, once }) { + const heartbeatFrequencyMS = 2000; + const client = new MongoClient('mongodb://fakeUri', { heartbeatFrequencyMS }); + const willBeHeartbeatFailed = once(client, 'serverHeartbeatFailed'); + client.connect(); + await willBeHeartbeatFailed; + + function getMonitorTimer(servers) { + for (const [, server] of servers) { + return server?.monitor.monitorId.timerId; + } + } + const servers = client.topology.s.servers; + expect(getMonitorTimer(servers)).to.exist; + await client.close(); + // the current expected behavior is that timerId is set to undefined once it expires or is interrupted + expect(getMonitorTimer(servers)).to.not.exist; + + expect(getTimerCount()).to.equal(0); + }; + await runScriptAndGetProcessInfo('timer-heartbeat-failed-monitor', config, run); + } + ); }); }); }); - describe('Connection Monitoring', () => { + describe('Monitoring Connection', () => { describe('Node.js resource: Socket', () => { - it.skip('no sockets remain after client.close()', metadata, async function () {}); + it('no sockets remain after client.close()', metadata, async function () { + const run = async function ({ MongoClient, uri, expect, getSocketEndpoints }) { + const client = new MongoClient(uri); + await client.connect(); + + const servers = client.topology?.s.servers; + // assert socket creation + for (const [, server] of servers) { + const { host, port } = server.s.description.hostAddress; + expect(getSocketEndpoints()).to.deep.include({ host, port }); + } + + await client.close(); + + // assert socket destruction + for (const [, server] of servers) { + const { host, port } = server.s.description.hostAddress; + expect(getSocketEndpoints()).to.not.deep.include({ host, port }); + } + }; + await runScriptAndGetProcessInfo('socket-connection-monitoring', config, run); + }); }); }); describe('RTT Pinger', () => { describe('Node.js resource: Timer', () => { describe('after entering monitor streaming mode ', () => { - it.skip('the rtt pinger timer is cleaned up by client.close()', async () => { - // helloReply has a topologyVersion defined - }); + it( + 'the rtt pinger timer is cleaned up by client.close()', + metadata, + async function () { + const run = async function ({ MongoClient, uri, expect, getTimerCount, once }) { + const heartbeatFrequencyMS = 2000; + const client = new MongoClient(uri, { + serverMonitoringMode: 'stream', + heartbeatFrequencyMS + }); + await client.connect(); + await once(client, 'serverHeartbeatSucceeded'); + + function getRttTimer(servers) { + for (const [, server] of servers) { + return server?.monitor.rttPinger.monitorId; + } + } + + const servers = client.topology.s.servers; + expect(getRttTimer(servers)).to.exist; + + await client.close(); + expect(getRttTimer(servers)).to.not.exist; + + expect(getTimerCount()).to.equal(0); + }; + await runScriptAndGetProcessInfo('timer-rtt-monitor', config, run); + } + ); }); }); describe('Connection', () => { describe('Node.js resource: Socket', () => { describe('when rtt monitoring is turned on', () => { - it.skip('no sockets remain after client.close()', async () => {}); + it('no sockets remain after client.close()', metadata, async () => { + const run = async ({ MongoClient, uri, expect, getSockets, once, log }) => { + const heartbeatFrequencyMS = 500; + const client = new MongoClient(uri, { + serverMonitoringMode: 'stream', + heartbeatFrequencyMS + }); + await client.connect(); + + const socketsAddressesBeforeHeartbeat = getSockets().map(r => r.address); + + // set of servers whose heartbeats have occurred + const heartbeatOccurredSet = new Set(); + + const servers = client.topology.s.servers; + + while (heartbeatOccurredSet.size < servers.size) { + const ev = await once(client, 'serverHeartbeatSucceeded'); + log({ ev: ev[0] }); + heartbeatOccurredSet.add(ev[0].connectionId); + } + + const activeSocketsAfterHeartbeat = () => + getSockets() + .filter(r => !socketsAddressesBeforeHeartbeat.includes(r.address)) + .map(r => r.remoteEndpoint?.host + ':' + r.remoteEndpoint?.port); + // all servers should have had a heartbeat event and had a new socket created for rtt pinger + const activeSocketsBeforeClose = activeSocketsAfterHeartbeat(); + for (const [server] of servers) { + expect(activeSocketsBeforeClose).to.deep.contain(server); + } + + // close the client + await client.close(); + + log({ socketsAfterClose: getSockets() }); + // upon close, assert rttPinger sockets are cleaned up + const activeSocketsAfterClose = activeSocketsAfterHeartbeat(); + expect(activeSocketsAfterClose).to.have.lengthOf(0); + }; + + await runScriptAndGetProcessInfo('socket-connection-rtt-monitoring', config, run); + }); }); }); }); @@ -128,25 +298,126 @@ describe.skip('MongoClient.close() Integration', () => { describe('ConnectionPool', () => { describe('Node.js resource: minPoolSize timer', () => { describe('after new connection pool is created', () => { - it.skip('the minPoolSize timer is cleaned up by client.close()', async () => {}); + it('the minPoolSize timer is cleaned up by client.close()', async function () { + const run = async function ({ MongoClient, uri, expect, getTimerCount }) { + const client = new MongoClient(uri, { minPoolSize: 1 }); + let minPoolSizeTimerCreated = false; + client.on('connectionPoolReady', () => (minPoolSizeTimerCreated = true)); + await client.connect(); + + expect(minPoolSizeTimerCreated).to.be.true; + + const servers = client.topology?.s.servers; + + function getMinPoolSizeTimer(servers) { + for (const [, server] of servers) { + return server.pool.minPoolSizeTimer; + } + } + // note: minPoolSizeCheckFrequencyMS = 100 ms by client, so this test has a chance of being flaky + expect(getMinPoolSizeTimer(servers)).to.exist; + + await client.close(); + expect(getMinPoolSizeTimer(servers)).to.not.exist; + expect(getTimerCount()).to.equal(0); + }; + await runScriptAndGetProcessInfo('timer-min-pool-size', config, run); + }); }); }); describe('Node.js resource: checkOut Timer', () => { - // waitQueueTimeoutMS describe('after new connection pool is created', () => { - it.skip('the wait queue timer is cleaned up by client.close()', async () => {}); + let utilClient; + const waitQueueTimeoutMS = 1515; + + beforeEach(async function () { + // configure failPoint + utilClient = this.configuration.newClient(); + await utilClient.connect(); + const failPoint = { + configureFailPoint: 'failCommand', + mode: { times: 1 }, + data: { + appName: 'waitQueueTestClient', + blockConnection: true, + blockTimeMS: waitQueueTimeoutMS * 3, + failCommands: ['insert'] + } + }; + await utilClient.db('admin').command(failPoint); + }); + + afterEach(async function () { + await utilClient.db().admin().command({ + configureFailPoint: 'failCommand', + mode: 'off' + }); + await utilClient.close(); + }); + + it('the wait queue timer is cleaned up by client.close()', async function () { + const run = async function ({ MongoClient, uri, expect, getTimerCount, once }) { + const waitQueueTimeoutMS = 1515; + + const client = new MongoClient(uri, { + maxPoolSize: 1, + waitQueueTimeoutMS, + appName: 'waitQueueTestClient', + monitorCommands: true + }); + client + .db('db') + .collection('collection') + .insertOne({ x: 1 }) + .catch(e => e); + await once(client, 'connectionCheckedOut'); + + const blockedInsert = client + .db('db') + .collection('collection') + .insertOne({ x: 1 }) + .catch(e => e); + await once(client, 'connectionCheckOutStarted'); + + expect(getTimerCount()).to.not.equal(0); + await client.close(); + expect(getTimerCount()).to.equal(0); + + const err = await blockedInsert; + expect(err).to.be.instanceOf(Error); + expect(err.message).to.contain( + 'Timed out while checking out a connection from connection pool' + ); + }; + await runScriptAndGetProcessInfo('timer-check-out', config, run); + }); }); }); describe('Connection', () => { describe('Node.js resource: Socket', () => { - describe('after a connection is checked out', () => { - it.skip('no sockets remain after client.close()', async () => {}); - }); - describe('after a minPoolSize has been set on the ConnectionPool', () => { - it.skip('no sockets remain after client.close()', async () => {}); + it('no sockets remain after client.close()', async function () { + const run = async function ({ MongoClient, uri, expect, getSockets }) { + // assert no sockets to start with + expect(getSockets()).to.have.lengthOf(0); + const options = { minPoolSize: 1 }; + const client = new MongoClient(uri, options); + await client.connect(); + + // regardless of pool size: there should be a client connection socket for each server, and one monitor socket total + // with minPoolSize = 1, there should be one or more extra active sockets + expect(getSockets()).to.have.length.gte(client.topology?.s.servers.size + 2); + + await client.close(); + + // assert socket clean-up + expect(getSockets()).to.have.lengthOf(0); + }; + + await runScriptAndGetProcessInfo('socket-minPoolSize', config, run); + }); }); }); }); @@ -155,8 +426,26 @@ describe.skip('MongoClient.close() Integration', () => { describe('SrvPoller', () => { describe('Node.js resource: Timer', () => { + // requires an srv environment that can transition to sharded + const metadata: MongoDBMetadataUI = { requires: { topology: 'sharded' } }; + describe('after SRVPoller is created', () => { - it.skip('timers are cleaned up by client.close()', async () => {}); + it('timers are cleaned up by client.close()', metadata, async () => { + const run = async function ({ MongoClient, expect, getTimerCount }) { + const SRV_CONNECTION_STRING = `mongodb+srv://test1.test.build.10gen.cc`; + // 27018 localhost.test.build.10gen.cc. + // 27017 localhost.test.build.10gen.cc. + + const client = new MongoClient(SRV_CONNECTION_STRING); + await client.connect(); + // the current expected behavior is that _timeout is set to undefined until SRV polling starts + // then _timeout is set to undefined again when SRV polling stops + expect(client.topology.s.srvPoller._timeout).to.exist; + await client.close(); + expect(getTimerCount()).to.equal(0); + }; + await runScriptAndGetProcessInfo('timer-srv-poller', config, run); + }); }); }); }); @@ -201,14 +490,14 @@ describe.skip('MongoClient.close() Integration', () => { describe('KMS Request', () => { describe('Node.js resource: TLS file read', () => { describe('when KMSRequest reads an infinite TLS file', () => { - it('the file read is interrupted by client.close()', async () => { + it('the file read is interrupted by client.close()', metadata, async () => { await runScriptAndGetProcessInfo( 'tls-file-read-auto-encryption', config, - async function run({ MongoClient, uri, expect, ClientEncryption, BSON }) { + async function run({ MongoClient, uri, expect, mongodb }) { const infiniteFile = '/dev/zero'; - const kmsProviders = BSON.EJSON.parse(process.env.CSFLE_KMS_PROVIDERS); + const kmsProviders = mongodb.BSON.EJSON.parse(process.env.CSFLE_KMS_PROVIDERS); const masterKey = { region: 'us-east-1', key: 'arn:aws:kms:us-east-1:579766882180:key/89fcc2c4-08b0-4bd9-9f25-e30687b580d0' @@ -219,7 +508,7 @@ describe.skip('MongoClient.close() Integration', () => { await keyVaultClient.connect(); await keyVaultClient.db('keyvault').collection('datakeys'); - const clientEncryption = new ClientEncryption(keyVaultClient, { + const clientEncryption = new mongodb.ClientEncryption(keyVaultClient, { keyVaultNamespace: 'keyvault.datakeys', kmsProviders }); diff --git a/test/integration/node-specific/resource_tracking_script_builder.ts b/test/integration/node-specific/resource_tracking_script_builder.ts index cb135fff22..066fb9fad1 100644 --- a/test/integration/node-specific/resource_tracking_script_builder.ts +++ b/test/integration/node-specific/resource_tracking_script_builder.ts @@ -1,13 +1,17 @@ import { fork, spawn } from 'node:child_process'; import { on, once } from 'node:events'; +import { openSync } from 'node:fs'; import { readFile, unlink, writeFile } from 'node:fs/promises'; import * as path from 'node:path'; import { AssertionError, expect } from 'chai'; +import type * as timers from 'timers'; import { parseSnapshot } from 'v8-heapsnapshot'; -import { type BSON, type ClientEncryption, type MongoClient } from '../../mongodb'; +import type * as mongodb from '../../mongodb'; +import { type MongoClient } from '../../mongodb'; import { type TestConfiguration } from '../../tools/runner/config'; +import { type sleep } from '../../tools/utils'; export type ResourceTestFunction = HeapResourceTestFunction | ProcessResourceTestFunction; @@ -19,11 +23,16 @@ export type HeapResourceTestFunction = (options: { export type ProcessResourceTestFunction = (options: { MongoClient: typeof MongoClient; - uri: string; + uri?: string; log?: (out: any) => void; expect: typeof expect; - ClientEncryption?: typeof ClientEncryption; - BSON?: typeof BSON; + mongodb?: typeof mongodb; + sleep?: typeof sleep; + getTimerCount?: () => number; + timers?: typeof timers; + getSocketReport?: () => { host: string; port: string }; + getSocketEndpointReport?: () => any; + once?: () => typeof once; }) => Promise; const HEAP_RESOURCE_SCRIPT_PATH = path.resolve( @@ -168,7 +177,10 @@ export async function runScriptAndGetProcessInfo( await writeFile(scriptName, scriptContent, { encoding: 'utf8' }); const logFile = name + '.logs.txt'; - const script = spawn(process.execPath, [scriptName], { stdio: ['ignore', 'ignore', 'inherit'] }); + const stdErrFile = 'err.out'; + const script = spawn(process.execPath, [scriptName], { + stdio: ['ignore', 'ignore', openSync(stdErrFile, 'w')] + }); const willClose = once(script, 'close'); @@ -182,20 +194,27 @@ export async function runScriptAndGetProcessInfo( .map(line => JSON.parse(line)) .reduce((acc, curr) => ({ ...acc, ...curr }), {}); + const stdErrSize = await readFile(stdErrFile, { encoding: 'utf8' }); + // delete temporary files await unlink(scriptName); await unlink(logFile); + await unlink(stdErrFile); // assertions about exit status if (exitCode) { const assertionError = new AssertionError( - messages.error.message + '\n\t' + JSON.stringify(messages.error.resources, undefined, 2) + messages.error?.message + '\n\t' + JSON.stringify(messages.error?.resources, undefined, 2) ); - assertionError.stack = messages.error.stack + new Error().stack.slice('Error'.length); + assertionError.stack = messages.error?.stack + new Error().stack.slice('Error'.length); throw assertionError; } // assertions about resource status expect(messages.beforeExitHappened).to.be.true; - expect(messages.newResources).to.be.empty; + expect(messages.newResources.libuvResources).to.be.empty; + expect(messages.newResources.activeResources).to.be.empty; + + // assertion about error output + expect(stdErrSize).to.be.empty; } diff --git a/test/tools/fixtures/process_resource_script.in.js b/test/tools/fixtures/process_resource_script.in.js index 2b5a3b40ac..c2886af9a7 100644 --- a/test/tools/fixtures/process_resource_script.in.js +++ b/test/tools/fixtures/process_resource_script.in.js @@ -7,16 +7,19 @@ const func = FUNCTION_STRING; const scriptName = SCRIPT_NAME_STRING; const uri = URI_STRING; -const { MongoClient, ClientEncryption, BSON } = require(driverPath); +const mongodb = require(driverPath); +const { MongoClient } = mongodb; const process = require('node:process'); const util = require('node:util'); -const timers = require('node:timers'); const fs = require('node:fs'); const { expect } = require('chai'); -const { setTimeout } = require('timers'); +const timers = require('node:timers'); +const { setTimeout } = timers; +const { once } = require('node:events'); let originalReport; const logFile = scriptName + '.logs.txt'; +const sleep = util.promisify(setTimeout); const run = func; @@ -28,7 +31,6 @@ const run = func; * In order to be counted as a new resource, a resource MUST: * - Must NOT share an address with a libuv resource that existed at the start of script * - Must be referenced. See [here](https://nodejs.org/api/timers.html#timeoutref) for more context. - * - Must NOT be an inactive server * * We're using the following tool to track resources: `process.report.getReport().libuv` * For more context, see documentation for [process.report.getReport()](https://nodejs.org/api/report.html), and [libuv](https://docs.libuv.org/en/v1.x/handle.html). @@ -40,7 +42,6 @@ function getNewLibuvResourceArray() { /** * @typedef {Object} LibuvResource - * @property {boolean} is_active Is the resource active? For a socket, this means it is allowing I/O. For a timer, this means a timer is has not expired. * @property {string} type What is the resource type? For example, 'tcp' | 'timer' | 'udp' | 'tty'... (See more in [docs](https://docs.libuv.org/en/v1.x/handle.html)). * @property {boolean} is_referenced Is the resource keeping the JS event loop active? * @@ -49,9 +50,7 @@ function getNewLibuvResourceArray() { function isNewLibuvResource(resource) { const serverType = ['tcp', 'udp']; return ( - !originalReportAddresses.includes(resource.address) && - resource.is_referenced && // if a resource is unreferenced, it's not keeping the event loop open - (!serverType.includes(resource.type) || resource.is_active) + !originalReportAddresses.includes(resource.address) && resource.is_referenced // if a resource is unreferenced, it's not keeping the event loop open ); } @@ -66,13 +65,14 @@ function getNewLibuvResourceArray() { * In order to be counted as a new resource, a resource MUST either: * - Meet the criteria to be returned by our helper utility `getNewLibuvResourceArray()` * OR - * - Be returned by `process.getActiveResourcesInfo() + * - Be returned by `process.getActiveResourcesInfo() and is not 'TTYWrap' * * The reason we are using both methods to detect active resources is: * - `process.report.getReport().libuv` does not detect active requests (such as timers or file reads) accurately * - `process.getActiveResourcesInfo()` does not contain enough server information we need for our assertions * */ + function getNewResources() { return { libuvResources: getNewLibuvResourceArray(), @@ -80,6 +80,26 @@ function getNewResources() { }; } +/** + * @returns Number of active timers in event loop + */ +const getTimerCount = () => process.getActiveResourcesInfo().filter(r => r === 'Timeout').length; + +/** + * @returns Array of socket resources in the event loop + */ +const getSockets = () => process.report.getReport().libuv.filter(r => r.type === 'tcp'); + +/** + * @returns Array of remote endpoints of socket resources in the event loop + * @example [{ host: 'localhost', port: 27020 }, { host: 'localhost', port: 27107 }] + */ +const getSocketEndpoints = () => + process.report + .getReport() + .libuv.filter(r => r.type === 'tcp') + .map(r => r.remoteEndpoint); + // A log function for debugging function log(message) { // remove outer parentheses for easier parsing @@ -92,7 +112,18 @@ async function main() { process.on('beforeExit', () => { log({ beforeExitHappened: true }); }); - await run({ MongoClient, uri, log, expect, ClientEncryption, BSON }); + await run({ + MongoClient, + uri, + log, + expect, + mongodb, + sleep, + getTimerCount, + getSockets, + getSocketEndpoints, + once + }); log({ newResources: getNewResources() }); }