Skip to content

Commit

Permalink
Write heartbeats timestampt into the table once the dvo writer gets a…
Browse files Browse the repository at this point in the history
… report
  • Loading branch information
ikerreyes committed Nov 14, 2024
1 parent 59a49e4 commit c89c250
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 0 deletions.
34 changes: 34 additions & 0 deletions consumer/dvo_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (DVORulesProcessor) storeInDB(consumer *KafkaConsumer, msg *sarama.Consumer
}
tStored := time.Now()
logDuration(tTimeCheck, tStored, msg.Offset, "db_store_report")
err = consumer.writeHeartbeats(msg, message, lastCheckedTime)
if err != nil {
return message.RequestID, message, err
}
return message.RequestID, message, nil
}

Expand Down Expand Up @@ -163,3 +167,33 @@ func (consumer *KafkaConsumer) writeDVOReport(
logMessageError(consumer, msg, &message, unexpectedStorageType, err)
return err
}


Check failure on line 171 in consumer/dvo_processing.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
func (consumer *KafkaConsumer) writeHeartbeats(
msg *sarama.ConsumerMessage, message incomingMessage, lastCheckedTime time.Time,
) error {
if dvoStorage, ok := consumer.Storage.(storage.DVORecommendationsStorage); ok {

Check failure on line 175 in consumer/dvo_processing.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

var uids []string

for _, w := range message.ParsedWorkloads {
for _, workload := range w.Workloads {
uids = append(uids,workload.UID)
}
}

err := dvoStorage.WriteHeartbeats(
uids,
lastCheckedTime,
)
if err != nil {
logMessageError(consumer, msg, &message, "Error writing heartbeats to database", err)
return err
}
logMessageDebug(consumer, msg, &message, "Stored heartbeats")
return nil
}
err := errors.New("heartbeats could not be stored")
logMessageError(consumer, msg, &message, unexpectedStorageType, err)
return err
}
54 changes: 54 additions & 0 deletions storage/dvo_recommendations_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type DVORecommendationsStorage interface {
) (types.DVOReport, error)
DeleteReportsForOrg(orgID types.OrgID) error
WriteHeartbeat(string, time.Time) error
WriteHeartbeats([]string, time.Time) error
}

const (
Expand Down Expand Up @@ -723,3 +724,56 @@ func (storage DVORecommendationsDBStorage) WriteHeartbeat(

return err
}


Check failure on line 728 in storage/dvo_recommendations_storage.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
// WriteHeartbeat insert multiple heartbeats
func (storage DVORecommendationsDBStorage) WriteHeartbeats(
instanceIDs []string,
lastCheckedTime time.Time,
) error {

Check failure on line 733 in storage/dvo_recommendations_storage.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

timestamp := types.Timestamp(lastCheckedTime.UTC().Format(time.RFC3339))


sqlStr := "INSERT INTO dvo.runtimes_heartbeats VALUES "
vals := []interface{}{}

itemIndex := 1

for _, instanceID := range instanceIDs {
sqlStr += "($"+fmt.Sprint(itemIndex)+", $"+fmt.Sprint(itemIndex+1)+"),"
vals = append(vals, instanceID, timestamp)
itemIndex += 2
}
//trim the last ,
sqlStr = sqlStr[0:len(sqlStr)-1]

sqlStr += ";"


// Begin a new transaction.
tx, err := storage.connection.Begin()
if err != nil {
return err
}


err = func(tx *sql.Tx) error {
// Check if there is a more recent report for the cluster already in the database.
_, err := tx.Exec(sqlStr, vals...)
// stmt, err := tx.Prepare(sqlStr)
// if err != nil {
// return err
// }
// _, err = stmt.Exec(vals...)
if err != nil {
return err
}

return nil
}(tx)

finishTransaction(tx, err)

return err
}
80 changes: 80 additions & 0 deletions storage/dvo_recommendations_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,3 +892,83 @@ func TestFilterWorkloads(t *testing.T) {
})
}
}


Check failure on line 896 in storage/dvo_recommendations_storage_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
func TestWriteHeartbeat(t *testing.T) {
mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true)
defer closer()


err := mockStorage.WriteHeartbeat(
"x", now,
)
helpers.FailOnError(t, err)

connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage))

query := `
SELECT *
FROM dvo.runtimes_heartbeats
`

rows, err := connection.Query(query)
helpers.FailOnError(t, err)

defer rows.Close()

for rows.Next() {
var (
instanceID string
timestamp time.Time
)
err = rows.Scan(
&instanceID, &timestamp,
)
helpers.FailOnError(t, err)

assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339))
assert.Equal(t, "x", instanceID)
}
}


func TestWriteHeartbeats(t *testing.T) {
mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true)
defer closer()

data := []string{"x", "y", "z"}

err := mockStorage.WriteHeartbeats(
data, now,
)
helpers.FailOnError(t, err)

connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage))

query := `
SELECT *
FROM dvo.runtimes_heartbeats
`

rows, err := connection.Query(query)
helpers.FailOnError(t, err)

defer rows.Close()

var instances []string

for rows.Next() {
var (
instanceID string
timestamp time.Time
)
err = rows.Scan(
&instanceID, &timestamp,
)
helpers.FailOnError(t, err)

assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339))
instances = append(instances, instanceID)
}
assert.Equal(t, data, instances)
}
8 changes: 8 additions & 0 deletions storage/noop_dvo_recommendations_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,11 @@ func (*NoopDVOStorage) WriteHeartbeat(
) error {
return nil
}

// WriteHeartbeas noop
func (*NoopDVOStorage) WriteHeartbeats(
[]string,
time.Time,
) error {
return nil
}

0 comments on commit c89c250

Please sign in to comment.