Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WebSockets metrics #2649

Merged
merged 12 commits into from
Aug 15, 2024
39 changes: 36 additions & 3 deletions packages/transport-websockets/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import * as filters from './filters.js'
import { createListener } from './listener.js'
import { socketToMaConn } from './socket-to-conn.js'
import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents } from '@libp2p/interface'
import type { Transport, MultiaddrFilter, CreateListenerOptions, DialTransportOptions, Listener, AbortOptions, ComponentLogger, Logger, Connection, OutboundConnectionUpgradeEvents, Metrics, CounterGroup } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
Expand All @@ -82,6 +82,13 @@

export interface WebSocketsComponents {
logger: ComponentLogger
metrics?: Metrics
}

export interface WebSocketsMetrics {
dialerEvents: CounterGroup
listenerEvents: CounterGroup
component: Metrics
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
}

export type WebSocketsDialEvents =
Expand All @@ -92,11 +99,26 @@
private readonly log: Logger
private readonly init?: WebSocketsInit
private readonly logger: ComponentLogger
private readonly metrics?: WebSocketsMetrics

constructor (components: WebSocketsComponents, init?: WebSocketsInit) {
this.log = components.logger.forComponent('libp2p:websockets')
this.logger = components.logger
this.init = init

if (components.metrics != null) {
this.metrics = {
component: components.metrics,
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
dialerEvents: components.metrics.registerCounterGroup('libp2p_websockets_dialer_events_total', {
label: 'event',
help: 'Total count of WebSockets dialer events by type'
}),
listenerEvents: components.metrics.registerCounterGroup('libp2p_websockets_listener_events_total', {
label: 'event',
help: 'Total count of WebSockets listener events by type'
})
}
}

Check warning on line 121 in packages/transport-websockets/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/index.ts#L110-L121

Added lines #L110 - L121 were not covered by tests
}

readonly [transportSymbol] = true
Expand All @@ -113,11 +135,13 @@

const socket = await this._connect(ma, options)
const maConn = socketToMaConn(socket, ma, {
logger: this.logger
logger: this.logger,
metrics: this.metrics?.dialerEvents
})
this.log('new outbound connection %s', maConn.remoteAddr)

const conn = await options.upgrader.upgradeOutbound(maConn, options)
this.metrics?.dialerEvents.increment({ upgrade_success: true })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need an upgrade_error somewhere?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense.

this.log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
Expand All @@ -136,22 +160,30 @@
// https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/error_event
const err = new CodeError(`Could not connect to ${ma.toString()}`, 'ERR_CONNECTION_FAILED')
this.log.error('connection error:', err)
this.metrics?.dialerEvents.increment({ socket_open_error: true })
errorPromise.reject(err)
})

try {
options.onProgress?.(new CustomProgressEvent('websockets:open-connection'))
await raceSignal(Promise.race([rawSocket.connected(), errorPromise.promise]), options.signal)
} catch (err: any) {
if (options.signal?.aborted === true) {
this.metrics?.dialerEvents.increment({ socket_open_abort: true })
} else {
this.metrics?.dialerEvents.increment({ socket_open_error: true })
}
rawSocket.close()
.catch(err => {
this.log.error('error closing raw socket', err)
this.metrics?.dialerEvents.increment({ socket_close_error: true })

Check warning on line 179 in packages/transport-websockets/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/index.ts#L179

Added line #L179 was not covered by tests
})

throw err
}

this.log('connected %s', ma)
this.metrics?.dialerEvents.increment({ socket_open_success: true })
return rawSocket
}

