Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CCXDEV-14564] DVO writer write heartbeats #2093

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions consumer/dvo_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (DVORulesProcessor) storeInDB(consumer *KafkaConsumer, msg *sarama.Consumer
}
tStored := time.Now()
logDuration(tTimeCheck, tStored, msg.Offset, "db_store_report")
err = consumer.writeHeartbeats(msg, message, lastCheckedTime)
if err != nil {
return message.RequestID, message, err
}
return message.RequestID, message, nil
}

Expand Down Expand Up @@ -163,3 +167,31 @@ func (consumer *KafkaConsumer) writeDVOReport(
logMessageError(consumer, msg, &message, unexpectedStorageType, err)
return err
}

func (consumer *KafkaConsumer) writeHeartbeats(
msg *sarama.ConsumerMessage, message incomingMessage, lastCheckedTime time.Time,
) error {
if dvoStorage, ok := consumer.Storage.(storage.DVORecommendationsStorage); ok {
var uids []string

for _, w := range message.ParsedWorkloads {
for _, workload := range w.Workloads {
uids = append(uids, workload.UID)
}
}

err := dvoStorage.WriteHeartbeats(
uids,
lastCheckedTime,
)
if err != nil {
logMessageError(consumer, msg, &message, "Error writing heartbeats to database", err)
return err
}
logMessageDebug(consumer, msg, &message, "Stored heartbeats")
return nil
}
err := errors.New("heartbeats could not be stored")
logMessageError(consumer, msg, &message, unexpectedStorageType, err)
return err
}
49 changes: 49 additions & 0 deletions storage/dvo_recommendations_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type DVORecommendationsStorage interface {
) (types.DVOReport, error)
DeleteReportsForOrg(orgID types.OrgID) error
WriteHeartbeat(string, time.Time) error
WriteHeartbeats([]string, time.Time) error
}

const (
Expand Down Expand Up @@ -723,3 +724,51 @@ func (storage DVORecommendationsDBStorage) WriteHeartbeat(

return err
}

// WriteHeartbeats insert multiple heartbeats
func (storage DVORecommendationsDBStorage) WriteHeartbeats(
ikerreyes marked this conversation as resolved.
Show resolved Hide resolved
instanceIDs []string,
lastCheckedTime time.Time,
) error {
timestamp := types.Timestamp(lastCheckedTime.UTC().Format(time.RFC3339))

sqlStr := "INSERT INTO dvo.runtimes_heartbeats VALUES "
vals := []interface{}{}

itemIndex := 1

for _, instanceID := range instanceIDs {
sqlStr += "($" + fmt.Sprint(itemIndex) + ", $" + fmt.Sprint(itemIndex+1) + "),"
vals = append(vals, instanceID, timestamp)
itemIndex += 2
}
//trim the last ,
sqlStr = sqlStr[0 : len(sqlStr)-1]

sqlStr += ";"

// Begin a new transaction.
tx, err := storage.connection.Begin()
if err != nil {
return err
}

err = func(tx *sql.Tx) error {
ikerreyes marked this conversation as resolved.
Show resolved Hide resolved
// Check if there is a more recent report for the cluster already in the database.
_, err := tx.Exec(sqlStr, vals...)
// stmt, err := tx.Prepare(sqlStr)
// if err != nil {
// return err
// }
// _, err = stmt.Exec(vals...)
if err != nil {
return err
}

return nil
}(tx)

finishTransaction(tx, err)

return err
}
77 changes: 77 additions & 0 deletions storage/dvo_recommendations_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,3 +892,80 @@ func TestFilterWorkloads(t *testing.T) {
})
}
}

func TestWriteHeartbeat(t *testing.T) {
mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true)
defer closer()

err := mockStorage.WriteHeartbeat(
"x", now,
)
helpers.FailOnError(t, err)

connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage))

query := `
SELECT *
FROM dvo.runtimes_heartbeats
`

rows, err := connection.Query(query)
helpers.FailOnError(t, err)

defer func() { _ = rows.Close() }()

for rows.Next() {
var (
instanceID string
timestamp time.Time
)
err = rows.Scan(
&instanceID, &timestamp,
)
helpers.FailOnError(t, err)

assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339))
assert.Equal(t, "x", instanceID)
}
}

func TestWriteHeartbeats(t *testing.T) {
mockStorage, closer := ira_helpers.MustGetPostgresStorageDVO(t, true)
defer closer()

data := []string{"x", "y", "z"}

err := mockStorage.WriteHeartbeats(
data, now,
)
helpers.FailOnError(t, err)

connection := storage.GetConnectionDVO(mockStorage.(*storage.DVORecommendationsDBStorage))

query := `
SELECT *
FROM dvo.runtimes_heartbeats
`

rows, err := connection.Query(query)
helpers.FailOnError(t, err)

defer func() { _ = rows.Close() }()

var instances []string

for rows.Next() {
var (
instanceID string
timestamp time.Time
)
err = rows.Scan(
&instanceID, &timestamp,
)
helpers.FailOnError(t, err)

assert.Equal(t, now.UTC().Format(time.RFC3339), timestamp.UTC().Format(time.RFC3339))
instances = append(instances, instanceID)
}
assert.Equal(t, data, instances)
}
8 changes: 8 additions & 0 deletions storage/noop_dvo_recommendations_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,11 @@ func (*NoopDVOStorage) WriteHeartbeat(
) error {
return nil
}

// WriteHeartbeats noop
func (*NoopDVOStorage) WriteHeartbeats(
[]string,
time.Time,
) error {
return nil
}
Loading