Skip to content

Commit

Permalink
chore(metric): add type check for query data (#99)
Browse files Browse the repository at this point in the history
Because

- cause panic if data with legacy format exist in influxdb

This commit

- add type check for data queried from influxdb
  • Loading branch information
heiruwu authored Jul 18, 2023
1 parent a38727c commit daae4cb
Showing 1 changed file with 121 additions and 60 deletions.
181 changes: 121 additions & 60 deletions pkg/repository/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,35 @@ func (i *influxDB) QueryPipelineTriggerRecords(ctx context.Context, owner string
logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String()))
}

triggerTime, err := time.Parse(time.RFC3339Nano, result.Record().ValueByKey("trigger_time").(string))
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error())
}
records = append(
records,
&mgmtPB.PipelineTriggerRecord{
TriggerTime: timestamppb.New(triggerTime),
PipelineTriggerId: result.Record().ValueByKey("pipeline_trigger_id").(string),
PipelineId: result.Record().ValueByKey("pipeline_id").(string),
PipelineUid: result.Record().ValueByKey("pipeline_uid").(string),
TriggerMode: mgmtPB.Mode(mgmtPB.Mode_value[result.Record().ValueByKey("trigger_mode").(string)]),
ComputeTimeDuration: float32(result.Record().ValueByKey("compute_time_duration").(float64)),
Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]),
},
)
record := &mgmtPB.PipelineTriggerRecord{}

if v, match := result.Record().ValueByKey("trigger_time").(string); match {
triggerTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error())
}
record.TriggerTime = timestamppb.New(triggerTime)
}
if v, match := result.Record().ValueByKey("pipeline_trigger_id").(string); match {
record.PipelineTriggerId = v
}
if v, match := result.Record().ValueByKey("pipeline_id").(string); match {
record.PipelineId = v
}
if v, match := result.Record().ValueByKey("pipeline_uid").(string); match {
record.PipelineUid = v
}
if v, match := result.Record().ValueByKey("trigger_mode").(string); match {
record.TriggerMode = mgmtPB.Mode(mgmtPB.Mode_value[v])
}
if v, match := result.Record().ValueByKey("compute_time_duration").(float64); match {
record.ComputeTimeDuration = float32(v)
}
if v, match := result.Record().ValueByKey("status").(string); match {
record.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}

records = append(records, record)
}
// Check for an error
if result.Err() != nil {
Expand Down Expand Up @@ -261,8 +274,10 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
} else {
var chartRecord *mgmtPB.PipelineTriggerChartRecord

var currentTablePosition = -1
var chartRecord *mgmtPB.PipelineTriggerChartRecord

// Iterate over query response
for result.Next() {
// Notice when group key has changed
Expand All @@ -271,25 +286,37 @@ func (i *influxDB) QueryPipelineTriggerChartRecords(ctx context.Context, owner s
}

if result.Record().Table() != currentTablePosition {
chartRecord = &mgmtPB.PipelineTriggerChartRecord{
PipelineId: result.Record().ValueByKey("pipeline_id").(string),
PipelineUid: result.Record().ValueByKey("pipeline_uid").(string),
TriggerMode: mgmtPB.Mode(mgmtPB.Mode_value[result.Record().ValueByKey("trigger_mode").(string)]),
Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]),
TimeBuckets: []*timestamppb.Timestamp{},
TriggerCounts: []int64{},
ComputeTimeDuration: []float32{},

chartRecord = &mgmtPB.PipelineTriggerChartRecord{}

if v, match := result.Record().ValueByKey("pipeline_id").(string); match {
chartRecord.PipelineId = v
}
if v, match := result.Record().ValueByKey("pipeline_uid").(string); match {
chartRecord.PipelineUid = v
}
if v, match := result.Record().ValueByKey("trigger_mode").(string); match {
chartRecord.TriggerMode = mgmtPB.Mode(mgmtPB.Mode_value[v])
}
if v, match := result.Record().ValueByKey("status").(string); match {
chartRecord.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}
chartRecord.TimeBuckets = []*timestamppb.Timestamp{}
chartRecord.TriggerCounts = []int64{}
chartRecord.ComputeTimeDuration = []float32{}
records = append(records, chartRecord)
currentTablePosition = result.Record().Table()
}

if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error())
if v, match := result.Record().ValueByKey("_time").(time.Time); match {
chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(v))
}
if v, match := result.Record().ValueByKey("trigger_time").(int64); match {
chartRecord.TriggerCounts = append(chartRecord.TriggerCounts, v)
}
if v, match := result.Record().ValueByKey("compute_time_duration").(float64); match {
chartRecord.ComputeTimeDuration = append(chartRecord.ComputeTimeDuration, float32(v))
}
chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(result.Record().ValueByKey("_time").(time.Time)))
chartRecord.TriggerCounts = append(chartRecord.TriggerCounts, result.Record().ValueByKey("trigger_time").(int64))
chartRecord.ComputeTimeDuration = append(chartRecord.ComputeTimeDuration, float32(result.Record().ValueByKey("compute_time_duration").(float64)))
}
// Check for an error
if result.Err() != nil {
Expand Down Expand Up @@ -404,25 +431,44 @@ func (i *influxDB) QueryConnectorExecuteRecords(ctx context.Context, owner strin
logger.Debug(fmt.Sprintf("table: %s\n", result.TableMetadata().String()))
}

executeTime, err := time.Parse(time.RFC3339Nano, result.Record().ValueByKey("execute_time").(string))
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error())
}
records = append(
records,
&mgmtPB.ConnectorExecuteRecord{
ExecuteTime: timestamppb.New(executeTime),
ConnectorExecuteId: result.Record().ValueByKey("connector_execute_id").(string),
ConnectorId: result.Record().ValueByKey("connector_id").(string),
ConnectorUid: result.Record().ValueByKey("connector_uid").(string),
ConnectorDefinitionUid: result.Record().ValueByKey("connector_definition_uid").(string),
PipelineTriggerId: result.Record().ValueByKey("pipeline_trigger_id").(string),
PipelineId: result.Record().ValueByKey("pipeline_id").(string),
PipelineUid: result.Record().ValueByKey("pipeline_uid").(string),
ComputeTimeDuration: float32(result.Record().ValueByKey("compute_time_duration").(float64)),
Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]),
},
)
record := &mgmtPB.ConnectorExecuteRecord{}

