Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
Add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Oct 6, 2022
1 parent c538cf0 commit bc0bf79
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 34 deletions.
12 changes: 10 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import { CreateListenerOptions, DialOptions, symbol, Transport } from '@libp2p/i
import type { AbortOptions, Multiaddr } from '@multiformats/multiaddr'
import type { Socket, IpcSocketConnectOpts, TcpSocketConnectOpts } from 'net'
import type { Connection } from '@libp2p/interface-connection'
import type { TcpMetrics } from './metrics.js'

const log = logger('libp2p:tcp')

export { TcpMetrics }

export interface TCPOptions {
/**
* An optional number in ms that is used as an inactivity timeout after which the socket will be closed
Expand Down Expand Up @@ -51,9 +54,11 @@ export interface TCPCreateListenerOptions extends CreateListenerOptions, TCPSock

export class TCP implements Transport {
private readonly opts: TCPOptions
private readonly metrics: TcpMetrics | null

constructor (options: TCPOptions = {}) {
constructor (options: TCPOptions = {}, metrics?: TcpMetrics | null) {
this.opts = options
this.metrics = metrics ?? null
}

get [symbol] (): true {
Expand All @@ -72,6 +77,7 @@ export class TCP implements Transport {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.socketEvents.inc({ event: 'error' })
})

const maConn = toMultiaddrConnection(socket, {
Expand Down Expand Up @@ -107,6 +113,7 @@ export class TCP implements Transport {

const onTimeout = () => {
log('connection timeout %s', cOptsStr)
this.metrics?.listenerErrors.inc({ error: 'outbound_connection_timeout' })

const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
// Note: this will result in onError() being called
Expand Down Expand Up @@ -159,7 +166,8 @@ export class TCP implements Transport {
return new TcpListener({
...options,
socketInactivityTimeout: this.opts.inboundSocketInactivityTimeout,
socketCloseTimeout: this.opts.socketCloseTimeout
socketCloseTimeout: this.opts.socketCloseTimeout,
metrics: this.metrics
})
}

Expand Down
65 changes: 35 additions & 30 deletions src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { MultiaddrConnection, Connection } from '@libp2p/interface-connecti
import type { Upgrader, Listener } from '@libp2p/interface-transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { TCPCreateListenerOptions } from './index.js'
import { ServerStatusMetric, TcpMetrics } from './metrics.js'

const log = logger('libp2p:tcp:listener')

Expand All @@ -30,6 +31,7 @@ interface Context extends TCPCreateListenerOptions {
upgrader: Upgrader
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics: TcpMetrics | null
}

type Status = {started: false} | {started: true, listeningAddr: Multiaddr, peerId: string | null }
Expand All @@ -38,6 +40,7 @@ export class TcpListener extends EventEmitter<any> implements Listener {
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
private readonly metrics: TcpMetrics | null

private status: Status = { started: false }

Expand All @@ -46,64 +49,66 @@ export class TcpListener extends EventEmitter<any> implements Listener {

context.keepAlive = context.keepAlive ?? true

this.server = net.createServer(context, this.onSocket.bind(this))
this.server = net.createServer(context, socket => {
this.onSocket(socket).catch(e => log('onSocket error', e))
})

this.server
.on('listening', () => this.dispatchEvent(new CustomEvent('listening')))
.on('error', err => this.dispatchEvent(new CustomEvent<Error>('error', { detail: err })))
.on('close', () => this.dispatchEvent(new CustomEvent('close')))

this.metrics = context.metrics
this.metrics?.connections.addCollect(() => {
this.metrics?.connections.set(this.connections.size)
this.metrics?.serverStatus.set(this.status.started ? ServerStatusMetric.started : ServerStatusMetric.stopped)
})
}

private onSocket (socket: net.Socket) {
private async onSocket (socket: net.Socket) {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
this.metrics?.socketEvents.inc({ event: 'error' })
})

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, {
listeningAddr: this.status.started ? this.status.listeningAddr : undefined,
socketInactivityTimeout: this.context.socketInactivityTimeout,
socketCloseTimeout: this.context.socketCloseTimeout
socketCloseTimeout: this.context.socketCloseTimeout,
metrics: this.metrics
})
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.listenerErrors.inc({ error: 'inbound_to_connection' })
return
}

log('new inbound connection %s', maConn.remoteAddr)
try {
this.context.upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
this.connections.delete(maConn)
})

if (this.context.handler != null) {
this.context.handler(conn)
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
})
.catch(async err => {
log.error('inbound connection failed', err)

await attemptClose(maConn)
})
.catch(err => {
log.error('closing inbound connection failed', err)
})
const conn = await this.context.upgrader.upgradeInbound(maConn)
log('inbound connection %s upgraded', maConn.remoteAddr)
this.connections.add(maConn)

socket.once('close', () => {
this.connections.delete(maConn)
})

if (this.context.handler != null) {
this.context.handler(conn)
}

this.dispatchEvent(new CustomEvent<Connection>('connection', { detail: conn }))
} catch (err) {
log.error('inbound connection failed', err)
this.metrics?.listenerErrors.inc({ error: 'inbound_upgrade' })

attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
})
attemptClose(maConn).catch(err => {
log.error('closing inbound connection failed', err)
this.metrics?.listenerErrors.inc({ error: 'inbound_closing_failed' })
})
}
}

Expand Down
34 changes: 34 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/* eslint-disable etc/prefer-interface, @typescript-eslint/method-signature-style */

type LabelsGeneric = Record<string, string | undefined>
type CollectFn<Labels extends LabelsGeneric> = (metric: Gauge<Labels>) => void

interface Gauge<Labels extends LabelsGeneric = never> {
// Sorry for this mess, `prom-client` API choices are not great
// If the function signature was `inc(value: number, labels?: Labels)`, this would be simpler
inc(value?: number): void
inc(labels: Labels, value?: number): void
inc(arg1?: Labels | number, arg2?: number): void

dec(value?: number): void
dec(labels: Labels, value?: number): void
dec(arg1?: Labels | number, arg2?: number): void

set(value: number): void
set(labels: Labels, value: number): void
set(arg1?: Labels | number, arg2?: number): void

addCollect(collectFn: CollectFn<Labels>): void
}

export enum ServerStatusMetric {
stopped = 0,
started = 1
}

export interface TcpMetrics {
serverStatus: Gauge
connections: Gauge
listenerErrors: Gauge<{error: string}>
socketEvents: Gauge<{event: string}>
}
9 changes: 7 additions & 2 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import errCode from 'err-code'
import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interface-connection'
import type { TcpMetrics } from './metrics.js'

const log = logger('libp2p:tcp:socket')

Expand All @@ -19,14 +20,15 @@ interface ToConnectionOptions {
signal?: AbortSignal
socketInactivityTimeout?: number
socketCloseTimeout?: number
metrics?: TcpMetrics | null
}

/**
* Convert a socket into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOptions) => {
options = options ?? {}
export const toMultiaddrConnection = (socket: Socket, options: ToConnectionOptions) => {
const metrics = options.metrics
const inactivityTimeout = options.socketInactivityTimeout ?? SOCKET_TIMEOUT
const closeTimeout = options.socketCloseTimeout ?? CLOSE_TIMEOUT

Expand Down Expand Up @@ -61,6 +63,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#socketsettimeouttimeout-callback
socket.setTimeout(inactivityTimeout, () => {
log('%s socket read timeout', lOptsStr)
metrics?.socketEvents.inc({ event: 'timeout' })

// only destroy with an error if the remote has not sent the FIN message
let err: Error | undefined
Expand All @@ -75,6 +78,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti

socket.once('close', () => {
log('%s socket read timeout', lOptsStr)
metrics?.socketEvents.inc({ event: 'close' })

// In instances where `close` was not explicitly called,
// such as an iterable stream ending, ensure we have set the close
Expand All @@ -88,6 +92,7 @@ export const toMultiaddrConnection = (socket: Socket, options?: ToConnectionOpti
// the remote sent a FIN packet which means no more data will be sent
// https://nodejs.org/dist/latest-v16.x/docs/api/net.html#event-end
log('socket ended', maConn.remoteAddr.toString())
metrics?.socketEvents.inc({ event: 'end' })
})

const maConn: MultiaddrConnection = {
Expand Down

0 comments on commit bc0bf79

Please sign in to comment.