Revert "Measure latency of RPC to rate-limit service and duration of report parsing" #6355

Closed
1 change: 0 additions & 1 deletion integration-tests/docker-compose.integration.yaml
@@ -240,7 +240,6 @@ services:
   usage:
     environment:
       RATE_LIMIT_ENDPOINT: '${RATE_LIMIT_ENDPOINT}'
-      RATE_LIMIT_TTL: 1000
       LOG_LEVEL: debug
     depends_on:
       broker:
2 changes: 1 addition & 1 deletion integration-tests/tests/api/rate-limit/emails.spec.ts
@@ -71,7 +71,7 @@ test('rate limit approaching and reached for organization', async () => {
 
   // Make sure we don't send the same email again
   const collectEvenMoreResult = await collectOperations([op, op]);
-  expect(collectEvenMoreResult.status).toEqual(429);
+  expect(collectEvenMoreResult.status).toEqual(200);
 
   await waitFor(5000);
 
1 change: 0 additions & 1 deletion packages/services/usage/package.json
@@ -21,7 +21,6 @@
     "got": "14.4.5",
     "graphql": "16.9.0",
     "kafkajs": "2.2.4",
-    "lru-cache": "11.0.2",
     "pino-pretty": "11.3.0",
     "tiny-lru": "8.0.2",
     "zod": "3.24.1"
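The dependency swap above trades `lru-cache` back for `tiny-lru`. For context, a minimal sketch of the tiny-lru API the service returns to, assuming tiny-lru 8.x and its named `lru` factory (max size first, then TTL in milliseconds):

```ts
import { lru } from 'tiny-lru';

// A bounded cache: at most 1000 entries, each expiring 30s after insertion.
const cache = lru<string>(1000, 30_000);

cache.set('greeting', 'hello');
console.log(cache.get('greeting')); // 'hello' (undefined once the TTL lapses)
```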
2 changes: 0 additions & 2 deletions packages/services/usage/src/environment.ts
@@ -22,7 +22,6 @@ const EnvironmentModel = zod.object({
   PORT: emptyString(NumberFromString.optional()),
   TOKENS_ENDPOINT: zod.string().url(),
   RATE_LIMIT_ENDPOINT: emptyString(zod.string().url().optional()),
-  RATE_LIMIT_TTL: emptyString(NumberFromString.optional()).default(30_000),
   ENVIRONMENT: emptyString(zod.string().optional()),
   RELEASE: emptyString(zod.string().optional()),
 });
@@ -142,7 +141,6 @@ export const env = {
     rateLimit: base.RATE_LIMIT_ENDPOINT
       ? {
           endpoint: base.RATE_LIMIT_ENDPOINT,
-          ttl: base.RATE_LIMIT_TTL,
         }
       : null,
   },
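The removed `RATE_LIMIT_TTL` knob rode on the service's env-parsing pattern: optional variables pass through `emptyString(...)` so an empty string behaves like an unset value. A minimal standalone sketch of that pattern, with `emptyString` and `NumberFromString` approximated here since their definitions sit outside this diff:

```ts
import { z } from 'zod';

// Approximations of the project's helpers (assumptions, not the real code):
// treat '' as undefined, and parse numeric strings into numbers.
const emptyString = <T extends z.ZodTypeAny>(schema: T) =>
  z.preprocess(value => (value === '' ? undefined : value), schema);
const NumberFromString = z.string().regex(/^\d+$/).transform(Number);

const EnvironmentModel = z.object({
  RATE_LIMIT_ENDPOINT: emptyString(z.string().url().optional()),
  // The reverted commit added a TTL with a 30s fallback on top of this:
  RATE_LIMIT_TTL: emptyString(NumberFromString.optional()).default(30_000),
});

console.log(EnvironmentModel.parse({ RATE_LIMIT_ENDPOINT: '', RATE_LIMIT_TTL: '1000' }));
// -> RATE_LIMIT_ENDPOINT: undefined, RATE_LIMIT_TTL: 1000 (coerced to a number)
```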
83 changes: 29 additions & 54 deletions packages/services/usage/src/index.ts
@@ -16,7 +16,6 @@ import {
   httpRequestsWithNoAccess,
   httpRequestsWithNonExistingToken,
   httpRequestsWithoutToken,
-  parseReportDuration,
   tokensDuration,
   usedAPIVersion,
 } from './metrics';
@@ -72,17 +71,12 @@ async function main() {
     logger: server.log,
   });
 
-  const rateLimit = createUsageRateLimit(
-    env.hive.rateLimit
-      ? {
-          endpoint: env.hive.rateLimit.endpoint,
-          ttlMs: env.hive.rateLimit.ttl,
-          logger: server.log,
-        }
-      : {
-          logger: server.log,
-        },
-  );
+  const rateLimit = env.hive.rateLimit
+    ? createUsageRateLimit({
+        endpoint: env.hive.rateLimit.endpoint,
+        logger: server.log,
+      })
+    : null;
 
   server.route<{
     Body: unknown;
@@ -166,23 +160,25 @@ async function main() {
         status: 'success',
       });
 
-      const isRateLimited = await rateLimit
-        .isRateLimited({
-          targetId: tokenInfo.target,
-          token,
-        })
-        .catch(error => {
-          authenticatedRequestLogger.error('Failed to check rate limit');
-          authenticatedRequestLogger.error(error);
-          Sentry.captureException(error, {
-            level: 'error',
-          });
-
-          // If we can't check rate limit, we should not drop the report
-          return false;
-        });
-
-      if (isRateLimited) {
+      if (
+        await rateLimit
+          ?.isRateLimited({
+            id: tokenInfo.target,
+            type: 'operations-reporting',
+            token,
+            entityType: 'target',
+          })
+          .catch(error => {
+            authenticatedRequestLogger.error('Failed to check rate limit');
+            authenticatedRequestLogger.error(error);
+            Sentry.captureException(error, {
+              level: 'error',
+            });
+
+            // If we can't check rate limit, we should not drop the report
+            return false;
+          })
+      ) {
         droppedReports
           .labels({ targetId: tokenInfo.target, orgId: tokenInfo.organization })
           .inc();
@@ -197,15 +193,14 @@ async function main() {
         return;
       }
 
-      const retentionInfo = await rateLimit
-        .getRetentionForTargetId(tokenInfo.target)
-        .catch(error => {
+      const retentionInfo =
+        (await rateLimit?.getRetentionForTargetId?.(tokenInfo.target).catch(error => {
          authenticatedRequestLogger.error(error);
          Sentry.captureException(error, {
            level: 'error',
          });
          return null;
-        });
+        })) || null;
 
       if (typeof retentionInfo !== 'number') {
         authenticatedRequestLogger.error('Failed to get retention info');
@@ … @@
       }
 
       if (apiVersion === undefined || apiVersion === '1') {
-        const result = measureParsing(
-          () => usageProcessorV1(server.log, req.body as any, tokenInfo, retentionInfo),
-          'v1',
-        );
+        const result = usageProcessorV1(server.log, req.body as any, tokenInfo, retentionInfo);
         collect(result.report);
         stopTimer({
           status: 'success',
@@ … @@
           operations: result.operations,
         });
       } else if (apiVersion === '2') {
-        const result = measureParsing(
-          () => usageProcessorV2(server.log, req.body, tokenInfo, retentionInfo),
-          'v2',
-        );
+        const result = usageProcessorV2(server.log, req.body, tokenInfo, retentionInfo);
 
         if (result.success === false) {
           stopTimer({
@@ -329,17 +318,3 @@ main().catch(err => {
   console.error(err);
   process.exit(1);
 });
-
-function measureParsing<T>(fn: () => T, version: 'v1' | 'v2'): T {
-  const stop = parseReportDuration.startTimer({ version });
-  try {
-    const result = fn();
-
-    return result;
-  } catch (error) {
-    Sentry.captureException(error);
-    throw error;
-  } finally {
-    stop();
-  }
-}
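The reworked check keeps the usage service fail-open: when `rateLimit` is `null` (the service is not configured) or the rate-limit RPC fails, reports are accepted rather than dropped. A condensed sketch of that guard, with a hypothetical stand-in type instead of the real tRPC-backed client:

```ts
// Hypothetical stand-in for the client returned by createUsageRateLimit.
type RateLimitClient = {
  isRateLimited(input: { id: string; token: string }): Promise<boolean>;
};

async function shouldDropReport(
  rateLimit: RateLimitClient | null,
  input: { id: string; token: string },
): Promise<boolean> {
  return (
    (await rateLimit
      ?.isRateLimited(input)
      // Fail open: if the rate-limit RPC errors out, keep the report.
      .catch(() => false)) ?? // undefined when rateLimit is null
    false
  );
}
```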
12 changes: 0 additions & 12 deletions packages/services/usage/src/metrics.ts
@@ -16,12 +16,6 @@ export const tokensDuration = new metrics.Histogram({
   labelNames: ['status'],
 });
 
-export const rateLimitDuration = new metrics.Histogram({
-  name: 'usage_rate_limit_duration_seconds',
-  help: 'Duration of an HTTP Request to Rate Limit service in seconds',
-  labelNames: ['type'],
-});
-
 export const httpRequests = new metrics.Counter({
   name: 'usage_http_requests',
   help: 'Number of http requests',
@@ -115,9 +109,3 @@ export const usedAPIVersion = new metrics.Counter({
   help: 'The used API version (x-api-version header)',
   labelNames: ['version'],
 });
-
-export const parseReportDuration = new metrics.Histogram({
-  name: 'usage_parse_duration_seconds',
-  help: 'Duration of parsing a report in seconds',
-  labelNames: ['version'],
-});
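Both deleted metrics follow the usual prom-client histogram-timer idiom: `startTimer()` marks the start, and invoking the returned callback observes the elapsed seconds under the final labels. A small self-contained sketch, assuming plain prom-client rather than the service's `metrics` wrapper:

```ts
import { Histogram, register } from 'prom-client';

const rpcDuration = new Histogram({
  name: 'example_rpc_duration_seconds',
  help: 'Duration of an RPC in seconds',
  labelNames: ['type'],
});

async function timed<T>(type: string, fn: () => Promise<T>): Promise<T> {
  const stop = rpcDuration.startTimer(); // clock starts here
  try {
    return await fn();
  } finally {
    stop({ type }); // observes elapsed seconds, on success and failure alike
  }
}

// Usage: record one observation, then dump the exposition-format output.
timed('rate-limit', async () => 'ok').then(() => register.metrics().then(console.log));
```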
160 changes: 44 additions & 116 deletions packages/services/usage/src/rate-limit.ts
@@ -1,149 +1,77 @@
-import { LRUCache } from 'lru-cache';
-import type { RateLimitApi } from '@hive/rate-limit';
+import LRU from 'tiny-lru';
+import type { RateLimitApi, RateLimitApiInput, RateLimitApiOutput } from '@hive/rate-limit';
 import { ServiceLogger } from '@hive/service-common';
 import { createTRPCProxyClient, httpLink } from '@trpc/client';
-import { rateLimitDuration } from './metrics';
-
-interface IsRateLimitedInput {
-  targetId: string;
-  token: string;
-}
-
-type TargetId = string;
-type Token = string;
-// lru-cache does reference equality check on keys, so we can't use objects as keys.
-type RateLimitedCacheKey = `${TargetId}::${Token}`;
-
-const rateLimitCacheKey = {
-  encodeCacheKey(input: IsRateLimitedInput): RateLimitedCacheKey {
-    return `${input.targetId}::${input.token}` as RateLimitedCacheKey;
-  },
-  decodeCacheKey(key: string) {
-    const [targetId, token] = key.split('::');
-
-    if (!targetId || !token) {
-      throw new Error('Invalid input. Expected format: "<targetId>::<token>"');
-    }
-
-    // Quick check if it's UUID v4.
-    // Third group should start with '4'
-    // It does not have to be a strict UUID v4 check,
-    // we just need to make sure it's not the token instead.
-    if (targetId.charAt(14) !== '4' || targetId.charAt(13) !== '-') {
-      throw new Error('Invalid targetId. Expected UUID v4 format');
-    }
-
-    return {
-      targetId,
-      token,
-    };
-  },
-};
-
-export function createUsageRateLimit(
-  config: {
-    logger: ServiceLogger;
-  } & (
-    | {
-        endpoint: string;
-        ttlMs: number;
-      }
-    | {
-        endpoint?: null;
-        ttlMs?: null;
-      }
-  ),
-) {
+export function createUsageRateLimit(config: { endpoint: string | null; logger: ServiceLogger }) {
   const logger = config.logger;
 
   if (!config.endpoint) {
     logger.warn(`Usage service is not configured to use rate-limit (missing config)`);
 
     return {
-      async isRateLimited(_input: IsRateLimitedInput): Promise<boolean> {
+      async isRateLimited(_input: RateLimitApiInput['checkRateLimit']): Promise<boolean> {
         return false;
       },
       async getRetentionForTargetId(_targetId: string): Promise<number | null> {
         return null;
       },
     };
   }
   const endpoint = config.endpoint.replace(/\/$/, '');
   const rateLimit = createTRPCProxyClient<RateLimitApi>({
     links: [
       httpLink({
         url: `${endpoint}/trpc`,
-        fetch(input, init) {
-          return fetch(input, {
-            ...init,
-            // Abort requests that take longer than 5 seconds
-            signal: AbortSignal.timeout(5000),
-          });
-        },
+        fetch,
         headers: {
           'x-requesting-service': 'usage',
         },
       }),
     ],
   });
+  const cache = LRU<Promise<RateLimitApiOutput['checkRateLimit'] | null>>(1000, 30_000);
+  const retentionCache = LRU<Promise<RateLimitApiOutput['getRetention'] | null>>(1000, 30_000);
 
-  const rateLimitCache = new LRUCache<RateLimitedCacheKey, boolean>({
-    max: 1000,
-    ttl: config.ttlMs,
-    allowStale: false,
-    // If a cache entry is stale or missing, this method is called
-    // to fill the cache with fresh data.
-    // This method is called only once per cache key,
-    // even if multiple requests are waiting for it.
-    async fetchMethod(input) {
-      const { targetId, token } = rateLimitCacheKey.decodeCacheKey(input);
-      const timer = rateLimitDuration.startTimer();
-      const result = await rateLimit.checkRateLimit
-        .query({
-          id: targetId,
-          type: 'operations-reporting',
-          token,
-          entityType: 'target',
-        })
-        .finally(() => {
-          timer({
-            type: 'rate-limit',
-          });
-        });
-
-      if (!result) {
-        return false;
-      }
-
-      return result.limited;
-    },
-  });
+  async function fetchFreshRetentionInfo(input: RateLimitApiInput['getRetention']) {
+    return rateLimit.getRetention.query(input);
+  }
 
-  const retentionCache = new LRUCache<string, number>({
-    max: 1000,
-    ttl: config.ttlMs,
-    // Allow to return stale data if the fetchMethod is slow
-    allowStale: false,
-    // If a cache entry is stale or missing, this method is called
-    // to fill the cache with fresh data.
-    // This method is called only once per cache key,
-    // even if multiple requests are waiting for it.
-    fetchMethod(targetId) {
-      const timer = rateLimitDuration.startTimer();
-      return rateLimit.getRetention.query({ targetId }).finally(() => {
-        timer({
-          type: 'retention',
-        });
-      });
-    },
-  });
+  async function fetchFreshLimitInfo(input: RateLimitApiInput['checkRateLimit']) {
+    return rateLimit.checkRateLimit.query(input);
+  }
 
   return {
     async getRetentionForTargetId(targetId: string) {
-      return (await retentionCache.fetch(targetId)) ?? null;
+      const retentionResponse = await retentionCache.get(targetId);
+
+      if (!retentionResponse) {
+        const result = fetchFreshRetentionInfo({ targetId });
+
+        if (result) {
+          retentionCache.set(targetId, result);
+
+          return result;
+        }
+
+        return null;
+      }
+
+      return retentionResponse;
     },
-    async isRateLimited(input: IsRateLimitedInput) {
-      return (await rateLimitCache.fetch(rateLimitCacheKey.encodeCacheKey(input))) ?? false;
+    async isRateLimited(input: RateLimitApiInput['checkRateLimit']): Promise<boolean> {
+      const limitInfo = await cache.get(input.id);
+
+      if (!limitInfo) {
+        const result = fetchFreshLimitInfo(input);
+
+        if (result) {
+          cache.set(input.id, result);
+
+          return result.then(r => r !== null && r.limited);
+        }
+
+        return false;
+      }
+
+      return limitInfo.limited;
+    },
   };
 }
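The deleted implementation leaned on lru-cache's `fetchMethod`/`fetch()` pair, which coalesces concurrent lookups so the rate-limit service sees one RPC per key per TTL window; the restored tiny-lru version approximates the same coalescing by caching the in-flight Promise itself. A minimal sketch of the deduplication behavior, with a counter standing in for the tRPC call:

```ts
import { LRUCache } from 'lru-cache';

let upstreamCalls = 0;

// Stand-in for the rateLimit.checkRateLimit.query RPC.
async function checkUpstream(targetId: string): Promise<boolean> {
  upstreamCalls += 1;
  return false; // pretend nobody is rate-limited
}

const limitedCache = new LRUCache<string, boolean>({
  max: 1000,
  ttl: 30_000,
  allowStale: false,
  // Called once per missing or stale key, even with many concurrent waiters.
  async fetchMethod(targetId) {
    return checkUpstream(targetId);
  },
});

async function demo() {
  await Promise.all([
    limitedCache.fetch('target-1'),
    limitedCache.fetch('target-1'),
    limitedCache.fetch('target-1'),
  ]);
  console.log(upstreamCalls); // 1: the three reads shared a single RPC
}

void demo();
```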