Expand All @@ -162,7 +194,8 @@
*/
createListener (options: CreateListenerOptions): Listener {
return createListener({
logger: this.logger
logger: this.logger,
metrics: this.metrics?.listenerEvents
}, {
...this.init,
...options
Expand Down
13 changes: 11 additions & 2 deletions packages/transport-websockets/src/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import { multiaddr, protocols } from '@multiformats/multiaddr'
import { createServer } from 'it-ws/server'
import { socketToMaConn } from './socket-to-conn.js'
import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions } from '@libp2p/interface'
import type { ComponentLogger, Logger, Connection, Listener, ListenerEvents, CreateListenerOptions, CounterGroup } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Server } from 'http'
import type { DuplexWebSocket } from 'it-ws/duplex'
import type { WebSocketServer } from 'it-ws/server'

export interface WebSocketListenerComponents {
logger: ComponentLogger
metrics?: CounterGroup
}

export interface WebSocketListenerInit extends CreateListenerOptions {
Expand All @@ -28,6 +29,7 @@
super()

this.log = components.logger.forComponent('libp2p:websockets:listener')
const metrics = components.metrics
// Keep track of open connections to destroy when the listener is closed
this.connections = new Set<DuplexWebSocket>()

Expand All @@ -36,8 +38,12 @@
this.server = createServer({
...init,
onConnection: (stream: DuplexWebSocket) => {
const listeningAddrDetails = this.listeningMultiaddr?.toOptions()
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

const maConn = socketToMaConn(stream, toMultiaddr(stream.remoteAddress ?? '', stream.remotePort ?? 0), {
logger: components.logger
logger: components.logger,
metrics,
metricPrefix: `${listeningAddrDetails?.transport}_${listeningAddrDetails?.host}:${listeningAddrDetails?.port} `
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved
})
this.log('new inbound connection %s', maConn.remoteAddr)

Expand All @@ -51,6 +57,7 @@
void init.upgrader.upgradeInbound(maConn)
.then((conn) => {
this.log('inbound connection %s upgraded', maConn.remoteAddr)
metrics?.increment({ upgrade_success: true })
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

if (init?.handler != null) {
init?.handler(conn)
Expand All @@ -62,11 +69,13 @@
})
.catch(async err => {
this.log.error('inbound connection failed to upgrade', err)
metrics?.increment({ upgrade_error: true })

Check warning on line 72 in packages/transport-websockets/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/listener.ts#L72

Added line #L72 was not covered by tests

await maConn.close().catch(err => {
this.log.error('inbound connection failed to close after upgrade failed', err)
})
})
metrics?.increment({ socket_open_success: true })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could fire after the .catch handler. should remove

} catch (err) {
this.log.error('inbound connection failed to upgrade', err)
maConn.close().catch(err => {
Expand Down
12 changes: 11 additions & 1 deletion packages/transport-websockets/src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
import { CodeError } from '@libp2p/interface'
import { CLOSE_TIMEOUT } from './constants.js'
import type { AbortOptions, ComponentLogger, MultiaddrConnection } from '@libp2p/interface'
import type { AbortOptions, ComponentLogger, CounterGroup, MultiaddrConnection } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { DuplexWebSocket } from 'it-ws/duplex'

export interface SocketToConnOptions {
localAddr?: Multiaddr
logger: ComponentLogger
metrics?: CounterGroup
metricPrefix?: string
}

// Convert a stream into a MultiaddrConnection
// https://github.com/libp2p/interface-transport#multiaddrconnection
export function socketToMaConn (stream: DuplexWebSocket, remoteAddr: Multiaddr, options: SocketToConnOptions): MultiaddrConnection {
const log = options.logger.forComponent('libp2p:websockets:maconn')
const metrics = options.metrics
const metricPrefix = options.metricPrefix ?? ''

const maConn: MultiaddrConnection = {
log,
Expand All @@ -30,8 +34,10 @@
})())
} catch (err: any) {
if (err.type !== 'aborted') {
metrics?.increment({ [`${metricPrefix}maconn_sink_error`]: true })

Check warning on line 37 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L37

Added line #L37 was not covered by tests
log.error(err)
}
metrics?.increment({ [`${metricPrefix}maconn_sink_abort`]: true })

Check warning on line 40 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L40

Added line #L40 was not covered by tests
}
},

Expand All @@ -57,6 +63,7 @@
const { host, port } = maConn.remoteAddr.toOptions()
log('timeout closing stream to %s:%s after %dms, destroying it manually',
host, port, Date.now() - start)
metrics?.increment({ [`${metricPrefix}maconn_close_abort`]: true })

Check warning on line 66 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L66

Added line #L66 was not covered by tests
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the this.abort(err), that calls stream.destroy will trigger this.. i'm trying to avoid duplicate abort and error events but might have missed something


this.abort(new CodeError('Socket close timeout', 'ERR_SOCKET_CLOSE_TIMEOUT'))
}
Expand All @@ -65,8 +72,10 @@

try {
await stream.close()
metrics?.increment({ [`${metricPrefix}maconn_close_success`]: true })
} catch (err: any) {
log.error('error closing WebSocket gracefully', err)
metrics?.increment({ [`${metricPrefix}maconn_close_error`]: true })

Check warning on line 78 in packages/transport-websockets/src/socket-to-conn.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-websockets/src/socket-to-conn.ts#L78

Added line #L78 was not covered by tests
this.abort(err)
} finally {
options.signal?.removeEventListener('abort', listener)
Expand All @@ -91,6 +100,7 @@
if (maConn.timeline.close == null) {
maConn.timeline.close = Date.now()
}
metrics?.increment({ [`${metricPrefix}maconn_socket_close_success`]: true })
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may not want this one? but based on the comment on lines 97-99, it seems like it could be useful

}, { once: true })

return maConn
Expand Down
Loading