From 7190f4ba48c40f4a3f4561253dbd9cdbebcfd4d3 Mon Sep 17 00:00:00 2001 From: Francois Blanchette Date: Wed, 22 Nov 2023 14:09:41 -0500 Subject: [PATCH] [PINAX-337] fix issues with Victoriametrics sync conflicting parameters --- bin/cli.ts | 8 +++----- index.ts | 13 +++++-------- src/csv.ts | 15 ++++----------- src/victoria_metrics.ts | 2 ++ 4 files changed, 14 insertions(+), 24 deletions(-) diff --git a/bin/cli.ts b/bin/cli.ts index e661139..d771386 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -1,6 +1,6 @@ #!/usr/bin/env node import { commander } from "substreams-sink"; -import { action, DEFAULT_VERBOSE, DEFAULT_ADDRESS, DEFAULT_PORT, DEFAULT_SCRAPE_INTERVAL, DEFAULT_CSV_ROOT, DEFAULT_FOLDER_GRANULAR, DEFAULT_FILE_GRANULAR } from "../index.js" +import { action, DEFAULT_VERBOSE, DEFAULT_HOST, DEFAULT_SCRAPE_INTERVAL, DEFAULT_CSV_ROOT, DEFAULT_FOLDER_GRANULAR, DEFAULT_FILE_GRANULAR } from "../index.js" import { actionExportCsv, actionImportCsv } from "../src/csv.js" import { handleLabels } from "../src/prom.js"; @@ -8,8 +8,7 @@ import pkg from "../package.json" assert { type: "json" }; const program = commander.program(pkg); commander.run(program, pkg) - .option('-p --port ', 'Listens on port number.', String(DEFAULT_PORT)) - .option('-a --address ', 'VictoriaMetrics address to connect.', DEFAULT_ADDRESS) + .option('--host ', `VictoriaMetrics address to connect (e.g. ${DEFAULT_HOST}).`, DEFAULT_HOST) .option('-i --scrape-interval ', 'Scrape Interval', String(DEFAULT_SCRAPE_INTERVAL)) .option('-l --labels [...string]', "To apply generic labels to all default metrics (ex: --labels foo=bar)", handleLabels, {}) .action(action) @@ -28,9 +27,8 @@ commander.run(cmdCsv, pkg).name("export") // csv import cmdCsv.command("import") .description("Import CSV files to VictoriaMetrics") + .option('--host ', `VictoriaMetrics address to connect (e.g. ${DEFAULT_HOST}).`, DEFAULT_HOST) .option('--verbose', 'Enable verbose logging', DEFAULT_VERBOSE) - .option('-p --port ', 'Listens on port number.', String(DEFAULT_PORT)) - .option('-a --address ', 'VictoriaMetrics address to connect.', DEFAULT_ADDRESS) .option('--csv-root ', 'CSV root', String(DEFAULT_CSV_ROOT)) .option('-l --labels [...string]', "To apply generic labels to all default metrics (ex: --labels foo=bar)", handleLabels, {}) .action(actionImportCsv) diff --git a/index.ts b/index.ts index 89d8eb4..21d1992 100644 --- a/index.ts +++ b/index.ts @@ -9,8 +9,7 @@ logger.setName(pkg.name); export { logger }; // default user options -export const DEFAULT_ADDRESS = '0.0.0.0'; -export const DEFAULT_PORT = 8428; +export const DEFAULT_HOST = 'http://0.0.0.0:8428' export const DEFAULT_SCRAPE_INTERVAL = 30; export const DEFAULT_COLLECT_DEFAULT_METRICS = false; export const DEFAULT_CSV_ROOT = './csv' @@ -20,17 +19,15 @@ export const DEFAULT_VERBOSE = false // Custom user options interface export interface ActionOptions extends commander.RunOptions { - address: string; - port: number; + host: string; scrapeInterval: number; labels: Object; manifest: string } export async function action(options: ActionOptions) { - // Get command options - const { address, port, scrapeInterval } = options; - const url = `http://${address}:${port}/api/v1/import/prometheus` + const url = `${options.host}/api/v1/import/prometheus` + logger.info("url", url) // Download substreams const spkg = await fetchSubstream(options.manifest); @@ -40,7 +37,7 @@ export async function action(options: ActionOptions) { // Run substreams const { emitter } = await setup(options); emitter.on("anyMessage", (messages, _cursor, clock) => { - handleImport(url, scrapeInterval, clock); + handleImport(url, options.scrapeInterval, clock); handleOperations(messages as any); }); diff --git a/src/csv.ts b/src/csv.ts index 621ac3e..ccf261b 100644 --- a/src/csv.ts +++ b/src/csv.ts @@ -9,8 +9,7 @@ const EPOCH_HEADER = "#epoch" type Row = Map export interface ActionOptions extends commander.RunOptions { - address: string; - port: number; + host: string; scrapeInterval: number; labels: Record; csvRoot: string; @@ -141,13 +140,8 @@ export async function actionExportCsv(options: ActionOptions) { } } - // Get command options - const { address, port, scrapeInterval } = options; - logger.info("options:", options) - console.log(options) - - logger.info(`vitals: ${address} ${port} ${scrapeInterval}`) + logger.info(`vitals: ${options.host} ${options.scrapeInterval}`) // Download substreams const substreamPackage = await readPackage(options.manifest); @@ -168,7 +162,7 @@ export async function actionExportCsv(options: ActionOptions) { const { emitter } = await setup(options); emitter.on("anyMessage", (messages, cursor, clock) => { handleOperations(messages as any); - handleExport(scrapeInterval, clock); + handleExport(options.scrapeInterval, clock); }); // Start streaming @@ -219,8 +213,7 @@ export async function actionImportCsv(options: ActionOptions) { return injectedLabels.length !== 0 ? `${line}${csvSeparator}${injectedLabels}` : line }).join(lineSeparator) - const { address, port } = options; - const url = `http://${address}:${port}/api/v1/import/csv?format=` + mapping.join(formatSeparator) + const url = `${options.host}/api/v1/import/csv?format=` + mapping.join(formatSeparator) logger.debug(`URL: ${url}`) logger.debug(`BODY: ${body}`) await fetch(url, { method: 'POST', body }).catch((error) => { diff --git a/src/victoria_metrics.ts b/src/victoria_metrics.ts index c0b82c0..7cf1711 100644 --- a/src/victoria_metrics.ts +++ b/src/victoria_metrics.ts @@ -11,6 +11,8 @@ export function appendEpoch(metrics: string, epoch: number) { } export async function handleImport(url: string, scrapeInterval: number, clock: any) { + logger.info("handleImport") + if (!clock.timestamp) return; // no timestamp (yet const epoch = clock.timestamp.toDate().valueOf(); if (epoch / 1000 % scrapeInterval != 0) return; // only handle epoch intervals