Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do Not merge #2090

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions consumer/dvo_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (DVORulesProcessor) storeInDB(consumer *KafkaConsumer, msg *sarama.Consumer
}
tStored := time.Now()
logDuration(tTimeCheck, tStored, msg.Offset, "db_store_report")
err = consumer.writeHeartbeats(msg, message, lastCheckedTime)
if err != nil {
return message.RequestID, message, err
}
return message.RequestID, message, nil
}

Expand Down Expand Up @@ -163,3 +167,31 @@ func (consumer *KafkaConsumer) writeDVOReport(
logMessageError(consumer, msg, &message, unexpectedStorageType, err)
return err
}

func (consumer *KafkaConsumer) writeHeartbeats(
msg *sarama.ConsumerMessage, message incomingMessage, lastCheckedTime time.Time,
) error {
if dvoStorage, ok := consumer.Storage.(storage.DVORecommendationsStorage); ok {
var uids []string

for _, w := range message.ParsedWorkloads {
for _, workload := range w.Workloads {
uids = append(uids, workload.UID)
}
}

err := dvoStorage.WriteHeartbeats(
uids,
lastCheckedTime,
)
if err != nil {
logMessageError(consumer, msg, &message, "Error writing heartbeats to database", err)
return err
}
logMessageDebug(consumer, msg, &message, "Stored heartbeats")
return nil
}
err := errors.New("heartbeats could not be stored")
logMessageError(consumer, msg, &message, unexpectedStorageType, err)
return err
}
33 changes: 33 additions & 0 deletions migration/dvomigrations/actual_migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,36 @@ func Test0004RuleHitsCount(t *testing.T) {
helpers.FailOnError(t, err)
assert.Equal(t, ruleHitsInput, ruleHits)
}

func TestMigration5_TableRuntimesHeartbeatsAlreadyExists(t *testing.T) {
db, closer := helpers.PrepareDBDVO(t)
defer closer()

dbConn := db.GetConnection()

err := migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), 4, dvomigrations.UsableDVOMigrations)
helpers.FailOnError(t, err)

_, err = dbConn.Exec(`CREATE TABLE dvo.runtimes_heartbeats(c INTEGER);`)
helpers.FailOnError(t, err)

err = migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), db.GetMaxVersion(), dvomigrations.UsableDVOMigrations)
assert.EqualError(t, err, "table runtimes_heartbeats already exists")
}

func TestMigration5_TableRuntimesHeartbeatsDoesNotExist(t *testing.T) {
db, closer := helpers.PrepareDBDVO(t)
defer closer()

dbConn := db.GetConnection()

err := migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), 5, dvomigrations.UsableDVOMigrations)
helpers.FailOnError(t, err)

_, err = dbConn.Exec(`DROP TABLE dvo.runtimes_heartbeats;`)
helpers.FailOnError(t, err)

// try to set to the first version
err = migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), 4, dvomigrations.UsableDVOMigrations)
assert.EqualError(t, err, "no such table: runtimes_heartbeats")
}
1 change: 1 addition & 0 deletions migration/dvomigrations/dvo_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ var UsableDVOMigrations = []migration.Migration{
mig0002CreateDVOReportIndexes,
mig0003CCXDEV12602DeleteBuggyRecords,
mig0004AddRuleHitsCount,
mig0005CreateRuntimesHeartbeats,
}
35 changes: 35 additions & 0 deletions migration/dvomigrations/mig_0005_runtimes_heartbeats_last_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package dvomigrations

import (
"database/sql"

"github.com/RedHatInsights/insights-results-aggregator/migration"
"github.com/RedHatInsights/insights-results-aggregator/types"
)

var mig0005CreateRuntimesHeartbeats = migration.Migration{
StepUp: func(tx *sql.Tx, _ types.DBDriver) error {
_, err := tx.Exec(`
CREATE TABLE dvo.runtimes_heartbeats (
instance_id VARCHAR NOT NULL,
last_checked_at TIMESTAMP,
PRIMARY KEY(instance_id)
);
`)
if err != nil {
return err
}

_, err = tx.Exec(`
COMMENT ON TABLE dvo.runtimes_heartbeats IS 'This table is used to store information of when the hearbeats was last received.';
COMMENT ON COLUMN dvo.runtimes_heartbeats.instance_id IS 'instance ID';
COMMENT ON COLUMN dvo.runtimes_heartbeats.last_checked_at IS 'timestamp of the received heartbeat';
`)

return err
},
StepDown: func(tx *sql.Tx, _ types.DBDriver) error {
_, err := tx.Exec(`DROP TABLE dvo.runtimes_heartbeats;`)
return err
},
}
127 changes: 126 additions & 1 deletion storage/dvo_recommendations_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type DVORecommendationsStorage interface {
string,
) (types.DVOReport, error)
DeleteReportsForOrg(orgID types.OrgID) error
WriteHeartbeats([]string, time.Time) error
}

