Skip to content

Commit

Permalink
[PINAX-337] fix issues with Victoriametrics sync conflicting parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
Francois Blanchette committed Nov 22, 2023
1 parent 8e4ec36 commit 7190f4b
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 24 deletions.
8 changes: 3 additions & 5 deletions bin/cli.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#!/usr/bin/env node
import { commander } from "substreams-sink";
import { action, DEFAULT_VERBOSE, DEFAULT_ADDRESS, DEFAULT_PORT, DEFAULT_SCRAPE_INTERVAL, DEFAULT_CSV_ROOT, DEFAULT_FOLDER_GRANULAR, DEFAULT_FILE_GRANULAR } from "../index.js"
import { action, DEFAULT_VERBOSE, DEFAULT_HOST, DEFAULT_SCRAPE_INTERVAL, DEFAULT_CSV_ROOT, DEFAULT_FOLDER_GRANULAR, DEFAULT_FILE_GRANULAR } from "../index.js"
import { actionExportCsv, actionImportCsv } from "../src/csv.js"
import { handleLabels } from "../src/prom.js";

import pkg from "../package.json" assert { type: "json" };

const program = commander.program(pkg);
commander.run(program, pkg)
.option('-p --port <int>', 'Listens on port number.', String(DEFAULT_PORT))
.option('-a --address <string>', 'VictoriaMetrics address to connect.', DEFAULT_ADDRESS)
.option('--host <string>', `VictoriaMetrics address to connect (e.g. ${DEFAULT_HOST}).`, DEFAULT_HOST)
.option('-i --scrape-interval <int>', 'Scrape Interval', String(DEFAULT_SCRAPE_INTERVAL))
.option('-l --labels [...string]', "To apply generic labels to all default metrics (ex: --labels foo=bar)", handleLabels, {})
.action(action)
Expand All @@ -28,9 +27,8 @@ commander.run(cmdCsv, pkg).name("export")
// csv import
cmdCsv.command("import")
.description("Import CSV files to VictoriaMetrics")
.option('--host <string>', `VictoriaMetrics address to connect (e.g. ${DEFAULT_HOST}).`, DEFAULT_HOST)
.option('--verbose', 'Enable verbose logging', DEFAULT_VERBOSE)
.option('-p --port <int>', 'Listens on port number.', String(DEFAULT_PORT))
.option('-a --address <string>', 'VictoriaMetrics address to connect.', DEFAULT_ADDRESS)
.option('--csv-root <string>', 'CSV root', String(DEFAULT_CSV_ROOT))
.option('-l --labels [...string]', "To apply generic labels to all default metrics (ex: --labels foo=bar)", handleLabels, {})
.action(actionImportCsv)
Expand Down
13 changes: 5 additions & 8 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ logger.setName(pkg.name);
export { logger };

// default user options
export const DEFAULT_ADDRESS = '0.0.0.0';
export const DEFAULT_PORT = 8428;
export const DEFAULT_HOST = 'http://0.0.0.0:8428'
export const DEFAULT_SCRAPE_INTERVAL = 30;
export const DEFAULT_COLLECT_DEFAULT_METRICS = false;
export const DEFAULT_CSV_ROOT = './csv'
Expand All @@ -20,17 +19,15 @@ export const DEFAULT_VERBOSE = false

// Custom user options interface
export interface ActionOptions extends commander.RunOptions {
address: string;
port: number;
host: string;
scrapeInterval: number;
labels: Object;
manifest: string
}

export async function action(options: ActionOptions) {
// Get command options
const { address, port, scrapeInterval } = options;
const url = `http://${address}:${port}/api/v1/import/prometheus`
const url = `${options.host}/api/v1/import/prometheus`
logger.info("url", url)

// Download substreams
const spkg = await fetchSubstream(options.manifest);
Expand All @@ -40,7 +37,7 @@ export async function action(options: ActionOptions) {
// Run substreams
const { emitter } = await setup(options);
emitter.on("anyMessage", (messages, _cursor, clock) => {
handleImport(url, scrapeInterval, clock);
handleImport(url, options.scrapeInterval, clock);
handleOperations(messages as any);
});

Expand Down
15 changes: 4 additions & 11 deletions src/csv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ const EPOCH_HEADER = "#epoch"
type Row = Map<string, string>

export interface ActionOptions extends commander.RunOptions {
address: string;
port: number;
host: string;
scrapeInterval: number;
labels: Record<string, string>;
csvRoot: string;
Expand Down Expand Up @@ -141,13 +140,8 @@ export async function actionExportCsv(options: ActionOptions) {
}
}

// Get command options
const { address, port, scrapeInterval } = options;

logger.info("options:", options)
console.log(options)

logger.info(`vitals: ${address} ${port} ${scrapeInterval}`)
logger.info(`vitals: ${options.host} ${options.scrapeInterval}`)

// Download substreams
const substreamPackage = await readPackage(options.manifest);
Expand All @@ -168,7 +162,7 @@ export async function actionExportCsv(options: ActionOptions) {
const { emitter } = await setup(options);
emitter.on("anyMessage", (messages, cursor, clock) => {
handleOperations(messages as any);
handleExport(scrapeInterval, clock);
handleExport(options.scrapeInterval, clock);
});

// Start streaming
Expand Down Expand Up @@ -219,8 +213,7 @@ export async function actionImportCsv(options: ActionOptions) {
return injectedLabels.length !== 0 ? `${line}${csvSeparator}${injectedLabels}` : line
}).join(lineSeparator)

const { address, port } = options;
const url = `http://${address}:${port}/api/v1/import/csv?format=` + mapping.join(formatSeparator)
const url = `${options.host}/api/v1/import/csv?format=` + mapping.join(formatSeparator)
logger.debug(`URL: ${url}`)
logger.debug(`BODY: ${body}`)
await fetch(url, { method: 'POST', body }).catch((error) => {
Expand Down
2 changes: 2 additions & 0 deletions src/victoria_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export function appendEpoch(metrics: string, epoch: number) {
}

export async function handleImport(url: string, scrapeInterval: number, clock: any) {
logger.info("handleImport")

if (!clock.timestamp) return; // no timestamp (yet
const epoch = clock.timestamp.toDate().valueOf();
if (epoch / 1000 % scrapeInterval != 0) return; // only handle epoch intervals
Expand Down

0 comments on commit 7190f4b

Please sign in to comment.