AMQPClient.connect() may hang indefinitely under certain race conditions #121

Open
KristjanTammekivi opened this issue Sep 11, 2024 · 0 comments

I originally misattributed this problem to #101, where I observed the application exiting with code 0 during reconnects. As it turns out, that only happened because the Node.js event loop had nothing left to do and exited. After I added a setInterval, the application stayed up.
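
The exit-with-0 behaviour does not need any AMQP code to show up: a promise that never settles does not keep the Node.js event loop alive, so the process ends quietly once no timers or sockets remain. Below is a minimal sketch of the setInterval workaround. `withKeepAlive` is a hypothetical helper name and the interval length is arbitrary; neither comes from amqp-client.js.

```javascript
// Hypothetical helper: hold a timer open while `task` is pending, so a promise
// that never settles shows up as a visible hang instead of a silent exit 0.
async function withKeepAlive(task, intervalMs = 60_000) {
  const keepAlive = setInterval(() => {}, intervalMs)
  try {
    return await task()
  } finally {
    // Release the timer once the task settles, so a normal exit is still possible.
    clearInterval(keepAlive)
  }
}
```

In a script like the one further down, this would wrap the top-level call, for example `await withKeepAlive(() => run())`.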

The real issue is that after the socket connects, it may close before open-ok happens, and in that case connect() never resolves or rejects.

Here's an illustration with my lovely console.logs.
Things happen in the order 1, 2, 3.
4 never happens.

[image: console.log output]
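
The failure mode can be modelled without RabbitMQ. This is a simplified stand-in, not the library's code: `fakeConnect` and `settlesWithin` are hypothetical names, and the model only assumes that the connect promise is settled by the handshake and by nothing else.

```javascript
// Simplified model, not the library's code: the promise is settled only when
// open-ok arrives. If the socket closes first, nothing resolves or rejects it.
function fakeConnect({ closeBeforeOpenOk }) {
  return new Promise((resolve) => {
    // Socket connected, then closed before open-ok: the promise stays pending.
    if (closeBeforeOpenOk) return
    // Normal path: open-ok arrives shortly after and the promise resolves.
    setTimeout(() => resolve('open-ok'), 5)
  })
}

// Report whether `promise` settles (either way) within `ms` milliseconds.
function settlesWithin(promise, ms) {
  let timer
  const timedOut = new Promise((resolve) => {
    timer = setTimeout(() => resolve(false), ms)
  })
  const settled = promise.then(() => true, () => true)
  return Promise.race([settled, timedOut]).finally(() => clearTimeout(timer))
}
```

With this model, `settlesWithin(fakeConnect({ closeBeforeOpenOk: false }), 100)` resolves to `true`, while the same call with `closeBeforeOpenOk: true` resolves to `false`, which matches an `await` that never returns.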

It seems like a doable fix, but I can only think of hacky solutions. Maybe you'll be able to figure out a proper one.
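
One such hacky option, on the caller's side rather than in the library, is to race the connect call against a timer. The sketch below is an assumption-laden workaround, not a fix: `connectWithTimeout` and its `onTimeout` callback are hypothetical names, and the timeout value is up to the caller.

```javascript
// Caller-side workaround (hypothetical helper, not part of amqp-client.js):
// reject if `connect()` has not settled within `timeoutMs`.
function connectWithTimeout(connect, timeoutMs, onTimeout = () => {}) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => {
      // Give the caller a chance to tear down the half-open client.
      try { onTimeout() } catch { /* cleanup is best effort */ }
      reject(new Error(`connect did not settle within ${timeoutMs} ms`))
    }, timeoutMs)
    Promise.resolve()
      .then(connect)
      .then(resolve, reject)
      .finally(() => clearTimeout(timer))
  })
}
```

In a retry loop this would replace a bare `await amqp.connect()` with something like `await connectWithTimeout(() => amqp.connect(), 5000)`, so a hung attempt lands in the `catch` branch and is retried. The abandoned client may still hold resources, and whether it can be closed safely in that state is not something this sketch establishes.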

Here's my examples/close.js.
After starting it, I just cycle RabbitMQ off and on until the hang happens.

import { setTimeout } from 'node:timers/promises';
import { AMQPClient } from '../lib/mjs/amqp-client.js'

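// Log with an ISO timestamp and the close.js call site of the caller.
const log = (...args) => {
  const stack = new Error().stack.split('\n')
  // stack[2] is the frame that called log(); fall back if it is not in close.js
  const caller = stack[2]?.match(/(close\.js:\d+:\d+)/)?.[1] ?? 'unknown'
  return console.log(new Date().toISOString(), ...args, `-${caller}`);
}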

async function loopGetConnection() {
  let connectionAttempt = 0
  // eslint-disable-next-line no-constant-condition
  while (true) {
    log('Trying to connect', ++connectionAttempt)
    try {
      const amqp = new AMQPClient("amqp://localhost")
      await amqp.connect()
      console.log('success connect')
      return amqp
    } catch (err) {
      log('Failed to connect', err)
      await setTimeout(50)
    }
  }
}

let conn = null
async function getConnection() {
  if (conn) {
    return conn
  }
  conn = await loopGetConnection()
  conn.logger = {
    debug: (...args) => log('debug', ...args),
    info: (...args) => log('info', ...args),
    warn: (...args) => log('warn', ...args),
    error: (...args) => log('error', ...args),
  }
  conn.onerror = (err) => {
    log(err);
    conn = null;
  }
  return conn;
}

process.on('exit', (code) => {
  log('Process exited with code', code)
});

async function run() {
  {
    const conn = await getConnection()
    const ch = await conn.channel()
    await ch.queue('test', { autoDelete: true })
  }
  // eslint-disable-next-line no-constant-condition
  while (true) {
    const conn = await getConnection()
    const ch = await conn.channel()
    try {
      await ch.queue('test', { autoDelete: true })
      log('Declared queue');
    } catch (err) {
      log('Failed to declare queue', err)
    } finally {
      await ch.close()
    }
    await setTimeout(1000)
  }
}
try {
  await run()
  log('Run exited normally unexpectedly')
} catch (e) {
  log('Run exited unexpectedly', e)
}