diff --git a/consumer/dvo_processing.go b/consumer/dvo_processing.go index fc7333dc..44ff941d 100644 --- a/consumer/dvo_processing.go +++ b/consumer/dvo_processing.go @@ -128,6 +128,10 @@ func (DVORulesProcessor) storeInDB(consumer *KafkaConsumer, msg *sarama.Consumer } tStored := time.Now() logDuration(tTimeCheck, tStored, msg.Offset, "db_store_report") + err = consumer.writeHeartbeats(msg, message, lastCheckedTime) + if err != nil { + return message.RequestID, message, err + } return message.RequestID, message, nil } @@ -163,3 +167,33 @@ func (consumer *KafkaConsumer) writeDVOReport( logMessageError(consumer, msg, &message, unexpectedStorageType, err) return err } + + +func (consumer *KafkaConsumer) writeHeartbeats( + msg *sarama.ConsumerMessage, message incomingMessage, lastCheckedTime time.Time, +) error { + if dvoStorage, ok := consumer.Storage.(storage.DVORecommendationsStorage); ok { + + var uids []string + + for _, w := range message.ParsedWorkloads { + for _, workload := range w.Workloads { + uids = append(uids,workload.UID) + } + } + + err := dvoStorage.WriteHeartbeats( + uids, + lastCheckedTime, + ) + if err != nil { + logMessageError(consumer, msg, &message, "Error writing heartbeats to database", err) + return err + } + logMessageDebug(consumer, msg, &message, "Stored heartbeats") + return nil + } + err := errors.New("heartbeats could not be stored") + logMessageError(consumer, msg, &message, unexpectedStorageType, err) + return err +} diff --git a/storage/dvo_recommendations_storage.go b/storage/dvo_recommendations_storage.go index 517e1000..58078529 100644 --- a/storage/dvo_recommendations_storage.go +++ b/storage/dvo_recommendations_storage.go @@ -57,6 +57,7 @@ type DVORecommendationsStorage interface { ) (types.DVOReport, error) DeleteReportsForOrg(orgID types.OrgID) error WriteHeartbeat(string, time.Time) error + WriteHeartbeats([]string, time.Time) error } const ( @@ -723,3 +724,56 @@ func (storage DVORecommendationsDBStorage) WriteHeartbeat( return err } + + +// WriteHeartbeat insert multiple heartbeats +func (storage DVORecommendationsDBStorage) WriteHeartbeats( + instanceIDs []string, + lastCheckedTime time.Time, +) error { + + timestamp := types.Timestamp(lastCheckedTime.UTC().Format(time.RFC3339)) + + + sqlStr := "INSERT INTO dvo.runtimes_heartbeats VALUES " + vals := []interface{}{} + + itemIndex := 1 + + for _, instanceID := range instanceIDs { + sqlStr += "($"+fmt.Sprint(itemIndex)+", $"+fmt.Sprint(itemIndex+1)+")," + vals = append(vals, instanceID, timestamp) + itemIndex += 2 + } + //trim the last , + sqlStr = sqlStr[0:len(sqlStr)-1] + + sqlStr += ";" + + + // Begin a new transaction. + tx, err := storage.connection.Begin() + if err != nil { + return err + } + + + err = func(tx *sql.Tx) error { + // Check if there is a more recent report for the cluster already in the database. + _, err := tx.Exec(sqlStr, vals...) + // stmt, err := tx.Prepare(sqlStr) + // if err != nil { + // return err + // } + // _, err = stmt.Exec(vals...) + if err != nil { + return err + } + + return nil + }(tx) + + finishTransaction(tx, err) + + return err +} diff --git a/storage/dvo_recommendations_storage_test.go b/storage/dvo_recommendations_storage_test.go index d81d95cc..7c9e675c 100644 --- a/storage/dvo_recommendations_storage_test.go +++ b/storage/dvo_recommendations_storage_test.go @@ -892,3 +892,83 @@ func TestFilterWorkloads(t *testing.T) { }) } } + + +func TestWriteHeartbeat(t *testing.T) { + mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true) + defer closer() + + + err := mockStorage.WriteHeartbeat( + "x", now, + ) + helpers.FailOnError(t, err) + + connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage)) + + query := ` + SELECT * + FROM dvo.runtimes_heartbeats + ` + + rows, err := connection.Query(query) + helpers.FailOnError(t, err) + + defer rows.Close() + + for rows.Next() { + var ( + instanceID string + timestamp time.Time + ) + err = rows.Scan( + &instanceID, ×tamp, + ) + helpers.FailOnError(t, err) + + assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339)) + assert.Equal(t, "x", instanceID) + } +} + + +func TestWriteHeartbeats(t *testing.T) { + mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true) + defer closer() + + data := []string{"x", "y", "z"} + + err := mockStorage.WriteHeartbeats( + data, now, + ) + helpers.FailOnError(t, err) + + connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage)) + + query := ` + SELECT * + FROM dvo.runtimes_heartbeats + ` + + rows, err := connection.Query(query) + helpers.FailOnError(t, err) + + defer rows.Close() + + var instances []string + + for rows.Next() { + var ( + instanceID string + timestamp time.Time + ) + err = rows.Scan( + &instanceID, ×tamp, + ) + helpers.FailOnError(t, err) + + assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339)) + instances = append(instances, instanceID) + } + assert.Equal(t, data, instances) +} \ No newline at end of file diff --git a/storage/noop_dvo_recommendations_storage.go b/storage/noop_dvo_recommendations_storage.go index 6ce6fd87..12663bfa 100644 --- a/storage/noop_dvo_recommendations_storage.go +++ b/storage/noop_dvo_recommendations_storage.go @@ -104,3 +104,11 @@ func (*NoopDVOStorage) WriteHeartbeat( ) error { return nil } + +// WriteHeartbeas noop +func (*NoopDVOStorage) WriteHeartbeats( + []string, + time.Time, +) error { + return nil +}