Websocket reconnect doesn't work #2563

Open
1 task done
justefg opened this issue Jul 30, 2024 · 8 comments

justefg commented Jul 30, 2024

Check existing issues

Viem Version

2.18.5

Current Behavior

The WebSocket doesn't reconnect after a connection is dropped, which prevents users from receiving events.

Expected Behavior

[2024-07-30 22:26:24.411] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d50dad340","0x000000000000000000000000000000000000000000000000000000000006444e"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f810","blockNumber":"59991410","transactionHash":"0xd01e7b361907762ed05bf7a460cfebee5b81022e160eae979f5d8cc1941e9cbd","transactionIndex":65,"blockHash":"0x6827b8a4059d7e188fcd3a1338a6209bb563d259252fdb067f91949cc4bef26a","logIndex":304,"removed":false,"args":{"current":"332069000000","roundId":"410702","updatedAt":"1722349584"},"eventName":"AnswerUpdated"}]
[2024-07-30 22:26:34.342] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d5a1d27f0","0x000000000000000000000000000000000000000000000000000000000006444f"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f81a","blockNumber":"59991415","transactionHash":"0x06c8fafeeadc8e269a91af5a8ecd7475f388b01176a3a78ec8a23548e6885565","transactionIndex":9,"blockHash":"0x430ec10a97a9bc4280d1320966d8136599f4edb8b73a67e02afc3efa572f5426","logIndex":51,"removed":false,"args":{"current":"332224342000","roundId":"410703","updatedAt":"1722349594"},"eventName":"AnswerUpdated"}]
[2024-07-30 22:26:44.392] [{"address":"0x63db7e86391f5d31bab58808bcf75edb272f4f5c","topics":["0x0559884fd3a460db3073b7fc896cc77986f16e378210ded43186175bf646fc5f","0x0000000000000000000000000000000000000000000000000000004d5ea35808","0x0000000000000000000000000000000000000000000000000000000000064450"],"data":"0x0000000000000000000000000000000000000000000000000000000066a8f824","blockNumber":"59991420","transactionHash":"0xd4a47f2d47ed475879ad1cd263c2360b4789af27ef6f459d709075cf4ad080ae","transactionIndex":28,"blockHash":"0x6710a37394365575d07c9253f7133045f2d2db982f603a8ca55366289c9f505b","logIndex":166,"removed":false,"args":{"current":"332300245000","roundId":"410704","updatedAt":"1722349604"},"eventName":"AnswerUpdated"}]

Steps To Reproduce

Run the minimal reproducible example using ts-node:
ts-node example.ts

It works fine for about an hour, but after the connection drops, events are no longer received. Tested with multiple WebSocket providers.

Link to Minimal Reproducible Example

https://gist.github.com/justefg/95acdcb5d8cbee6930b6177120c24cbc

Anything else?

No response

Member

jxom commented Jul 30, 2024

Hooked up onError to WebSocket subscriptions here: 889371e, so you should be able to receive socket closure errors in your onError callback in the next release. However, more work is needed to get reconnection working properly; I will work on that next. For now, you can manually re-invoke watchContractEvent when you receive a socket closure error.
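
A minimal sketch of that manual workaround, assuming the watcher accepts an onError callback and returns an unwatch function, as viem's watchContractEvent does. The names `resubscribeOnError`, `StartWatch`, and `delayMs` are hypothetical and not part of viem, and the sketch resubscribes on every error, which a real application may want to narrow to socket closure errors.

```typescript
// Hypothetical helper, not part of viem.
type Unwatch = () => void;
// `start` should begin a subscription, forward errors to `onError`,
// and return a function that stops the subscription.
type StartWatch = (onError: (error: Error) => void) => Unwatch;

function resubscribeOnError(start: StartWatch, delayMs = 1_000): Unwatch {
  let stopped = false;
  let unwatch: Unwatch | undefined;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const subscribe = (): void => {
    if (stopped) return;
    let handled = false;
    const current: Unwatch = start((error) => {
      if (handled || stopped) return; // react only once per subscription
      handled = true;
      console.warn(`subscription failed: ${error.message}`);
      timer = setTimeout(() => {
        current(); // drop the broken subscription before starting a new one
        subscribe();
      }, delayMs);
    });
    unwatch = current;
  };

  subscribe();

  // Stops the active subscription and cancels any pending resubscribe.
  return () => {
    stopped = true;
    if (timer !== undefined) clearTimeout(timer);
    if (unwatch !== undefined) unwatch();
  };
}

// Usage sketch (assumes a viem `client`, `address`, `abi`, and `onLogs` are in scope):
// const stopWatching = resubscribeOnError((onError) =>
//   client.watchContractEvent({ address, abi, onLogs, onError }),
// );
```

Re-invoking the watcher on the same client may not be enough if the transport still holds the closed socket, so the `start` callback may also need to create a fresh client.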

Author

justefg commented Jul 31, 2024

thank you @jxom

nstylo commented Aug 7, 2024

@justefg I suppose you are using Chainstack as your node provider?
They close WebSocket connections after a while, and viem does not handle that case. This is my hack; perhaps it helps:

  /**
   * NOTE: We are using a custom re-connect mechanism as node providers
   * might close the websocket even if keep-alives are used. Viem does not
   * support this reconnect mechanism out of the box, so we have to implement
   * this ourselves.
   */
  private async connect() {
    this._logger.info("Connecting...");

    const socketRpcClient = await this._client.transport.getRpcClient();

    const heartbeat = () => {
      this._logger.info("ping ->");
      this._client
        .getBlockNumber()
        .then((_) => {
          this._logger.info("<- pong");
        })
        .catch((err) => this._logger.error(err));
    };

    const intervalId = setInterval(heartbeat, 5 * 60 * 1000);

    const onOpen = (_: Event) => {
      // drop
    };
    const onMessage = (_: MessageEvent<any>) => {
      // drop
    };
    const onError = (ev: Event) => {
      this._logger.error(ev);
    };
    const onClose = async () => {
      this._logger.warn("Websocket connection closed!");
      socketRpcClient.socket.removeEventListener("open", onOpen);
      socketRpcClient.socket.removeEventListener("message", onMessage);
      socketRpcClient.socket.removeEventListener("error", onError);
      socketRpcClient.socket.removeEventListener("close", onClose);
      // NOTE: IMPORTANT: invalidate viem's socketClientCache! When close
      // happens on socket level, the same socketClient with the closed websocket will be
      // re-used from cache leading to 'Socket is closed.' error.
      socketRpcClient.close();
      clearInterval(intervalId);
      this._client = this._clientManager.getClient(this._chain);
      this._logger.info("Re-establishing connection!");
      this.connect();
    };

    const setupEventListeners = () => {
      socketRpcClient.socket.addEventListener("open", onOpen);
      socketRpcClient.socket.addEventListener("message", onMessage);
      socketRpcClient.socket.addEventListener("error", onError);
      socketRpcClient.socket.addEventListener("close", onClose);
    };

    setupEventListeners();

    heartbeat();
  }
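
The onClose handler above reconnects immediately. If the provider drops or rejects new connections right away, that can turn into a tight reconnect loop. One common mitigation, which is not part of the original snippet, is to wait a little longer before each attempt. A minimal sketch, where `backoffDelayMs`, `baseMs`, and `maxMs` are hypothetical names and the default values are arbitrary:

```typescript
// Hypothetical helper: delay before reconnect attempt number `attempt`
// (0-based), doubling each time and capped at `maxMs`.
function backoffDelayMs(attempt: number, baseMs = 1_000, maxMs = 60_000): number {
  // Treat negative, fractional, or non-finite input as a sane whole number.
  const exponent = Number.isFinite(attempt) ? Math.max(0, Math.floor(attempt)) : 0;
  return Math.min(maxMs, baseMs * 2 ** exponent);
}

console.log(backoffDelayMs(0)); // 1000
console.log(backoffDelayMs(3)); // 8000
console.log(backoffDelayMs(10)); // 60000 (capped)
```

In the snippet above, this would mean replacing the direct `this.connect()` call with something like `setTimeout(() => this.connect(), backoffDelayMs(attempt))` and resetting the attempt counter once a heartbeat succeeds.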

40818419 commented Aug 20, 2024

Hey @jxom, how's progress on that? I just ran into this issue today. Is it already fixed in the latest release?

Author

justefg commented Aug 21, 2024

@nstylo interesting. have you tested it?

nstylo commented Aug 21, 2024

@justefg yes, it works very reliably. I'll have to check which version of viem I am using this solution with.

arpan-jain commented Oct 21, 2024

Hey @nstylo
I am running a Node.js application that listens to events over a WebSocket via the viem client and then inserts these events into a MongoDB collection.
I am reusing the code you pasted above to fix the connection-close issue, but I am facing a very weird issue on my side.
After a few days of activity, my Node.js application runs out of sockets/file descriptors, and all network requests (viem WS RPC and outbound MongoDB requests) fail with a socket timeout error.
It seems that after a reconnection attempt, the Node.js GC is not able to clean up the closed sockets.

Has anyone else faced a similar issue? Or am I doing something wrong here?

export class WsListener {
  private logger = new Logger(WsListener.name);
  private client: PublicClient;
  private contractAddress;
  private eventAbi;
  private handlerFn;
  private chainId;
  private wsUrl;
  private intervalId: NodeJS.Timeout;
  private connectionEventTracker;
  private eventName;

  constructor({
    chainId,
    wsUrl,
    contractAddress,
    eventName,
    eventAbi,
    handlerFn,
    connectionEventTracker,
  }: {
    chainId: number;
    wsUrl: string;
    contractAddress: `0x${string}`;
    eventName: string;
    eventAbi: string;
    handlerFn: (chainId: number, log: EVENT_LOG[]) => any;
    connectionEventTracker: (params: {
      chainId: number;
      trackedEvent: string;
      type: WebsocketConnectionEventType;
    }) => Promise<void>;
  }) {
    this.wsUrl = wsUrl;
    this.contractAddress = contractAddress;
    this.eventName = eventName;
    this.eventAbi = eventAbi;
    this.handlerFn = handlerFn;
    this.chainId = chainId;
    this.connectionEventTracker = connectionEventTracker;
    this.connectClient();
  }

  private async connectClient() {
    try {
      // add a log in db, when a new connection is created 
      this.connectionEventTracker({
        chainId: this.chainId,
        trackedEvent: this.eventName,
        type: WebsocketConnectionEventType.Connection,
      });
      this.client = createPublicClient({
        transport: webSocket(this.wsUrl, {
          keepAlive: { interval: 1_000 }, // 1000 ms (will send keep alive ping messages every 1 sec)
          reconnect: true,
          retryCount: 5,
          timeout: 60_000, // 60 secs
        }),
      });
      this.attachHandler();
      await this.setupEventListeners();
      // ping every min
      this.intervalId = setInterval(() => this.heartbeat(), 1 * 60 * 1000);
    } catch (err) {
      this.logger.error(`Error while connecting client: `, err);
    }
  }

  private attachHandler() {
    // listen to event
    this.client.watchContractEvent({
      address: this.contractAddress,
      abi: parseAbi([this.eventAbi]),
      eventName: this.eventName,
      onLogs: async (logs: EVENT_LOG[]) => {
        this.logger.log(
          `...received event ${this.eventName} on ${this.chainId}...`,
        );
        await this.handlerFn(this.chainId, logs);
      },
    });
  }

  private async setupEventListeners() {
    const socketRpcClient = await this.client.transport.getRpcClient();

    // using arrow function wrappers over the event listeners, to preserve `this` context
    socketRpcClient.socket.addEventListener('open', (args: any) =>
      this.onOpen(args),
    );
    socketRpcClient.socket.addEventListener('message', (args: any) =>
      this.onMessage(args),
    );
    socketRpcClient.socket.addEventListener('error', (args: any) =>
      this.onError(args),
    );
    socketRpcClient.socket.addEventListener('close', () => this.onClose());
  }

  getClient() {
    return this.client;
  }

  async heartbeat() {
    try {
      this.logger.log(`...ping.........`);
      await this.client.getBlockNumber();
      this.logger.log(`.........pong...`);
    } catch (err) {
      this.logger.error(`---heartbeat-error---`, err);
      throw err;
    }
  }

  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  onOpen(_: Event) {}

  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  onMessage(_: MessageEvent<any>) {}

  onError(ev: Event) {
    this.logger.error(`websocket error: `, ev);
  }

  async onClose() {
    try {
      this.logger.warn('Websocket connection closed!');
      const socketRpcClient = await this.client.transport.getRpcClient();
      socketRpcClient.socket.removeEventListener('open', (args: any) =>
        this.onOpen(args),
      );
      socketRpcClient.socket.removeEventListener('message', (args: any) =>
        this.onMessage(args),
      );
      socketRpcClient.socket.removeEventListener('error', (args: any) =>
        this.onError(args),
      );
      socketRpcClient.socket.removeEventListener('close', () => this.onClose());
      // NOTE: IMPORTANT: invalidate viem's socketClientCache! When close
      // happens on socket level, the same socketClient with the closed websocket will be
      // re-used from cache leading to 'Socket is closed.' error.
      socketRpcClient.close();
      
      // logs in db, when a disconnection happens
      this.connectionEventTracker({
        chainId: this.chainId,
        trackedEvent: this.eventName,
        type: WebsocketConnectionEventType.Disconnection,
      });

      clearInterval(this.intervalId);
      this.logger.log('....Re-establishing connection!..');
      this.connectClient();
      this.logger.log('....Re-established connection!..');
    } catch (err) {
      this.logger.error(`Error while closing connection: `, err);
    }
  }
}
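
One detail in the class above that may be worth checking: removeEventListener only removes a listener when it is given the same function reference that was passed to addEventListener. The arrow functions created inside onClose are new function objects, so those four removeEventListener calls are likely no-ops. Whether this explains the file-descriptor exhaustion is not established here; the sketch below only shows how to keep the references so that removal works. `ListenerTarget`, `attachListeners`, and `FakeSocket` are hypothetical names, and `FakeSocket` is an in-memory stand-in for the real socket.

```typescript
type Listener = (event: unknown) => void;

// Assumed minimal shape of the socket for this purpose.
interface ListenerTarget {
  addEventListener(type: string, listener: Listener): void;
  removeEventListener(type: string, listener: Listener): void;
}

// Registers the handlers and returns a function that removes exactly
// the same function references again.
function attachListeners(
  target: ListenerTarget,
  handlers: Record<string, Listener>,
): () => void {
  const types = Object.keys(handlers);
  for (const type of types) target.addEventListener(type, handlers[type]);
  return () => {
    for (const type of types) target.removeEventListener(type, handlers[type]);
  };
}

// In-memory stand-in for a socket, used only to demonstrate the behaviour.
class FakeSocket implements ListenerTarget {
  private entries: Array<{ type: string; listener: Listener }> = [];

  addEventListener(type: string, listener: Listener): void {
    this.entries.push({ type, listener });
  }

  removeEventListener(type: string, listener: Listener): void {
    // Removal matches on reference identity, as real event targets do.
    this.entries = this.entries.filter(
      (entry) => !(entry.type === type && entry.listener === listener),
    );
  }

  listenerCount(type: string): number {
    return this.entries.filter((entry) => entry.type === type).length;
  }
}

const socket = new FakeSocket();
const onClose: Listener = () => {};

// Pattern from the class above: a new wrapper function on each call.
socket.addEventListener("close", (ev) => onClose(ev));
socket.removeEventListener("close", (ev) => onClose(ev)); // different object, nothing removed
console.log(socket.listenerCount("close")); // 1

// Keeping the references makes removal effective.
const detach = attachListeners(socket, { close: onClose });
detach();
console.log(socket.listenerCount("close")); // 1 (only the anonymous wrapper is left)
```

The unwatch function returned by watchContractEvent is also never stored or called in the class above, so that may be another place to look.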

Author

justefg commented Oct 21, 2024

@arpan-jain have you checked websockets/ws#1869? what does netstat tell you?
