diff --git a/consumer/dvo_processing.go b/consumer/dvo_processing.go index fc7333dc..5f5286a1 100644 --- a/consumer/dvo_processing.go +++ b/consumer/dvo_processing.go @@ -128,6 +128,10 @@ func (DVORulesProcessor) storeInDB(consumer *KafkaConsumer, msg *sarama.Consumer } tStored := time.Now() logDuration(tTimeCheck, tStored, msg.Offset, "db_store_report") + err = consumer.writeHeartbeats(msg, message, lastCheckedTime) + if err != nil { + return message.RequestID, message, err + } return message.RequestID, message, nil } @@ -163,3 +167,31 @@ func (consumer *KafkaConsumer) writeDVOReport( logMessageError(consumer, msg, &message, unexpectedStorageType, err) return err } + +func (consumer *KafkaConsumer) writeHeartbeats( + msg *sarama.ConsumerMessage, message incomingMessage, lastCheckedTime time.Time, +) error { + if dvoStorage, ok := consumer.Storage.(storage.DVORecommendationsStorage); ok { + var uids []string + + for _, w := range message.ParsedWorkloads { + for _, workload := range w.Workloads { + uids = append(uids, workload.UID) + } + } + + err := dvoStorage.WriteHeartbeats( + uids, + lastCheckedTime, + ) + if err != nil { + logMessageError(consumer, msg, &message, "Error writing heartbeats to database", err) + return err + } + logMessageDebug(consumer, msg, &message, "Stored heartbeats") + return nil + } + err := errors.New("heartbeats could not be stored") + logMessageError(consumer, msg, &message, unexpectedStorageType, err) + return err +} diff --git a/migration/dvomigrations/actual_migrations_test.go b/migration/dvomigrations/actual_migrations_test.go index c074fe42..6a6c5d59 100644 --- a/migration/dvomigrations/actual_migrations_test.go +++ b/migration/dvomigrations/actual_migrations_test.go @@ -126,3 +126,36 @@ func Test0004RuleHitsCount(t *testing.T) { helpers.FailOnError(t, err) assert.Equal(t, ruleHitsInput, ruleHits) } + +func TestMigration5_TableRuntimesHeartbeatsAlreadyExists(t *testing.T) { + db, closer := helpers.PrepareDBDVO(t) + defer closer() + + dbConn := db.GetConnection() + + err := migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), 4, dvomigrations.UsableDVOMigrations) + helpers.FailOnError(t, err) + + _, err = dbConn.Exec(`CREATE TABLE dvo.runtimes_heartbeats(c INTEGER);`) + helpers.FailOnError(t, err) + + err = migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), db.GetMaxVersion(), dvomigrations.UsableDVOMigrations) + assert.EqualError(t, err, "table runtimes_heartbeats already exists") +} + +func TestMigration5_TableRuntimesHeartbeatsDoesNotExist(t *testing.T) { + db, closer := helpers.PrepareDBDVO(t) + defer closer() + + dbConn := db.GetConnection() + + err := migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), 5, dvomigrations.UsableDVOMigrations) + helpers.FailOnError(t, err) + + _, err = dbConn.Exec(`DROP TABLE dvo.runtimes_heartbeats;`) + helpers.FailOnError(t, err) + + // try to set to the first version + err = migration.SetDBVersion(dbConn, db.GetDBDriverType(), db.GetDBSchema(), 4, dvomigrations.UsableDVOMigrations) + assert.EqualError(t, err, "no such table: runtimes_heartbeats") +} diff --git a/migration/dvomigrations/dvo_migrations.go b/migration/dvomigrations/dvo_migrations.go index bc08cf8e..900e9938 100644 --- a/migration/dvomigrations/dvo_migrations.go +++ b/migration/dvomigrations/dvo_migrations.go @@ -8,4 +8,5 @@ var UsableDVOMigrations = []migration.Migration{ mig0002CreateDVOReportIndexes, mig0003CCXDEV12602DeleteBuggyRecords, mig0004AddRuleHitsCount, + mig0005CreateRuntimesHeartbeats, } diff --git a/migration/dvomigrations/mig_0005_runtimes_heartbeats_last_check.go b/migration/dvomigrations/mig_0005_runtimes_heartbeats_last_check.go new file mode 100644 index 00000000..32fe9b0f --- /dev/null +++ b/migration/dvomigrations/mig_0005_runtimes_heartbeats_last_check.go @@ -0,0 +1,35 @@ +package dvomigrations + +import ( + "database/sql" + + "github.com/RedHatInsights/insights-results-aggregator/migration" + "github.com/RedHatInsights/insights-results-aggregator/types" +) + +var mig0005CreateRuntimesHeartbeats = migration.Migration{ + StepUp: func(tx *sql.Tx, _ types.DBDriver) error { + _, err := tx.Exec(` + CREATE TABLE dvo.runtimes_heartbeats ( + instance_id VARCHAR NOT NULL, + last_checked_at TIMESTAMP, + PRIMARY KEY(instance_id) + ); + `) + if err != nil { + return err + } + + _, err = tx.Exec(` + COMMENT ON TABLE dvo.runtimes_heartbeats IS 'This table is used to store information of when the hearbeats was last received.'; + COMMENT ON COLUMN dvo.runtimes_heartbeats.instance_id IS 'instance ID'; + COMMENT ON COLUMN dvo.runtimes_heartbeats.last_checked_at IS 'timestamp of the received heartbeat'; + `) + + return err + }, + StepDown: func(tx *sql.Tx, _ types.DBDriver) error { + _, err := tx.Exec(`DROP TABLE dvo.runtimes_heartbeats;`) + return err + }, +} diff --git a/storage/dvo_recommendations_storage.go b/storage/dvo_recommendations_storage.go index 54dfdd1a..e14b05a9 100644 --- a/storage/dvo_recommendations_storage.go +++ b/storage/dvo_recommendations_storage.go @@ -56,6 +56,7 @@ type DVORecommendationsStorage interface { string, ) (types.DVOReport, error) DeleteReportsForOrg(orgID types.OrgID) error + WriteHeartbeats([]string, time.Time) error } const ( @@ -597,7 +598,96 @@ func (storage DVORecommendationsDBStorage) ReadWorkloadsForClusterAndNamespace( log.Debug().Int(orgIDStr, int(orgID)).Msgf("ReadWorkloadsForClusterAndNamespace took %v", time.Since(tStart)) - return dvoReport, err + if err != nil { + return dvoReport, err + } + + return storage.filterReportWithHeartbeats(dvoReport) +} + +// filter a DVO report based on the data in dvo.runtimes_heartbeats +// It the last timestampt there is longer than 30 secs (value for the demo) +// it gets removed from the report. +func (storage DVORecommendationsDBStorage) filterReportWithHeartbeats(dvoReport types.DVOReport) ( + workload types.DVOReport, + err error, +) { + query := ` + SELECT instance_id + FROM dvo.runtimes_heartbeats + WHERE last_checked_at > (now() - interval '30 seconds') + ` + + rows, err := storage.connection.Query(query) + if err != nil { + return dvoReport, err + } + + defer closeRows(rows) + + aliveInstances := map[string]bool{} + + for rows.Next() { + var ( + instanceID string + ) + err = rows.Scan( + &instanceID, + ) + if err != nil { + log.Error().Err(err).Msg("ReadWorkloadsForOrganization getting alive instances") + } + + aliveInstances[instanceID] = true + } + + // do not do any filtering if there is no data. + // This is a hack for previosly existing unit tests + if len(aliveInstances) == 0 { + return dvoReport, nil + } + + processedReport := dvoReport.Report + processedReport = strings.ReplaceAll(processedReport, "\\n", "") + processedReport = strings.ReplaceAll(processedReport, "\\", "") + processedReport = processedReport[1 : len(processedReport)-1] + + // filter report + var reportData types.DVOMetrics // here we will miss part of the original report, but a part that is not used anywhere + err = json.Unmarshal([]byte(processedReport), &reportData) + if err != nil { + return dvoReport, err + } + + seenObjects := map[string]bool{} + + for i, rec := range reportData.WorkloadRecommendations { + reportData.WorkloadRecommendations[i].Workloads = FilterWorkloads(rec.Workloads, aliveInstances, seenObjects) + } + + bReport, err := json.Marshal(reportData) + if err != nil { + return dvoReport, err + } + fmt.Print(string(bReport)) + dvoReport.Report = string(bReport) + dvoReport.Objects = uint(len(seenObjects)) // #nosec G115 + + return dvoReport, nil +} + +// FilterWorkloads use aliveInstances to filter the workloads and add seen objects to a map +func FilterWorkloads(workloads []types.DVOWorkload, aliveInstances map[string]bool, seenObjects map[string]bool) []types.DVOWorkload { + i := 0 // output index + for _, x := range workloads { + if _, ok := aliveInstances[x.UID]; ok { + // copy and increment index + workloads[i] = x + i++ + seenObjects[x.UID] = true + } + } + return workloads[:i] } // DeleteReportsForOrg deletes all reports related to the specified organization from the storage. @@ -605,3 +695,38 @@ func (storage DVORecommendationsDBStorage) DeleteReportsForOrg(orgID types.OrgID _, err := storage.connection.Exec("DELETE FROM dvo.dvo_report WHERE org_id = $1;", orgID) return err } + +// WriteHeartbeats insert multiple heartbeats +func (storage DVORecommendationsDBStorage) WriteHeartbeats( + instanceIDs []string, + lastCheckedTime time.Time, +) error { + timestamp := types.Timestamp(lastCheckedTime.UTC().Format(time.RFC3339)) + + sqlStr := "INSERT INTO dvo.runtimes_heartbeats VALUES " + vals := []interface{}{} + + itemIndex := 1 + + for _, instanceID := range instanceIDs { + sqlStr += "($" + fmt.Sprint(itemIndex) + ", $" + fmt.Sprint(itemIndex+1) + ")," + vals = append(vals, instanceID, timestamp) + itemIndex += 2 + } + //trim the last , + sqlStr = sqlStr[0 : len(sqlStr)-1] + + sqlStr += ";" + + // Begin a new transaction. + tx, err := storage.connection.Begin() + if err != nil { + return err + } + + _, err = tx.Exec(sqlStr, vals...) + + finishTransaction(tx, err) + + return err +} diff --git a/storage/dvo_recommendations_storage_test.go b/storage/dvo_recommendations_storage_test.go index 7dd3e8bb..11fd5445 100644 --- a/storage/dvo_recommendations_storage_test.go +++ b/storage/dvo_recommendations_storage_test.go @@ -17,6 +17,7 @@ limitations under the License. package storage_test import ( + _ "embed" "encoding/json" "fmt" "strconv" @@ -807,3 +808,128 @@ func TestDVOStorageReadWorkloadsForNamespace_MissingData(t *testing.T) { assert.Equal(t, &types.ItemNotFoundError{ItemID: fmt.Sprintf("%d:%s:%s", testdata.OrgID, nonExistingCluster, ira_data.NamespaceAUID)}, err) }) } + +//go:embed report_for_heartbeats.json +var heartbeatsFilteringReport string + +func TestReadWorkloadsForClusterAndNamespace_HearbeatsFiltering(t *testing.T) { + mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true) + defer closer() + + err := mockStorage.WriteReportForCluster( + testdata.OrgID, + testdata.ClusterName, + types.ClusterReport(heartbeatsFilteringReport), + ira_data.ValidDVORecommendation, + now, + now, + now, + testdata.RequestID1, + ) + helpers.FailOnError(t, err) + + // write heartbeats + err = mockStorage.WriteHeartbeats([]string{"UID-0099"}, time.Now().UTC()) + helpers.FailOnError(t, err) + err = mockStorage.WriteHeartbeats([]string{"UID-0100"}, time.Now().UTC().Add(-1*time.Hour).UTC()) + helpers.FailOnError(t, err) + + report, err := mockStorage.ReadWorkloadsForClusterAndNamespace(testdata.OrgID, testdata.ClusterName, "NAMESPACE-UID-A") + helpers.FailOnError(t, err) + + assert.Equal(t, uint(1), report.Objects) + assert.Contains(t, report.Report, "UID-0099") + assert.NotContains(t, report.Report, "UID-0100") +} + +func NewDVOWorkload(uid string) types.DVOWorkload { + return types.DVOWorkload{ + UID: uid, + } +} + +func TestFilterWorkloads(t *testing.T) { + aliveInstances := map[string]bool{"x": true, "y": true, "z": true} + + testCases := []struct { + workloads []types.DVOWorkload + seen []string + }{ + { + workloads: []types.DVOWorkload{NewDVOWorkload("x")}, + seen: []string{"x"}, + }, + { + workloads: []types.DVOWorkload{NewDVOWorkload("a")}, + seen: []string{}, + }, + { + workloads: []types.DVOWorkload{NewDVOWorkload("a"), NewDVOWorkload("x")}, + seen: []string{"x"}, + }, + { + workloads: []types.DVOWorkload{NewDVOWorkload("a"), NewDVOWorkload("x"), NewDVOWorkload("b"), NewDVOWorkload("y")}, + seen: []string{"x", "y"}, + }, + } + + for i, tt := range testCases { + t.Run("case-"+fmt.Sprint(i), func(t *testing.T) { + gotSeen := map[string]bool{} + expectedSeen := map[string]bool{} + for _, v := range tt.seen { + expectedSeen[v] = true + } + got := storage.FilterWorkloads(tt.workloads, aliveInstances, gotSeen) + + assert.Equal(t, expectedSeen, gotSeen) + assert.Len(t, got, len(tt.seen)) + gotUIDs := []string{} + for _, workload := range got { + gotUIDs = append(gotUIDs, workload.UID) + } + assert.Equal(t, tt.seen, gotUIDs) + }) + } +} + +func TestWriteHeartbeats(t *testing.T) { + mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true) + defer closer() + + data := []string{"x", "y", "z"} + + err := mockStorage.WriteHeartbeats( + data, now, + ) + helpers.FailOnError(t, err) + + connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage)) + + query := ` + SELECT * + FROM dvo.runtimes_heartbeats + ` + + rows, err := connection.Query(query) + helpers.FailOnError(t, err) + + defer func() { _ = rows.Close() }() + + var instances []string + + for rows.Next() { + var ( + instanceID string + timestamp time.Time + ) + err = rows.Scan( + &instanceID, ×tamp, + ) + helpers.FailOnError(t, err) + + assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339)) + instances = append(instances, instanceID) + } + assert.Equal(t, data, instances) +} diff --git a/storage/noop_dvo_recommendations_storage.go b/storage/noop_dvo_recommendations_storage.go index 86e912b7..33c2fe0c 100644 --- a/storage/noop_dvo_recommendations_storage.go +++ b/storage/noop_dvo_recommendations_storage.go @@ -96,3 +96,19 @@ func (*NoopDVOStorage) ReadWorkloadsForClusterAndNamespace( func (*NoopDVOStorage) DeleteReportsForOrg(types.OrgID) error { return nil } + +// WriteHeartbeat noop +func (*NoopDVOStorage) WriteHeartbeat( + string, + time.Time, +) error { + return nil +} + +// WriteHeartbeats noop +func (*NoopDVOStorage) WriteHeartbeats( + []string, + time.Time, +) error { + return nil +} diff --git a/storage/report_for_heartbeats.json b/storage/report_for_heartbeats.json new file mode 100644 index 00000000..07dc1870 --- /dev/null +++ b/storage/report_for_heartbeats.json @@ -0,0 +1,50 @@ +{ + "system": { + "metadata": {}, + "hostname": null + }, + "fingerprints": [], + "version": 1, + "analysis_metadata": {}, + "workload_recommendations": [ + { + "response_id": "an_issue|DVO_AN_ISSUE", + "component": "ccx_rules_ocp.external.dvo.an_issue_pod.recommendation", + "key": "DVO_AN_ISSUE", + "details": { + "check_name": "", + "check_url": "", + "samples": [ + { + "namespace_uid": "NAMESPACE-UID-A", + "kind": "DaemonSet", + "uid": "193a2099-1234-5678-916a-d570c9aac158" + } + ] + }, + "tags": [], + "links": { + "jira": [ + "https://issues.redhat.com/browse/AN_ISSUE" + ], + "product_documentation": [] + }, + "workloads": [ + { + "namespace": "namespace-name-A", + "namespace_uid": "NAMESPACE-UID-A", + "kind": "DaemonSet", + "name": "test-name-0099", + "uid": "UID-0099" + }, + { + "namespace": "namespace-name-A", + "namespace_uid": "NAMESPACE-UID-A", + "kind": "DaemonSet", + "name": "test-name-0100", + "uid": "UID-0100" + } + ] + } + ] + } \ No newline at end of file