diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index a9eed7fec..900653a3c 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -506,7 +506,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int { // YieldAll executes the query with the supplied scope, // scans each resulting row into an entity returned by the factory function, // and streams them into a returned channel. -func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) { +func (db *DB) YieldAll( + ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, namedQueryParams bool, scope ...interface{}, +) (<-chan contracts.Entity, <-chan error) { entities := make(chan contracts.Entity, 1) g, ctx := errgroup.WithContext(ctx) @@ -515,7 +517,14 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF defer db.log(ctx, query, &counter).Stop() defer close(entities) - rows, err := db.NamedQueryContext(ctx, query, scope) + var rows *sqlx.Rows + var err error + if namedQueryParams { + rows, err = db.NamedQueryContext(ctx, query, scope) + } else { + rows, err = db.QueryxContext(ctx, query, scope...) + } + if err != nil { return internal.CantPerformQuery(err, query) } diff --git a/pkg/icingadb/sla.go b/pkg/icingadb/sla.go new file mode 100644 index 000000000..8ace8a0f2 --- /dev/null +++ b/pkg/icingadb/sla.go @@ -0,0 +1,87 @@ +package icingadb + +import ( + "context" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/types" + "golang.org/x/sync/errgroup" + "time" +) + +type SlaHistoryTrail struct { + Id types.Int `json:"id" db:"-"` + v1.EnvironmentMeta `json:",inline"` + HostId types.Binary `json:"host_id"` + ServiceId types.Binary `json:"service_id"` + EventType string `json:"event_type"` + EventTime types.UnixMilli `json:"event_time"` +} + +// Fingerprint implements the contracts.Fingerprinter interface. +func (e SlaHistoryTrail) Fingerprint() contracts.Fingerprinter { + return e +} + +// ID implements part of the contracts.IDer interface. +func (e SlaHistoryTrail) ID() contracts.ID { + return e.Id +} + +// SetID implements part of the contracts.IDer interface. +func (e *SlaHistoryTrail) SetID(id contracts.ID) { + e.Id = id.(types.Int) +} + +type SlaHostHistoryTrailColumns struct { + v1.EntityWithoutChecksum `json:",inline"` + v1.EnvironmentMeta `json:",inline"` +} + +type SlaServiceHistoryTrailColumns struct { + v1.EntityWithoutChecksum `json:",inline"` + v1.EnvironmentMeta `json:",inline"` + HostId types.Binary `json:"host_id"` +} + +func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity { + entities := make(chan contracts.Entity, 1) + + g.Go(func() error { + defer close(entities) + + for { + select { + case checkable, ok := <-checkables: + if !ok { + return nil + } + + entity := &SlaHistoryTrail{ + EventTime: types.UnixMilli(time.Now()), + EventType: eventType, + } + + switch ptr := checkable.(type) { + case *v1.Host: + entity.HostId = ptr.Id + entity.EnvironmentId = ptr.EnvironmentId + case *v1.Service: + entity.HostId = ptr.HostId + entity.ServiceId = ptr.Id + entity.EnvironmentId = ptr.EnvironmentId + } + + entities <- entity + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return entities +} + +var ( + _ contracts.Entity = (*SlaHistoryTrail)(nil) +) diff --git a/pkg/icingadb/sync.go b/pkg/icingadb/sync.go index 790f11e43..d4a5e03e6 100644 --- a/pkg/icingadb/sync.go +++ b/pkg/icingadb/sync.go @@ -15,6 +15,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" "runtime" + "strings" "time" ) @@ -85,7 +86,8 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error { actual, dbErrs := s.db.YieldAll( ctx, subject.FactoryForDelta(), - s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(), + s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), + true, e.Meta(), ) // Let errors from DB cancel our group. com.ErrgroupReceive(g, dbErrs) @@ -128,9 +130,31 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { entities = delta.Create.Entities(ctx) } + var slaTrailEntities chan contracts.Entity + onSuccessHandlers := []OnSuccess[contracts.Entity]{ + OnSuccessIncrement[contracts.Entity](stat), + } + + switch delta.Subject.Entity().(type) { + case *v1.Host, *v1.Service: + slaTrailEntities = make(chan contracts.Entity) + onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](slaTrailEntities)) + } + g.Go(func() error { - return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat)) + if slaTrailEntities != nil { + defer close(slaTrailEntities) + } + + return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...) }) + + if slaTrailEntities != nil { + s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + g.Go(func() error { + return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, slaTrailEntities, "create")) + }) + } } // Update @@ -160,6 +184,40 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error { // Delete if len(delta.Delete) > 0 { s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' ')) + entity := delta.Subject.Entity() + switch entity.(type) { + case *v1.Host, *v1.Service: + g.Go(func() error { + s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(entity), ' ')) + + var entities <-chan contracts.Entity + var columns interface{} + + if _, ok := entity.(*v1.Host); ok { + columns = &SlaHostHistoryTrailColumns{} + } else { + columns = &SlaServiceHistoryTrailColumns{} + } + + query := s.db.BuildSelectStmt(entity, columns) + if len(delta.Delete) == 1 { + query += ` WHERE id = ?` + } else { + var placeholders []string + for i := 0; i < len(delta.Delete); i++ { + placeholders = append(placeholders, "?") + } + + query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `)) + } + var err <-chan error + entities, err = s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...) + com.ErrgroupReceive(g, err) + + return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, entities, "delete")) + }) + } + g.Go(func() error { return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat)) }) @@ -187,7 +245,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { actualCvs, errs := s.db.YieldAll( ctx, cv.FactoryForDelta(), - s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(), + s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), + true, e.Meta(), ) com.ErrgroupReceive(g, errs) @@ -199,7 +258,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error { actualFlatCvs, errs := s.db.YieldAll( ctx, flatCv.FactoryForDelta(), - s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(), + s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), + true, e.Meta(), ) com.ErrgroupReceive(g, errs) diff --git a/pkg/types/int.go b/pkg/types/int.go index 0e51f2101..30cec449d 100644 --- a/pkg/types/int.go +++ b/pkg/types/int.go @@ -6,7 +6,9 @@ import ( "database/sql/driver" "encoding" "encoding/json" + "fmt" "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/contracts" "strconv" ) @@ -58,6 +60,10 @@ func (i *Int) UnmarshalJSON(data []byte) error { return nil } +func (i Int) String() string { + return fmt.Sprintf("%d", i.Int64) +} + // Assert interface compliance. var ( _ json.Marshaler = Int{} @@ -65,4 +71,5 @@ var ( _ encoding.TextUnmarshaler = (*Int)(nil) _ driver.Valuer = Int{} _ sql.Scanner = (*Int)(nil) + _ contracts.ID = (*Int)(nil) ) diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index f13fe4cef..46df68a92 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime ( INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; +CREATE TABLE sla_history_trail ( + id bigint NOT NULL AUTO_INCREMENT, + environment_id binary(20) NOT NULL COMMENT 'environment.id', + host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)', + service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)', + + event_type enum('delete', 'create') NOT NULL, + event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred', + + PRIMARY KEY (id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; + CREATE TABLE icingadb_schema ( id int unsigned NOT NULL AUTO_INCREMENT, version smallint unsigned NOT NULL, diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index b3b5be0ca..689a936c4 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' ); CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' ); CREATE TYPE state_type AS ENUM ( 'hard', 'soft' ); CREATE TYPE checkable_type AS ENUM ( 'host', 'service' ); +CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' ); CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' ); CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' ); CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' ); @@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime'; COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime'; +CREATE TABLE sla_history_trail ( + id bigserial NOT NULL, + environment_id bytea20 NOT NULL, + host_id bytea20 NOT NULL, + service_id bytea20 DEFAULT NULL, + + event_type sla_trail_event_type NOT NULL, + event_time biguint NOT NULL, + + CONSTRAINT pk_sla_history_trail PRIMARY KEY (id) +); + +COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id'; +COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)'; +COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)'; +COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred'; + CREATE SEQUENCE icingadb_schema_id_seq; CREATE TABLE icingadb_schema ( diff --git a/tests/object_sync_test.go b/tests/object_sync_test.go index 27d3c9551..12f88e449 100644 --- a/tests/object_sync_test.go +++ b/tests/object_sync_test.go @@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) { t.Skip() }) + t.Run("Sla History Trail", func(t *testing.T) { + t.Parallel() + + assert.Eventuallyf(t, func() bool { + var count int + err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL") + require.NoError(t, err, "querying hosts sla history trail should not fail") + return count == len(data.Hosts) + }, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database") + + assert.Eventuallyf(t, func() bool { + var count int + err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL") + require.NoError(t, err, "querying services sla history trail should not fail") + return count == len(data.Services) + }, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database") + }) + t.Run("RuntimeUpdates", func(t *testing.T) { t.Parallel()