Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Feb 27, 2023
1 parent efec262 commit 80a76e5
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 0 deletions.
80 changes: 80 additions & 0 deletions pkg/icingadb/sla.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package icingadb

import (
"context"
"crypto/rand"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"time"
)

type SlaHistoryTrail struct {
v1.EntityWithoutChecksum `json:",inline"`
EnvironmentId types.Binary `json:"environment_id"`
ObjectType string `json:"object_type"`
HostId types.Binary `json:"host_id"`
ServiceId types.Binary `json:"service_id"`
EventTime types.UnixMilli `json:"event_time"`
EventType string `json:"event_type"`
}

func CheckableToSlaTrailEntities(ctx context.Context, checkables <-chan contracts.Entity, eventType string) (<-chan contracts.Entity, <-chan error) {
entities := make(chan contracts.Entity)
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
defer close(entities)

for {
select {
case checkable, ok := <-checkables:
if !ok {
return nil
}

id, err := generateBinaryId()
if err != nil {
return errors.Wrap(err, "can't generate sla history trail ID")
}

entity := &SlaHistoryTrail{
EntityWithoutChecksum: v1.EntityWithoutChecksum{
IdMeta: v1.IdMeta{Id: id},
},
ObjectType: utils.Name(checkable),
EventTime: types.UnixMilli(time.Now()),
EventType: eventType,
}

switch ptr := checkable.(type) {
case *v1.Host:
entity.HostId = ptr.Id
entity.EnvironmentId = ptr.EnvironmentId
case *v1.Service:
entity.HostId = ptr.HostId
entity.ServiceId = ptr.Id
entity.EnvironmentId = ptr.EnvironmentId
}

entities <- entity
case <-ctx.Done():
return ctx.Err()
}
}
})

return entities, com.WaitAsync(g)
}

// GenerateBinaryId generates a 20 byte length random id
func generateBinaryId() (types.Binary, error) {
id := make([]byte, 20)
_, err := rand.Read(id)

return id, err
}
27 changes: 27 additions & 0 deletions pkg/icingadb/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
g, ctx := errgroup.WithContext(ctx)
stat := getCounterForEntity(delta.Subject.Entity())

var subjectType = delta.Subject.Entity()

// Create
if len(delta.Create) > 0 {
s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
Expand All @@ -131,6 +133,19 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
g.Go(func() error {
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
})

switch subjectType.(type) {
case *v1.Host, *v1.Service:
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))

var slaTrails <-chan contracts.Entity
slaTrails, errs := CheckableToSlaTrailEntities(ctx, entities, "create")
com.ErrgroupReceive(g, errs)

g.Go(func() error {
return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat))
})
}
}

// Update
Expand Down Expand Up @@ -163,6 +178,18 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
})

switch subjectType.(type) {
case *v1.Host, *v1.Service:
s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
var slaTrails <-chan contracts.Entity
slaTrails, errors := CheckableToSlaTrailEntities(ctx, delta.Delete.Entities(ctx), "delete")
com.ErrgroupReceive(g, errors)

g.Go(func() error {
return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat))
})
}
}

return g.Wait()
Expand Down
13 changes: 13 additions & 0 deletions schema/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,19 @@ CREATE TABLE sla_history_downtime (
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

CREATE TABLE sla_history_trail (
id binary(20) NOT NULL,
environment_id binary(20) NOT NULL COMMENT 'environment.id',
object_type enum('host', 'service') NOT NULL,
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
service_id binary(20) NOT NULL COMMENT 'service.id (may reference already deleted services)',

PRIMARY KEY (id),

event_type enum('delete', 'create') NOT NULL,
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

CREATE TABLE icingadb_schema (
id int unsigned NOT NULL AUTO_INCREMENT,
version smallint unsigned NOT NULL,
Expand Down
18 changes: 18 additions & 0 deletions tests/object_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
t.Skip()
})

t.Run("Sla History Trail", func(t *testing.T) {
t.Parallel()

assert.Eventuallyf(t, func() bool {
var count int
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL"))
require.NoError(t, err, "querying hosts sla history trail should not fail")
return count == len(data.Hosts)
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")

assert.Eventuallyf(t, func() bool {
var count int
err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL"))
require.NoError(t, err, "querying services sla history trail should not fail")
return count == len(data.Services)
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
})

t.Run("RuntimeUpdates", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 80a76e5

Please sign in to comment.