From 80a76e5f288744fed8357fb0fe63968f486caa2e Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 23 Feb 2023 22:40:18 +0100 Subject: [PATCH] WIP --- pkg/icingadb/sla.go | 80 +++++++++++++++++++++++++++++++++++++++ pkg/icingadb/sync.go | 27 +++++++++++++ schema/mysql/schema.sql | 13 +++++++ tests/object_sync_test.go | 18 +++++++++ 4 files changed, 138 insertions(+) create mode 100644 pkg/icingadb/sla.go diff --git a/pkg/icingadb/sla.go b/pkg/icingadb/sla.go new file mode 100644 index 000000000..1e8c9b3cc --- /dev/null +++ b/pkg/icingadb/sla.go @@ -0,0 +1,80 @@ +package icingadb + +import ( + "context" + "crypto/rand" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "time" +) + +type SlaHistoryTrail struct { + v1.EntityWithoutChecksum `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + EventTime types.UnixMilli `json:"event_time"` + EventType string `json:"event_type"` +} + +func CheckableToSlaTrailEntities(ctx context.Context, checkables <-chan contracts.Entity, eventType string) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entities) + + for { + select { + case checkable, ok := <-checkables: + if !ok { + return nil + } + + id, err := generateBinaryId() + if err != nil { + return errors.Wrap(err, "can't generate sla history trail ID") + } + + entity := &SlaHistoryTrail{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: id}, + }, + ObjectType: utils.Name(checkable), + EventTime: types.UnixMilli(time.Now()), + EventType: eventType, + } + + switch ptr := checkable.(type) { + case *v1.Host: + entity.HostId = ptr.Id + entity.EnvironmentId = ptr.EnvironmentId + case *v1.Service: + entity.HostId = ptr.HostId + entity.ServiceId = ptr.Id + entity.EnvironmentId = ptr.EnvironmentId + } + + entities <- entity + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return entities, com.WaitAsync(g) +} + +// GenerateBinaryId generates a 20 byte length random id +func generateBinaryId() (types.Binary, error) { + id := make([]byte, 20) + _, err := rand.Read(id) + + return id, err +} diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 790f11e43..f45bdf5d7 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -106,6 +106,8 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g, ctx := errgroup.WithContext(ctx) stat := getCounterForEntity(delta.Subject.Entity()) + var subjectType = delta.Subject.Entity() + // Create if len(delta.Create) > 0 { s.logger.Infof("Inserting %d items of type %s", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) @@ -131,6 +133,19 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) + + switch subjectType.(type) { + case *v1.Host, *v1.Service: + s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + + var slaTrails <-chan contracts.Entity + slaTrails, errs := CheckableToSlaTrailEntities(ctx, entities, "create") + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat)) + }) + } } // Update @@ -163,6 +178,18 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) }) + + switch subjectType.(type) { + case *v1.Host, *v1.Service: + s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + var slaTrails <-chan contracts.Entity + slaTrails, errors := CheckableToSlaTrailEntities(ctx, delta.Delete.Entities(ctx), "delete") + com.ErrgroupReceive(g, errors) + + g.Go(func() error { + return s.db.CreateStreamed(ctx, slaTrails, OnSuccessIncrement[contracts.Entity](stat)) + }) + } } return g.Wait() diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index f13fe4cef..aa2a5e55e 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1321,6 +1321,19 @@ CREATE TABLE sla_history_downtime ( INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; +CREATE TABLE sla_history_trail ( + id binary(20) NOT NULL, + environment_id binary(20) NOT NULL COMMENT 'environment.id', + object_type enum('host', 'service') NOT NULL, + host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)', + service_id binary(20) NOT NULL COMMENT 'service.id (may reference already deleted services)', + + PRIMARY KEY (id), + + event_type enum('delete', 'create') NOT NULL, + event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; + CREATE TABLE icingadb_schema ( id int unsigned NOT NULL AUTO_INCREMENT, version smallint unsigned NOT NULL, diff --git a/tests/object_sync_test.go b/tests/object_sync_test.go index 27d3c9551..8d5971504 100644 --- a/tests/object_sync_test.go +++ b/tests/object_sync_test.go @@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) { t.Skip() }) + t.Run("Sla History Trail", func(t *testing.T) { + t.Parallel() + + assert.Eventuallyf(t, func() bool { + var count int + err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL")) + require.NoError(t, err, "querying hosts sla history trail should not fail") + return count == len(data.Hosts) + }, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database") + + assert.Eventuallyf(t, func() bool { + var count int + err := db.Get(&count, db.Rebind("SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL")) + require.NoError(t, err, "querying services sla history trail should not fail") + return count == len(data.Services) + }, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database") + }) + t.Run("RuntimeUpdates", func(t *testing.T) { t.Parallel()