Skip to content

Commit

Permalink
fix: add more tracing on Uploader (#558)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos authored Sep 30, 2024
1 parent bf91ce2 commit da3ce61
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 72 deletions.
131 changes: 72 additions & 59 deletions src/http/plugins/log-request.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import fastifyPlugin from 'fastify-plugin'
import { logSchema, redactQueryParamFromRequest } from '@internal/monitoring'
import { trace } from '@opentelemetry/api'
import { FastifyRequest } from 'fastify/types/request'
import { FastifyReply } from 'fastify/types/reply'

interface RequestLoggerOptions {
excludeUrls?: string[]
Expand All @@ -20,13 +22,32 @@ declare module 'fastify' {
}
}

/**
* Request logger plugin
* @param options
*/
export const logRequest = (options: RequestLoggerOptions) =>
fastifyPlugin(
async (fastify) => {
fastify.addHook('onRequest', async (req) => {
fastify.addHook('onRequest', async (req, res) => {
req.startTime = Date.now()

// Request was aborted before the server finishes to return a response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
if (aborted) {
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED RES',
responseTime: (Date.now() - req.startTime) / 1000,
})
}
})
})

/**
* Adds req.resources and req.operation to the request object
*/
fastify.addHook('preHandler', async (req) => {
const resourceFromParams = Object.values(req.params || {}).join('/')
const resources = getFirstDefined<string[]>(
Expand All @@ -53,75 +74,67 @@ export const logRequest = (options: RequestLoggerOptions) =>
})

fastify.addHook('onRequestAbort', async (req) => {
if (options.excludeUrls?.includes(req.url)) {
return
}

const rMeth = req.method
const rUrl = redactQueryParamFromRequest(req, [
'token',
'X-Amz-Credential',
'X-Amz-Signature',
'X-Amz-Security-Token',
])
const uAgent = req.headers['user-agent']
const rId = req.id
const cIP = req.ip
const error = (req.raw as any).executionError || req.executionError
const tenantId = req.tenantId

const buildLogMessage = `${tenantId} | ${rMeth} | ABORTED | ${cIP} | ${rId} | ${rUrl} | ${uAgent}`

logSchema.request(req.log, buildLogMessage, {
type: 'request',
req,
doRequestLog(req, {
excludeUrls: options.excludeUrls,
statusCode: 'ABORTED REQ',
responseTime: (Date.now() - req.startTime) / 1000,
error: error,
owner: req.owner,
operation: req.operation?.type ?? req.routeConfig.operation?.type,
resources: req.resources,
serverTimes: req.serverTimings,
})
})

fastify.addHook('onResponse', async (req, reply) => {
if (options.excludeUrls?.includes(req.url)) {
return
}

const rMeth = req.method
const rUrl = redactQueryParamFromRequest(req, [
'token',
'X-Amz-Credential',
'X-Amz-Signature',
'X-Amz-Security-Token',
])
const uAgent = req.headers['user-agent']
const rId = req.id
const cIP = req.ip
const statusCode = reply.statusCode
const error = (req.raw as any).executionError || req.executionError
const tenantId = req.tenantId

const buildLogMessage = `${tenantId} | ${rMeth} | ${statusCode} | ${cIP} | ${rId} | ${rUrl} | ${uAgent}`

logSchema.request(req.log, buildLogMessage, {
type: 'request',
req,
res: reply,
responseTime: reply.getResponseTime(),
error: error,
owner: req.owner,
role: req.jwtPayload?.role,
resources: req.resources,
operation: req.operation?.type ?? req.routeConfig.operation?.type,
serverTimes: req.serverTimings,
doRequestLog(req, {
reply,
excludeUrls: options.excludeUrls,
statusCode: reply.statusCode,
responseTime: reply.elapsedTime,
})
})
},
{ name: 'log-request' }
)

interface LogRequestOptions {
reply?: FastifyReply
excludeUrls?: string[]
statusCode: number | 'ABORTED REQ' | 'ABORTED RES'
responseTime: number
}

function doRequestLog(req: FastifyRequest, options: LogRequestOptions) {
if (options.excludeUrls?.includes(req.url)) {
return
}

const rMeth = req.method
const rUrl = redactQueryParamFromRequest(req, [
'token',
'X-Amz-Credential',
'X-Amz-Signature',
'X-Amz-Security-Token',
])
const uAgent = req.headers['user-agent']
const rId = req.id
const cIP = req.ip
const statusCode = options.statusCode
const error = (req.raw as any).executionError || req.executionError

Check warning on line 119 in src/http/plugins/log-request.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 20

Unexpected any. Specify a different type
const tenantId = req.tenantId

const buildLogMessage = `${tenantId} | ${rMeth} | ${statusCode} | ${cIP} | ${rId} | ${rUrl} | ${uAgent}`

logSchema.request(req.log, buildLogMessage, {
type: 'request',
req,
res: options.reply,
responseTime: options.responseTime,
error: error,
owner: req.owner,
role: req.jwtPayload?.role,
resources: req.resources,
operation: req.operation?.type ?? req.routeConfig.operation?.type,
serverTimes: req.serverTimings,
})
}

function getFirstDefined<T>(...values: any[]): T | undefined {
for (const value of values) {
if (value !== undefined) {
Expand Down
2 changes: 1 addition & 1 deletion src/http/plugins/signals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const signals = fastifyPlugin(
disconnect: new AbortController(),
}

// Client terminated the request before the body was fully received
// Client terminated the request before server finished sending the response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
if (aborted) {
Expand Down
28 changes: 28 additions & 0 deletions src/http/plugins/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,34 @@ export const traceServerTime = fastifyPlugin(
if (!tracingEnabled) {
return
}
fastify.addHook('onRequest', async (req, res) => {
// Request was aborted before the server finishes to return a response
res.raw.once('close', () => {
const aborted = !res.raw.writableFinished
if (aborted) {
try {
const span = trace.getSpan(context.active())
const traceId = span?.spanContext().traceId

span?.setAttribute('res_aborted', true)

if (traceId) {
const spans = traceCollector.getSpansForTrace(traceId)
if (spans) {
req.serverTimings = spansToServerTimings(spans, true)
}
traceCollector.clearTrace(traceId)
}
} catch (e) {
logSchema.error(logger, 'failed parsing server times on abort', {
error: e,
type: 'otel',
})
}
}
})
})

fastify.addHook('onResponse', async (request, reply) => {
try {
const traceId = trace.getSpan(context.active())?.spanContext().traceId
Expand Down
1 change: 1 addition & 0 deletions src/http/routes/object/createObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export default async function routes(fastify: FastifyInstance) {
objectName,
owner,
isUpsert,
signal: request.signals.body.signal,
})

return response.status(objectMetadata?.httpStatusCode ?? 200).send({
Expand Down
7 changes: 6 additions & 1 deletion src/http/routes/s3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ export default async function routes(fastify: FastifyInstance) {
if (route.disableContentTypeParser) {
reply.header('connection', 'close')
reply.raw.on('finish', () => {
req.raw.destroy()
// wait sometime so that the client can receive the response
setTimeout(() => {
if (!req.raw.destroyed) {
req.raw.destroy()
}
}, 3000)
})
}
throw e
Expand Down
2 changes: 1 addition & 1 deletion src/internal/auth/jwt.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as crypto from 'crypto'
import * as crypto from 'node:crypto'
import jwt from 'jsonwebtoken'
import { ERRORS } from '@internal/errors'

Expand Down
2 changes: 1 addition & 1 deletion src/internal/database/tenant.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import crypto from 'crypto'
import crypto from 'node:crypto'
import { getConfig } from '../../config'
import { decrypt, encrypt, verifyJWT } from '../auth'
import { multitenantKnex } from './multitenant-db'
Expand Down
2 changes: 1 addition & 1 deletion src/internal/monitoring/logger.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pino, { BaseLogger } from 'pino'
import { getConfig } from '../../config'
import { FastifyReply, FastifyRequest } from 'fastify'
import { URL } from 'url'
import { URL } from 'node:url'
import { normalizeRawError } from '@internal/errors'

const { logLevel, logflareApiKey, logflareSourceToken, logflareEnabled, region } = getConfig()
Expand Down
32 changes: 30 additions & 2 deletions src/internal/monitoring/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'
import { SpanExporter, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base'
import * as grpc from '@grpc/grpc-js'
import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
import { IncomingMessage } from 'http'
import { IncomingMessage } from 'node:http'
import { logger, logSchema } from '@internal/monitoring/logger'
import { traceCollector } from '@internal/monitoring/otel-processor'
import { ClassInstrumentation } from './otel-instrumentation'
Expand All @@ -32,6 +32,7 @@ import { StorageKnexDB } from '@storage/database'
import { TenantConnection } from '@internal/database'
import { S3Store } from '@tus/s3-store'
import { Upload } from '@aws-sdk/lib-storage'
import { StreamSplitter } from '@tus/server'

const tracingEnabled = process.env.TRACING_ENABLED === 'true'
const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''
Expand Down Expand Up @@ -222,11 +223,31 @@ const sdk = new NodeSDK({
'getUpload',
'declareUploadLength',
'uploadIncompletePart',
'uploadPart',
'downloadIncompletePart',
'uploadParts',
],
setName: (name) => 'Tus.' + name,
}),
new ClassInstrumentation({
targetClass: StreamSplitter,
enabled: true,
methodsToInstrument: ['emitEvent'],
setName: (name: string, attrs: any) => {
if (attrs.event) {
return name + '.' + attrs.event
}
return name
},
setAttributes: {
emitEvent: function (event) {
return {
part: this.part as any,
event,
}
},
},
}),
new ClassInstrumentation({
targetClass: S3Client,
enabled: true,
Expand All @@ -243,7 +264,14 @@ const sdk = new NodeSDK({
new ClassInstrumentation({
targetClass: Upload,
enabled: true,
methodsToInstrument: ['done', '__notifyProgress'],
methodsToInstrument: [
'done',
'__doConcurrentUpload',
'__uploadUsingPut',
'__createMultipartUpload',
'__notifyProgress',
'markUploadAsAborted',
],
}),
getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-http': {
Expand Down
2 changes: 1 addition & 1 deletion src/internal/pubsub/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import EventEmitter from 'events'
import EventEmitter from 'node:events'
import createSubscriber, { Subscriber } from 'pg-listen'
import { ERRORS } from '@internal/errors'
import { logger, logSchema } from '@internal/monitoring'
Expand Down
2 changes: 1 addition & 1 deletion src/start/server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import '@internal/monitoring/otel'
import { FastifyInstance } from 'fastify'
import { IncomingMessage, Server, ServerResponse } from 'http'
import { IncomingMessage, Server, ServerResponse } from 'node:http'

import build from '../app'
import buildAdmin from '../admin-app'
Expand Down
3 changes: 1 addition & 2 deletions src/storage/backend/s3.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
CompleteMultipartUploadCommandOutput,
CopyObjectCommand,
CreateMultipartUploadCommand,
DeleteObjectCommand,
Expand Down Expand Up @@ -228,7 +227,7 @@ export class S3Backend implements StorageBackendAdapter {
{ once: true }
)

const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput
const data = await paralellUploadS3.done()

const metadata = await this.headObject(bucketName, key, version)

Expand Down
2 changes: 1 addition & 1 deletion src/storage/object.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { FastifyRequest } from 'fastify'
import { randomUUID } from 'crypto'
import { randomUUID } from 'node:crypto'
import { SignedUploadToken, signJWT, verifyJWT } from '@internal/auth'
import { ERRORS } from '@internal/errors'
import { getJwtSecret } from '@internal/database'
Expand Down
2 changes: 1 addition & 1 deletion src/storage/protocols/s3/s3-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ function toAwsMeatadataHeaders(records: Record<string, any>) {
if (records) {
Object.keys(records).forEach((key) => {
const value = records[key]
if (isUSASCII(value)) {
if (value && isUSASCII(value)) {
metadataHeaders['x-amz-meta-' + key.toLowerCase()] = value
} else {
missingCount++
Expand Down

0 comments on commit da3ce61

Please sign in to comment.