if v, match := result.Record().ValueByKey("execute_time").(string); match {
executeTime, err := time.Parse(time.RFC3339Nano, v)
if err != nil {
return nil, 0, "", status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error())
}
record.ExecuteTime = timestamppb.New(executeTime)
}
if v, match := result.Record().ValueByKey("pipeline_trigger_id").(string); match {
record.PipelineTriggerId = v
}
if v, match := result.Record().ValueByKey("pipeline_id").(string); match {
record.PipelineId = v
}
if v, match := result.Record().ValueByKey("pipeline_uid").(string); match {
record.PipelineUid = v
}
if v, match := result.Record().ValueByKey("connector_execute_id").(string); match {
record.ConnectorExecuteId = v
}
if v, match := result.Record().ValueByKey("connector_id").(string); match {
record.ConnectorId = v
}
if v, match := result.Record().ValueByKey("connector_uid").(string); match {
record.ConnectorUid = v
}
if v, match := result.Record().ValueByKey("connector_definition_uid").(string); match {
record.ConnectorDefinitionUid = v
}
if v, match := result.Record().ValueByKey("compute_time_duration").(float64); match {
record.ComputeTimeDuration = float32(v)
}
if v, match := result.Record().ValueByKey("status").(string); match {
record.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}

records = append(records, record)
}
// Check for an error
if result.Err() != nil {
Expand Down Expand Up @@ -515,8 +561,10 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid query: %s", err.Error())
} else {
var chartRecord *mgmtPB.ConnectorExecuteChartRecord

var currentTablePosition = -1
var chartRecord *mgmtPB.ConnectorExecuteChartRecord

// Iterate over query response
for result.Next() {
// Notice when group key has changed
Expand All @@ -525,24 +573,37 @@ func (i *influxDB) QueryConnectorExecuteChartRecords(ctx context.Context, owner
}

if result.Record().Table() != currentTablePosition {
chartRecord = &mgmtPB.ConnectorExecuteChartRecord{
ConnectorId: result.Record().ValueByKey("connector_id").(string),
ConnectorUid: result.Record().ValueByKey("connector_uid").(string),
Status: mgmtPB.Status(mgmtPB.Status_value[result.Record().ValueByKey("status").(string)]),
TimeBuckets: []*timestamppb.Timestamp{},
ExecuteCounts: []int64{},
ComputeTimeDuration: []float32{},

chartRecord = &mgmtPB.ConnectorExecuteChartRecord{}

if v, match := result.Record().ValueByKey("connector_id").(string); match {
chartRecord.ConnectorId = v
}
if v, match := result.Record().ValueByKey("connector_uid").(string); match {
chartRecord.ConnectorUid = v
}
if v, match := result.Record().ValueByKey("status").(string); match {
chartRecord.Status = mgmtPB.Status(mgmtPB.Status_value[v])
}
chartRecord.TimeBuckets = []*timestamppb.Timestamp{}
chartRecord.ExecuteCounts = []int64{}
chartRecord.ComputeTimeDuration = []float32{}
records = append(records, chartRecord)
currentTablePosition = result.Record().Table()
}

if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Invalid parse key: %s", err.Error())
}
chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(result.Record().ValueByKey("_time").(time.Time)))
chartRecord.ExecuteCounts = append(chartRecord.ExecuteCounts, result.Record().ValueByKey("execute_time").(int64))
chartRecord.ComputeTimeDuration = append(chartRecord.ComputeTimeDuration, float32(result.Record().ValueByKey("compute_time_duration").(float64)))
if v, match := result.Record().ValueByKey("_time").(time.Time); match {
chartRecord.TimeBuckets = append(chartRecord.TimeBuckets, timestamppb.New(v))
}
if v, match := result.Record().ValueByKey("execute_time").(int64); match {
chartRecord.ExecuteCounts = append(chartRecord.ExecuteCounts, v)
}
if v, match := result.Record().ValueByKey("compute_time_duration").(float64); match {
chartRecord.ComputeTimeDuration = append(chartRecord.ComputeTimeDuration, float32(v))
}
}
// Check for an error
if result.Err() != nil {
Expand Down

0 comments on commit daae4cb

Please sign in to comment.