const (
Expand Down Expand Up @@ -597,11 +598,135 @@ func (storage DVORecommendationsDBStorage) ReadWorkloadsForClusterAndNamespace(

log.Debug().Int(orgIDStr, int(orgID)).Msgf("ReadWorkloadsForClusterAndNamespace took %v", time.Since(tStart))

return dvoReport, err
if err != nil {
return dvoReport, err
}

return storage.filterReportWithHeartbeats(dvoReport)
}

// filter a DVO report based on the data in dvo.runtimes_heartbeats
// It the last timestampt there is longer than 30 secs (value for the demo)
// it gets removed from the report.
func (storage DVORecommendationsDBStorage) filterReportWithHeartbeats(dvoReport types.DVOReport) (
workload types.DVOReport,
err error,
) {
query := `
SELECT instance_id
FROM dvo.runtimes_heartbeats
WHERE last_checked_at > (now() - interval '30 seconds')
`

rows, err := storage.connection.Query(query)
if err != nil {
return dvoReport, err
}

defer closeRows(rows)

aliveInstances := map[string]bool{}

for rows.Next() {
var (
instanceID string
)
err = rows.Scan(
&instanceID,
)
if err != nil {
log.Error().Err(err).Msg("ReadWorkloadsForOrganization getting alive instances")
}

aliveInstances[instanceID] = true
}

// do not do any filtering if there is no data.
// This is a hack for previosly existing unit tests
if len(aliveInstances) == 0 {
return dvoReport, nil
}

processedReport := dvoReport.Report
processedReport = strings.ReplaceAll(processedReport, "\\n", "")
processedReport = strings.ReplaceAll(processedReport, "\\", "")
processedReport = processedReport[1 : len(processedReport)-1]

// filter report
var reportData types.DVOMetrics // here we will miss part of the original report, but a part that is not used anywhere
err = json.Unmarshal([]byte(processedReport), &reportData)
if err != nil {
return dvoReport, err
}

seenObjects := map[string]bool{}

for i, rec := range reportData.WorkloadRecommendations {
reportData.WorkloadRecommendations[i].Workloads = FilterWorkloads(rec.Workloads, aliveInstances, seenObjects)
}

bReport, err := json.Marshal(reportData)
if err != nil {
return dvoReport, err
}
fmt.Print(string(bReport))
dvoReport.Report = string(bReport)
dvoReport.Objects = uint(len(seenObjects)) // #nosec G115

return dvoReport, nil
}

// FilterWorkloads use aliveInstances to filter the workloads and add seen objects to a map
func FilterWorkloads(workloads []types.DVOWorkload, aliveInstances map[string]bool, seenObjects map[string]bool) []types.DVOWorkload {
i := 0 // output index
for _, x := range workloads {
if _, ok := aliveInstances[x.UID]; ok {
// copy and increment index
workloads[i] = x
i++
seenObjects[x.UID] = true
}
}
return workloads[:i]
}

// DeleteReportsForOrg deletes all reports related to the specified organization from the storage.
func (storage DVORecommendationsDBStorage) DeleteReportsForOrg(orgID types.OrgID) error {
_, err := storage.connection.Exec("DELETE FROM dvo.dvo_report WHERE org_id = $1;", orgID)
return err
}

// WriteHeartbeats insert multiple heartbeats
func (storage DVORecommendationsDBStorage) WriteHeartbeats(
instanceIDs []string,
lastCheckedTime time.Time,
) error {
timestamp := types.Timestamp(lastCheckedTime.UTC().Format(time.RFC3339))

sqlStr := "INSERT INTO dvo.runtimes_heartbeats VALUES "
vals := []interface{}{}

itemIndex := 1

for _, instanceID := range instanceIDs {
sqlStr += "($" + fmt.Sprint(itemIndex) + ", $" + fmt.Sprint(itemIndex+1) + "),"
vals = append(vals, instanceID, timestamp)
itemIndex += 2
}
//trim the last ,
sqlStr = sqlStr[0 : len(sqlStr)-1]

sqlStr += ";"

// Begin a new transaction.
tx, err := storage.connection.Begin()
if err != nil {
return err
}

_, err = tx.Exec(sqlStr, vals...)

finishTransaction(tx, err)

return err
}
Loading
Loading