Skip to content

Commit

Permalink
refactor: rewrite queries with InfluxQL
Browse files Browse the repository at this point in the history
I first started to implement query by series, to support different
aggregates (min, max) per paths, but switched to doing everything
in InfluxQL instead because of way better performance. Flux
performed abysmally.

InfluxQL queries now use InfluxV2's V1 api, with the v1 client
library.
  • Loading branch information
Teppo Kurki authored and tkurki committed Aug 17, 2023
1 parent 11583d0 commit 7c651d1
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 35 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
"@js-joda/core": "^5.3.0",
"@js-joda/timezone": "^2.12.1",
"@signalk/signalk-schema": "^1.6.0",
"influx": "^5.9.3",
"s2-geometry": "^1.2.10"
},
"peerDependencies": {
Expand Down
232 changes: 197 additions & 35 deletions src/HistoryAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,17 @@ import { DateTimeFormatter, ZonedDateTime } from '@js-joda/core'

import { Request, Response, Router } from 'express'
import { SKInflux } from './influx'
import { InfluxDB as InfluxV1 } from 'influx'
import { FluxResultObserver, FluxTableMetaData } from '@influxdata/influxdb-client'

function makeArray(d1: number, d2: number) {
const arr = []
for (let i = 0; i < d1; i++) {
arr.push(new Array(d2))
}
return arr
}

export function registerHistoryApiRoute(
router: Pick<Router, 'get'>,
influx: SKInflux,
Expand Down Expand Up @@ -61,17 +70,154 @@ interface ValuesResult {
data: ValuesResultRow[]
}

interface SimpleResponse {
status: (s: number) => void
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
json: (j: any) => void
}

interface SimpleRequest {
query: {
resolution?: string
paths?: string
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ValuesResultRow = any[]

async function getValues(
function getPositions(
v1Client: InfluxV1,
context: string,
from: ZonedDateTime,
to: ZonedDateTime,
timeResolutionMillis: number,
debug: (s: string) => void,
res: SimpleResponse,
) {
const query = `
select
first(lat) as lat, first(lon) as lon
from
"navigation.position"
where
"context" = '${context}'
and
time >= '${from.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}Z'
and
time <= '${to.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}Z'
group by time(${timeResolutionMillis}ms)`

debug(query)

// eslint-disable-next-line @typescript-eslint/no-explicit-any
v1Client.query(query).then((rows: any[]) => {
const resultData = rows.map((row) => {
return [row.time.toISOString(), [row.lon, row.lat]]
})

res.json({
context,
range: {
from: from.toString(),
to: to.toString(),
},
values: [{ path: 'navigation.position', method: 'first' }],
data: resultData,
})
})
}

export function getValues(
influx: SKInflux,
context: string,
from: ZonedDateTime,
to: ZonedDateTime,
debug: (s: string) => void,
req: SimpleRequest,
res: SimpleResponse,
) {
const start = Date.now()
const timeResolutionMillis =
(req.query.resolution
? Number.parseFloat(req.query.resolution as string)
: (to.toEpochSecond() - from.toEpochSecond()) / 500) * 1000
const pathExpressions = ((req.query.paths as string) || '').replace(/[^0-9a-z.,:]/gi, '').split(',')
const pathSpecs: PathSpec[] = pathExpressions.map(splitPathExpression)

if (pathSpecs[0].path === 'navigation.position') {
getPositions(influx.v1Client, context, from, to, timeResolutionMillis, debug, res)
return
}

const uniquePaths = pathSpecs.reduce<string[]>((acc, ps) => {
if (acc.indexOf(ps.path) === -1) {
acc.push(ps.path)
}
return acc
}, [])
const uniqueAggregates = pathSpecs.reduce<string[]>((acc, ps) => {
if (acc.indexOf(ps.aggregateFunction) === -1) {
acc.push(ps.aggregateFunction)
}
return acc
}, [])

const query = `
select
${uniqueAggregates.map((aggregateFunction) => `${aggregateFunction}(value)`).join(',')}
from
${uniquePaths.map((s) => `"${s}"`).join(',')}
where
"context" = '${context}'
and
time >= '${from.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}Z'
and
time <= '${to.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}Z'
group by time(${timeResolutionMillis}ms)`
debug(query)

influx.v1Client
.query(query)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.then((rows: any[]) => {
debug(`got rows ${Date.now() - start}`)
const resultLength = rows.length / uniquePaths.length
const resultData = makeArray(resultLength, pathSpecs.length + 1)

for (let j = 0; j < resultLength; j++) {
resultData[j][0] = rows[j].time.toISOString()
}
pathSpecs.forEach((ps, i) => {
const pathIndex = uniquePaths.indexOf(ps.path)
const firstRow = pathIndex * resultLength
const fieldIndex = i + 1 // first is Date
for (let j = 0; j < resultLength; j++) {
resultData[j][fieldIndex] = rows[firstRow + j][ps.aggregateFunction]
}
})
debug(`rows done ${Date.now() - start}`)
res.json({
context,
range: {
from: from.toString(),
to: to.toString(),
},
values: pathSpecs.map(({ path, aggregateMethod }: PathSpec) => ({ path, method: aggregateMethod })),
data: resultData,
})
})
.catch((e) => console.error(e))
}

export async function getValuesFlux(
influx: SKInflux,
context: string,
from: ZonedDateTime,
to: ZonedDateTime,
debug: (s: string) => void,
req: Request,
res: Response,
req: SimpleRequest,
res: SimpleResponse,
): Promise<ValuesResult | void> {
const timeResolutionMillis =
(req.query.resolution
Expand All @@ -80,29 +226,27 @@ async function getValues(

const pathExpressions = ((req.query.paths as string) || '').replace(/[^0-9a-z.,:]/gi, '').split(',')
const pathSpecs: PathSpec[] = pathExpressions.map(splitPathExpression)
const valuesResult: ValuesResult = {
context,
range: {
from: from.toString(),
to: to.toString(),
},
values: pathSpecs.map(({ path, aggregateMethod }: PathSpec) => ({ path, method: aggregateMethod })),
data: [],
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const resultData: any[] = []

const measurementsOrClause = pathSpecs.map(({ path }) => `r._measurement == "${path}"`).join(' or ')
const measurements = pathSpecs
.map(
({ path, aggregateFunction, queryResultName }, i) => `
dataForContext
|> filter(fn: (r) => r._measurement == "${path}")
|> aggregateWindow(every: ${timeResolutionMillis.toFixed(0)}ms, fn: ${aggregateFunction})
|> yield(name: "${queryResultName + i}")
`,
)
.join('\n')
let query = `
from(bucket: "${influx.bucket}")
dataForContext = from(bucket: "${influx.bucket}")
|> range(start: ${from.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)}Z, stop: ${to.format(
DateTimeFormatter.ISO_LOCAL_DATE_TIME,
)}Z)
|> filter(fn: (r) =>
r.context == "${context}" and
${measurementsOrClause} and
r._field == "value"
)
|> aggregateWindow(every: ${timeResolutionMillis.toFixed(0)}ms, fn: ${pathSpecs[0].aggregateFunction})
|> pivot(rowKey: ["_time"], columnKey: ["_measurement"], valueColumn: "_value")
|> filter(fn: (r) => r.context == "${context}")
${measurements}
`

if (pathSpecs[0].path === 'navigation.position') {
Expand All @@ -122,12 +266,28 @@ async function getValues(
}
debug(query)

const queryResultNames = pathSpecs.map(({ queryResultName }, i) => `${queryResultName + i}`)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const resultTimes: Record<any, number> = {}
let i = 0
let j = 0

const start = Date.now()
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const o: FluxResultObserver<any> = {
next: (row: string[], tableMeta: FluxTableMetaData) => {
if (j++ === 0) {
debug(`start ${Date.now() - start}`)
}
const time = tableMeta.get(row, '_time')
const dataRow = [time, ...pathSpecs.map((pathSpec) => pathSpec.extractValue(pathSpec.path, row, tableMeta))]
valuesResult.data.push(dataRow)
if (resultTimes[time] === undefined) {
resultTimes[time] = i++
resultData.push([time])
}
const result = tableMeta.get(row, 'result')
const value = tableMeta.get(row, '_value')
const fieldIndex = queryResultNames.indexOf(result)
resultData[resultTimes[time]][fieldIndex + 1] = value
return true
},
error: (s: Error) => {
Expand All @@ -136,7 +296,18 @@ async function getValues(
res.status(500)
res.json(s)
},
complete: () => res.json(valuesResult),
complete: () => {
debug(`complete ${Date.now() - start}`)
res.json({
context,
range: {
from: from.toString(),
to: to.toString(),
},
values: pathSpecs.map(({ path, aggregateMethod }: PathSpec) => ({ path, method: aggregateMethod })),
data: resultData,
})
},
}
influx.queryApi.queryRows(query, o)
}
Expand All @@ -148,33 +319,24 @@ function getContext(contextFromQuery: string, selfId: string) {
return contextFromQuery.replace(/ /gi, '')
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type ExtractValue = (path: string, row: string[], tableMeta: FluxTableMetaData) => any

interface PathSpec {
path: string
queryResultName: string
aggregateMethod: string
aggregateFunction: string
extractValue: ExtractValue
}
const EXTRACT_POSITION = (path: string, row: string[], tableMeta: FluxTableMetaData) => [
tableMeta.get(row, 'lon'),
tableMeta.get(row, 'lat'),
]
const EXTRACT_NUMBER = (path: string, row: string[], tableMeta: FluxTableMetaData) => tableMeta.get(row, path)

function splitPathExpression(pathExpression: string): PathSpec {
const parts = pathExpression.split(':')
let aggregateMethod = parts[1] || 'average'
let extractValue: ExtractValue = EXTRACT_NUMBER
if (parts[0] === 'navigation.position') {
aggregateMethod = 'first'
extractValue = EXTRACT_POSITION
}
return {
path: parts[0],
queryResultName: parts[0].replace(/\./g, '_'),
aggregateMethod,
extractValue,
aggregateFunction: (functionForAggregate[aggregateMethod] as string) || 'mean()',
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/influx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import { SKContext } from '@chacal/signalk-ts'
import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client'
import { BucketsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis'
import { InfluxDB as InfluxV1 } from 'influx'
import { getUnits } from '@signalk/signalk-schema'

import { Logging, QueryParams } from './plugin'
Expand Down Expand Up @@ -88,6 +89,8 @@ export class SKInflux {
public lastWriteCallbackSucceeded = false
public url: string
private onlySelf: boolean
public v1Client: InfluxV1

constructor(config: SKInfluxConfig, private logging: Logging, triggerStatusUpdate: () => void) {
const { org, bucket, url, onlySelf } = config
this.influx = new InfluxDB(config)
Expand All @@ -109,6 +112,21 @@ export class SKInflux {
},
})
this.queryApi = this.influx.getQueryApi(org)
const parsedUrl = new URL(url)
this.v1Client = new InfluxV1({
host: parsedUrl.hostname,
username: org,
// leave password empty
password: '',
port: Number(parsedUrl.port),
protocol: <"http" | "https">parsedUrl.protocol.slice(0, -1),
database: bucket,
options: {
headers: {
Authorization: `Token ${config.token}`,
},
},
})
}

init() {
Expand Down
39 changes: 39 additions & 0 deletions src/query.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { ZonedDateTime } from '@js-joda/core'
import { getValues } from './HistoryAPI'
import { SKInflux } from './influx'

/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/no-unused-vars */

const toConsole = (s: any) => console.log(s)
const logging = {
debug: toConsole,
error: toConsole,
}

const skinflux = new SKInflux(
{
onlySelf: true,
url: process.env.URL || '!!!',
token: process.env.TOKEN || '!!!',
bucket: process.env.BUCKET || '!!!',
org: process.env.ORG || '!!!',
writeOptions: {},
},
logging,
() => undefined,
)
const context = process.env.CONTEXT || 'no-context'
const start = ZonedDateTime.parse('2023-07-24T13:03:29.048Z')
const end = ZonedDateTime.parse('2023-07-24T13:04:29.048Z')
const req = {
query: {
paths: process.env.PATHS,
resolution: '60s',
},
}
const resp = {
status: (n: number) => undefined,
json: (s: any) => console.log(JSON.stringify(s, null, 2)),
}
getValues(skinflux, context, start, end, toConsole, req, resp)

0 comments on commit 7c651d1

Please sign in to comment.