Skip to content

Commit

Permalink
requested changes at desk - not tested.. yet..
Browse files Browse the repository at this point in the history
requested changes

lint
  • Loading branch information
aditi-khare-mongoDB committed Jan 22, 2025
1 parent a91e9af commit 13a8a25
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 71 deletions.
6 changes: 6 additions & 0 deletions socket-connection-rtt-monitoring.logs.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"ev":{"name":"serverHeartbeatSucceeded","connectionId":"localhost:31000","duration":4,"reply":{"topologyVersion":{"processId":"67917651e47cc2d141909817","counter":6},"hosts":["localhost:31000","localhost:31001","localhost:31002"],"arbiters":["localhost:31003"],"setName":"rs","setVersion":1,"isWritablePrimary":true,"secondary":false,"primary":"localhost:31000","me":"localhost:31000","electionId":"7fffffff0000000000000001","lastWrite":{"opTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"lastWriteDate":"2025-01-22T23:12:53.000Z","majorityOpTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"majorityWriteDate":"2025-01-22T23:12:53.000Z"},"maxBsonObjectSize":16777216,"maxMessageSizeBytes":48000000,"maxWriteBatchSize":100000,"localTime":"2025-01-22T23:13:01.572Z","logicalSessionTimeoutMinutes":30,"connectionId":374,"minWireVersion":0,"maxWireVersion":21,"readOnly":false,"ok":1,"$clusterTime":{"clusterTime":{"$timestamp":"7462881799971012609"},"signature":{"hash":"0ZNDkBzDMPYZGu7Xf3UGFzhng/A=","keyId":{"low":5,"high":1737586273,"unsigned":false}}},"operationTime":{"$timestamp":"7462881799971012609"}},"awaited":true}}
{"ev":{"name":"serverHeartbeatSucceeded","connectionId":"localhost:31003","duration":2,"reply":{"topologyVersion":{"processId":"679176542928a27b6927e95d","counter":2},"hosts":["localhost:31000","localhost:31001","localhost:31002"],"arbiters":["localhost:31003"],"setName":"rs","setVersion":1,"isWritablePrimary":false,"secondary":false,"primary":"localhost:31000","arbiterOnly":true,"me":"localhost:31003","lastWrite":{"opTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"lastWriteDate":"2025-01-22T23:12:53.000Z","majorityOpTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"majorityWriteDate":"2025-01-22T23:12:53.000Z"},"maxBsonObjectSize":16777216,"maxMessageSizeBytes":48000000,"maxWriteBatchSize":100000,"localTime":"2025-01-22T23:13:01.577Z","logicalSessionTimeoutMinutes":30,"connectionId":169,"minWireVersion":0,"maxWireVersion":21,"readOnly":false,"ok":1},"awaited":true}}
{"ev":{"name":"serverHeartbeatSucceeded","connectionId":"localhost:31001","duration":2,"reply":{"topologyVersion":{"processId":"67917652195aa0dc84acece6","counter":4},"hosts":["localhost:31000","localhost:31001","localhost:31002"],"arbiters":["localhost:31003"],"setName":"rs","setVersion":1,"isWritablePrimary":false,"secondary":true,"primary":"localhost:31000","me":"localhost:31001","lastWrite":{"opTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"lastWriteDate":"2025-01-22T23:12:53.000Z","majorityOpTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"majorityWriteDate":"2025-01-22T23:12:53.000Z"},"maxBsonObjectSize":16777216,"maxMessageSizeBytes":48000000,"maxWriteBatchSize":100000,"localTime":"2025-01-22T23:13:01.578Z","logicalSessionTimeoutMinutes":30,"connectionId":200,"minWireVersion":0,"maxWireVersion":21,"readOnly":false,"ok":1,"$clusterTime":{"clusterTime":{"$timestamp":"7462881799971012609"},"signature":{"hash":"0ZNDkBzDMPYZGu7Xf3UGFzhng/A=","keyId":{"low":5,"high":1737586273,"unsigned":false}}},"operationTime":{"$timestamp":"7462881799971012609"}},"awaited":true}}
{"ev":{"name":"serverHeartbeatSucceeded","connectionId":"localhost:31002","duration":2,"reply":{"topologyVersion":{"processId":"6791765368696db3e1252bb1","counter":4},"hosts":["localhost:31000","localhost:31001","localhost:31002"],"arbiters":["localhost:31003"],"setName":"rs","setVersion":1,"isWritablePrimary":false,"secondary":true,"primary":"localhost:31000","me":"localhost:31002","lastWrite":{"opTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"lastWriteDate":"2025-01-22T23:12:53.000Z","majorityOpTime":{"ts":{"$timestamp":"7462881799971012609"},"t":1},"majorityWriteDate":"2025-01-22T23:12:53.000Z"},"maxBsonObjectSize":16777216,"maxMessageSizeBytes":48000000,"maxWriteBatchSize":100000,"localTime":"2025-01-22T23:13:01.579Z","logicalSessionTimeoutMinutes":30,"connectionId":201,"minWireVersion":0,"maxWireVersion":21,"readOnly":false,"ok":1,"$clusterTime":{"clusterTime":{"$timestamp":"7462881799971012609"},"signature":{"hash":"0ZNDkBzDMPYZGu7Xf3UGFzhng/A=","keyId":{"low":5,"high":1737586273,"unsigned":false}}},"operationTime":{"$timestamp":"7462881799971012609"}},"awaited":true}}
{"socketsAfterClose":[{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x0000000129e1e690","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000013b10eb50","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000013b009eb0","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000013b009c00","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x0000000139f41c80","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":true,"is_referenced":true,"address":"0x000000013b00ff90","localEndpoint":{"host":"localhost","port":65364},"remoteEndpoint":{"host":"localhost","port":31000},"sendBufferSize":146808,"recvBufferSize":407800,"fd":26,"writeQueueSize":0,"readable":true,"writable":true},{"type":"tcp","is_active":true,"is_referenced":true,"address":"0x000000010ab0a9e0","localEndpoint":{"host":"localhost","port":65365},"remoteEndpoint":{"host":"localhost","port":31001},"sendBufferSize":146808,"recvBufferSize":407800,"fd":28,"writeQueueSize":0,"readable":true,"writable":true},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000010ab0ab90","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000010ab0af40","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false}]}
{"error":{"message":"expected [ 'localhost:31000', …(3) ] to have a length of +0 but got 4","stack":"AssertionError: expected [ 'localhost:31000', …(3) ] to have a length of +0 but got 4\n at func (/Users/aditi.khare/Desktop/node-mongodb-native/socket-connection-rtt-monitoring.cjs:35:81)\n at process.processTicksAndRejections (node:internal/process/task_queues:95:5)\n at async main (/Users/aditi.khare/Desktop/node-mongodb-native/socket-connection-rtt-monitoring.cjs:145:3)","resources":{"libuvResources":[{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x0000000129e1e690","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000013b10eb50","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000013b009eb0","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000013b009c00","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x0000000139f41c80","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":true,"is_referenced":true,"address":"0x000000013b00ff90","localEndpoint":{"host":"localhost","port":65364},"remoteEndpoint":{"host":"localhost","port":31000},"sendBufferSize":146808,"recvBufferSize":407800,"fd":26,"writeQueueSize":0,"readable":true,"writable":true},{"type":"tcp","is_active":true,"is_referenced":true,"address":"0x000000010ab0a9e0","localEndpoint":{"host":"localhost","port":65365},"remoteEndpoint":{"host":"localhost","port":31001},"sendBufferSize":146808,"recvBufferSize":407800,"fd":28,"writeQueueSize":0,"readable":true,"writable":true},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000010ab0ab90","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false},{"type":"tcp","is_active":false,"is_referenced":true,"address":"0x000000010ab0af40","localEndpoint":null,"remoteEndpoint":null,"sendBufferSize":0,"recvBufferSize":0,"writeQueueSize":0,"readable":false,"writable":false}],"activeResources":["ConnectWrap","ConnectWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","TCPSocketWrap","Timeout","Timeout"]}}}
80 changes: 24 additions & 56 deletions test/integration/node-specific/client_close.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,22 +120,16 @@ describe.skip('MongoClient.close() Integration', () => {
'monitor interval timer is cleaned up by client.close()',
metadata,
async function () {
const run = async function ({
MongoClient,
uri,
expect,
getTimerCount,
promiseWithResolvers
}) {
const run = async function ({ MongoClient, uri, expect, getTimerCount, once }) {
const heartbeatFrequencyMS = 2000;
const client = new MongoClient(uri, { heartbeatFrequencyMS });
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatSucceeded', () => resolve());
const willBeHeartbeatSucceeded = once(client, 'serverHeartbeatSucceeded');
await client.connect();
await promise;
await willBeHeartbeatSucceeded;

function monitorTimersExist(servers) {
for (const [, server] of servers) {
// the current expected behavior is that timerId is set to undefined once it expires or is interrupted
if (server?.monitor.monitorId.timerId === undefined) {
return false;
}
Expand All @@ -159,18 +153,12 @@ describe.skip('MongoClient.close() Integration', () => {
'the new monitor interval timer is cleaned up by client.close()',
metadata,
async () => {
const run = async function ({
MongoClient,
expect,
getTimerCount,
promiseWithResolvers
}) {
const run = async function ({ MongoClient, expect, getTimerCount, once }) {
const heartbeatFrequencyMS = 2000;
const client = new MongoClient('mongodb://fakeUri', { heartbeatFrequencyMS });
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatFailed', () => resolve());
const willBeHeartbeatFailed = once(client, 'serverHeartbeatFailed');
client.connect();
await promise;
await willBeHeartbeatFailed;

function getMonitorTimer(servers) {
for (const [, server] of servers) {
Expand All @@ -180,6 +168,7 @@ describe.skip('MongoClient.close() Integration', () => {
const servers = client.topology.s.servers;
expect(getMonitorTimer(servers)).to.exist;
await client.close();
// the current expected behavior is that timerId is set to undefined once it expires or is interrupted
expect(getMonitorTimer(servers)).to.not.exist;

expect(getTimerCount()).to.equal(0);
Expand Down Expand Up @@ -225,22 +214,14 @@ describe.skip('MongoClient.close() Integration', () => {
'the rtt pinger timer is cleaned up by client.close()',
metadata,
async function () {
const run = async function ({
MongoClient,
uri,
expect,
getTimerCount,
promiseWithResolvers
}) {
const run = async function ({ MongoClient, uri, expect, getTimerCount, once }) {
const heartbeatFrequencyMS = 2000;
const client = new MongoClient(uri, {
serverMonitoringMode: 'stream',
heartbeatFrequencyMS
});
await client.connect();
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatSucceeded', () => resolve());
await promise;
await once(client, 'serverHeartbeatSucceeded');

function getRttTimer(servers) {
for (const [, server] of servers) {
Expand All @@ -266,13 +247,7 @@ describe.skip('MongoClient.close() Integration', () => {
describe('Node.js resource: Socket', () => {
describe('when rtt monitoring is turned on', () => {
it('no sockets remain after client.close()', metadata, async () => {
const run = async ({
MongoClient,
uri,
expect,
getSockets,
promiseWithResolvers
}) => {
const run = async ({ MongoClient, uri, expect, getSockets, once, log }) => {
const heartbeatFrequencyMS = 500;
const client = new MongoClient(uri, {
serverMonitoringMode: 'stream',
Expand All @@ -288,12 +263,9 @@ describe.skip('MongoClient.close() Integration', () => {
const servers = client.topology.s.servers;

while (heartbeatOccurredSet.size < servers.size) {
const { promise, resolve } = promiseWithResolvers();
client.once('serverHeartbeatSucceeded', ev => {
heartbeatOccurredSet.add(ev.connectionId);
resolve();
});
await promise;
const ev = await once(client, 'serverHeartbeatSucceeded');
log({ ev: ev[0] });
heartbeatOccurredSet.add(ev[0].connectionId);
}

const activeSocketsAfterHeartbeat = () =>
Expand All @@ -309,6 +281,7 @@ describe.skip('MongoClient.close() Integration', () => {
// close the client
await client.close();

log({ socketsAfterClose: getSockets() });
// upon close, assert rttPinger sockets are cleaned up
const activeSocketsAfterClose = activeSocketsAfterHeartbeat();
expect(activeSocketsAfterClose).to.have.lengthOf(0);
Expand Down Expand Up @@ -456,24 +429,19 @@ describe.skip('MongoClient.close() Integration', () => {
describe('SrvPoller', () => {
describe('Node.js resource: Timer', () => {
// requires an srv environment that can transition to sharded
const metadata: MongoDBMetadataUI = {
requires: {
predicate: () =>
process.env.ATLAS_SRV_REPL ? true : 'Skipped: this test requires an SRV environment'
}
};
const metadata: MongoDBMetadataUI = { requires: { topology: 'sharded' } };

describe('after SRVPoller is created', () => {
it('timers are cleaned up by client.close()', metadata, async () => {
const run = async function ({ MongoClient, uri, expect, getTimerCount }) {
const client = new MongoClient(uri);
const run = async function ({ MongoClient, expect, getTimerCount }) {
const SRV_CONNECTION_STRING = `mongodb+srv://test1.test.build.10gen.cc`;
// 27018 localhost.test.build.10gen.cc.
// 27017 localhost.test.build.10gen.cc.

const client = new MongoClient(SRV_CONNECTION_STRING);
await client.connect();
const description = client.topology.s.description;
// simulate transition to sharded
client.topology.emit('topologyDescriptionChanged', description, {
...description,
type: 'Sharded'
});
// the current expected behavior is that _timeout is set to undefined until SRV polling starts
// then _timeout is set to undefined again when SRV polling stops
expect(client.topology.s.srvPoller._timeout).to.exist;
await client.close();
expect(getTimerCount()).to.equal(0);
Expand Down
16 changes: 13 additions & 3 deletions test/integration/node-specific/resource_tracking_script_builder.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { fork, spawn } from 'node:child_process';
import { on, once } from 'node:events';
import { openSync, statSync } from 'node:fs';
import { readFile, unlink, writeFile } from 'node:fs/promises';
import * as path from 'node:path';

Expand Down Expand Up @@ -31,7 +32,7 @@ export type ProcessResourceTestFunction = (options: {
timers?: typeof timers;
getSocketReport?: () => { host: string; port: string };
getSocketEndpointReport?: () => any;
promiseWithResolvers?: () => any;
once?: () => typeof once;
}) => Promise<void>;

const HEAP_RESOURCE_SCRIPT_PATH = path.resolve(
Expand Down Expand Up @@ -176,7 +177,10 @@ export async function runScriptAndGetProcessInfo(
await writeFile(scriptName, scriptContent, { encoding: 'utf8' });
const logFile = name + '.logs.txt';

const script = spawn(process.execPath, [scriptName], { stdio: ['ignore', 'ignore', 'inherit'] });
const stdErrFile = 'err.out';
const script = spawn(process.execPath, [scriptName], {
stdio: ['ignore', 'ignore', openSync(stdErrFile, 'w')]
});

const willClose = once(script, 'close');

Expand All @@ -190,9 +194,12 @@ export async function runScriptAndGetProcessInfo(
.map(line => JSON.parse(line))
.reduce((acc, curr) => ({ ...acc, ...curr }), {});

const stdErrSize = statSync(stdErrFile).size;

// delete temporary files
await unlink(scriptName);
await unlink(logFile);
// await unlink(logFile);
await unlink(stdErrFile);

// assertions about exit status
if (exitCode) {
Expand All @@ -203,6 +210,9 @@ export async function runScriptAndGetProcessInfo(
throw assertionError;
}

// assertion about error output
expect(stdErrSize).to.equal(0);

// assertions about resource status
expect(messages.beforeExitHappened).to.be.true;
expect(messages.newResources.libuvResources).to.be.empty;
Expand Down
Loading

0 comments on commit 13a8a25

Please sign in to comment.