diff --git a/pkg/icingadb/sla.go b/pkg/icingadb/sla.go new file mode 100644 index 000000000..547b41042 --- /dev/null +++ b/pkg/icingadb/sla.go @@ -0,0 +1,79 @@ +package icingadb + +import ( + "context" + "crypto/rand" + "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" +) + +type SlaHistoryTrail struct { + v1.EntityWithoutChecksum `json:",inline"` + EnvironmentId types.Binary `json:"environment_id"` + ObjectType string `json:"object_type"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + EventTime types.UnixMilli `json:"event_time"` + EventType string `json:"event_type"` +} + +func CheckableToSlaTrailEntities(ctx context.Context, checkables <-chan contracts.Entity, eventType string) (<-chan contracts.Entity, <-chan error) { + entities := make(chan contracts.Entity) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(entities) + + for { + select { + case checkable, ok := <-checkables: + if !ok { + return nil + } + + id, err := generateBinaryId() + if err != nil { + return errors.Wrap(err, "can't generate sla history trail ID") + } + + entity := &SlaHistoryTrail{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: id}, + }, + ObjectType: utils.Name(checkable), + EventTime: types.UnixMilli{}, + EventType: eventType, + } + + switch ptr := checkable.(type) { + case *v1.Host: + entity.HostId = ptr.Id + entity.EnvironmentId = ptr.EnvironmentId + case *v1.Service: + entity.HostId = ptr.HostId + entity.ServiceId = ptr.Id + entity.EnvironmentId = ptr.EnvironmentId + } + + entities <- entity + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return entities, com.WaitAsync(g) +} + +// GenerateBinaryId generates a 20 byte length random id +func generateBinaryId() (types.Binary, error) { + id := make([]byte, 20) + _, err := rand.Read(id) + + return id, err +} diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 790f11e43..5281cd3f7 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -131,6 +131,16 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { g.Go(func() error { return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) }) + + subjectType := delta.Subject.Entity() + if _, ok := subjectType.(*v1.Checkable); ok { + slaTrails, errs := CheckableToSlaTrailEntities(ctx, entities, "create") + com.ErrgroupReceive(g, errs) + + g.Go(func() error { + return s.db.CreateStreamed(ctx, slaTrails) + }) + } } // Update diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index f13fe4cef..aa2a5e55e 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1321,6 +1321,19 @@ CREATE TABLE sla_history_downtime ( INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; +CREATE TABLE sla_history_trail ( + id binary(20) NOT NULL, + environment_id binary(20) NOT NULL COMMENT 'environment.id', + object_type enum('host', 'service') NOT NULL, + host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)', + service_id binary(20) NOT NULL COMMENT 'service.id (may reference already deleted services)', + + PRIMARY KEY (id), + + event_type enum('delete', 'create') NOT NULL, + event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred' +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; + CREATE TABLE icingadb_schema ( id int unsigned NOT NULL AUTO_INCREMENT, version smallint unsigned NOT NULL,