fix: smoke and sanity integration tests are failing. #2501

Merged: 37 commits, Jul 19, 2023

Commits
dce182b - fix(sinks): fixes on custom headers validation. (Jul 14, 2023)
48a03cb - fix(sinks): general fixes on working. (Jul 14, 2023)
066add3 - fix(sinks): fix conflict error code. (Jul 14, 2023)
44edc77 - fix(sinks): fix header not reaching redis cache. (Jul 14, 2023)
6b03ee4 - fix(sinks): simplify config on pass-through. (Jul 14, 2023)
601a182 - fix(sinks): add better error messages. (Jul 14, 2023)
1e4dfd0 - fix(sinker): fix handle config. (Jul 14, 2023)
df8fc57 - Merge branch 'develop' into fix/sanity-tests (Jul 14, 2023)
b1dfc8f - fix(sinks): fix delete event. (Jul 14, 2023)
a590e36 - fix(sinker): fix not sending any feedback when delete key is not found. (Jul 14, 2023)
47abecd - fix(maestro): fix not removing the sink deployment after the sink is … (Jul 14, 2023)
9555f65 - fix(agent): fix context cancellation. (Jul 14, 2023)
bd7234c - fix(sinks): update sink will set state to unknown. (Jul 14, 2023)
12acecc - fix(agent): fix scrape for diode with context cancel func. (Jul 14, 2023)
7787122 - fix(sinker): fix update setting config to null. (Jul 14, 2023)
d3cb616 - fix(sinker): fix update setting config to null. (Jul 14, 2023)
1e48f69 - fix(sinker): fix timestamps. (Jul 14, 2023)
698a517 - fix(sinker): fix parsing. (Jul 14, 2023)
f00ad6d - fix: remove debug logs. (Jul 14, 2023)
b8290ae - fix: remove debug logs. (Jul 14, 2023)
3920b95 - fix(maestro): remove debug logs. (Jul 14, 2023)
c3b5506 - fix: add timeout and proper handling of when sink has no activity. (Jul 17, 2023)
96bb4ae - fix: remove redis entries before removing collector. (Jul 17, 2023)
8462cbb - fix(maestro): add kill method for stale collectors without sink-id, h… (Jul 17, 2023)
0f266ac - fix(maestro): remove unused method. (Jul 17, 2023)
c715a90 - fix(maestro): fix monitor not getting the correct sink-id. (Jul 17, 2023)
4251a7e - fix(maestro): remove unused method and use simpler form to get deploy… (Jul 17, 2023)
8d28c2b - fix(sinker): fixed typos. (Jul 18, 2023)
962924e - fix(maestro): fix deployment name. (Jul 18, 2023)
53b90c9 - fix(maestro): fix deployment name. (Jul 18, 2023)
f8ed6e4 - fix(maestro): fix delete deployment command. (Jul 18, 2023)
08bb49b - fix(maestro): make monitor just look the last log line, instead of la… (Jul 18, 2023)
a98e4bc - Merge branch 'develop' into fix/sanity-tests (lpegoraro, Jul 19, 2023)
50141d6 - fix(maestro): remove restriction on monitor to check collectors with … (Jul 19, 2023)
fab038e - fix(maestro): simplify check for activity to move sink to idle. (Jul 19, 2023)
db2c678 - fix(sinker): simplify update activity. (Jul 19, 2023)
cfda910 - fix(maestro): remove log that is not useful. (Jul 19, 2023)
2 changes: 1 addition & 1 deletion agent/backend/pktvisor/scrape.go
@@ -41,7 +41,7 @@ func (p *pktvisorBackend) createOtlpMqttExporter(ctx context.Context, cancelFunc
if p.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(p.mqttClient, p.otlpMetricsTopic, p.pktvisorVersion, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(p.logger)
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
// Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced.
metricsExporter, err := otlpmqttexporter.CreateMetricsExporter(ctx, set, cfg)
if err != nil {
return nil, err
3 changes: 3 additions & 0 deletions maestro/config/config_builder.go
@@ -376,6 +376,9 @@ func ReturnConfigYamlFromSink(_ context.Context, kafkaUrlConfig string, sink Sin
return "", errors.New("invalid authentication type")
}
exporterBuilder := FromStrategy(sink.Backend)
if exporterBuilder == nil {
return "", errors.New("invalid backend")
}
extensions, extensionName := authBuilder.GetExtensionsFromMetadata(sink.Config)
exporters, exporterName := exporterBuilder.GetExportersFromMetadata(sink.Config, extensionName)
if exporterName == "" {
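The new nil guard above matters because FromStrategy can come back empty for a backend the builder registry does not know. A minimal, self-contained sketch of the failure mode it prevents; the interface and the prometheus builder below are simplified stand-ins for illustration, not the real maestro/config types:

```go
package main

import (
	"errors"
	"fmt"
)

// exporterBuilder is a trimmed-down stand-in for the strategy interface that
// FromStrategy returns in maestro/config.
type exporterBuilder interface {
	GetExportersFromMetadata(cfg map[string]interface{}, extensionName string) (map[string]interface{}, string)
}

type promBuilder struct{}

func (promBuilder) GetExportersFromMetadata(cfg map[string]interface{}, extensionName string) (map[string]interface{}, string) {
	return map[string]interface{}{"prometheusremotewrite": cfg}, "prometheusremotewrite"
}

// fromStrategy mimics FromStrategy returning nil for unknown backends, which
// is exactly the case the new guard protects against.
func fromStrategy(backend string) exporterBuilder {
	if backend == "prometheus" {
		return promBuilder{}
	}
	return nil
}

func buildConfig(backend string) (string, error) {
	builder := fromStrategy(backend)
	if builder == nil {
		// without this check, calling a method on the nil interface below would panic
		return "", errors.New("invalid backend")
	}
	_, exporterName := builder.GetExportersFromMetadata(map[string]interface{}{}, "basicauth/exporter")
	return exporterName, nil
}

func main() {
	if _, err := buildConfig("unsupported"); err != nil {
		fmt.Println("rejected:", err) // rejected: invalid backend
	}
}
```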
23 changes: 23 additions & 0 deletions maestro/kubecontrol/kubecontrol.go
@@ -49,6 +49,9 @@ type Service interface {

// UpdateOtelCollector - update an existing collector by id
UpdateOtelCollector(ctx context.Context, ownerID, sinkID, deploymentEntry string) error

// KillOtelCollector - force-kill an existing collector for the given sink, without requiring the stored deployment entry
KillOtelCollector(ctx context.Context, ownerID, sinkID string) error
}

func (svc *deployService) collectorDeploy(ctx context.Context, operation, ownerID, sinkId, manifest string) error {
@@ -157,3 +160,23 @@ func (svc *deployService) DeleteOtelCollector(ctx context.Context, ownerID, sink
}
return nil
}

func (svc *deployService) KillOtelCollector(ctx context.Context, deploymentName string, sinkId string) error {
stdOutListenFunction := func(out *bufio.Scanner, err *bufio.Scanner) {
for out.Scan() {
svc.logger.Info("Deploy Info: " + out.Text())
}
for err.Scan() {
svc.logger.Info("Deploy Error: " + err.Text())
}
}

// execute action
cmd := exec.Command("kubectl", "delete", "deploy", deploymentName, "-n", namespace)
_, _, err := execCmd(ctx, cmd, svc.logger, stdOutListenFunction)
if err != nil {
return err
}
svc.logger.Info(fmt.Sprintf("successfully killed the otel-collector for sink-id: %s", sinkId))
return nil
}
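For context, this is roughly how the new KillOtelCollector hook gets used by the monitor further down in this PR when a collector no longer has a deployment entry in Redis. Only the "otel-" + sinkID naming and the call shape come from the diff (the implementation treats the first string argument as the deployment name); the wrapper itself is illustrative:

```go
package example

import (
	"context"

	"github.com/orb-community/orb/maestro/kubecontrol"
	"go.uber.org/zap"
)

// reapStaleCollector mirrors the fallback path in monitor.go: when no
// deployment entry exists for a collector, kill the deployment directly by
// its conventional name instead of going through DeleteOtelCollector.
func reapStaleCollector(ctx context.Context, kube kubecontrol.Service, logger *zap.Logger, sinkID string) {
	deploymentName := "otel-" + sinkID
	if err := kube.KillOtelCollector(ctx, deploymentName, sinkID); err != nil {
		logger.Error("error removing otel collector, manual intervention required", zap.Error(err))
	}
}
```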
80 changes: 35 additions & 45 deletions maestro/monitor/monitor.go
@@ -71,7 +71,7 @@ func (svc *monitorService) Start(ctx context.Context, cancelFunc context.CancelF
}

func (svc *monitorService) getPodLogs(ctx context.Context, pod k8scorev1.Pod) ([]string, error) {
maxTailLines := int64(10)
maxTailLines := int64(1)
sinceSeconds := int64(300)
podLogOpts := k8scorev1.PodLogOptions{TailLines: &maxTailLines, SinceSeconds: &sinceSeconds}
config, err := rest.InClusterConfig()
@@ -105,7 +105,6 @@ func (svc *monitorService) getPodLogs(ctx context.Context, pod k8scorev1.Pod) ([
}
str := buf.String()
splitLogs := strings.Split(str, "\n")
svc.logger.Info("logs length", zap.Int("amount line logs", len(splitLogs)))
return splitLogs, nil
}

@@ -141,6 +140,7 @@ func (svc *monitorService) getRunningPods(ctx context.Context) ([]k8scorev1.Pod,
}

func (svc *monitorService) monitorSinks(ctx context.Context) {

runningCollectors, err := svc.getRunningPods(ctx)
if err != nil {
svc.logger.Error("error getting running pods on namespace", zap.Error(err))
@@ -160,17 +160,21 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
var sink *sinkspb.SinkRes
for _, sinkRes := range sinksRes.Sinks {
if strings.Contains(collector.Name, sinkRes.Id) {
svc.logger.Warn("collector found for sink", zap.String("collector name", collector.Name), zap.String("sink", sinkRes.Id))
sink = sinkRes
break
}
}
if sink == nil {
svc.logger.Warn("collector not found for sink, depleting collector", zap.String("collector name", collector.Name))
sinkId := collector.Name[5:51]
sinkId := collector.Name[5:41]
deploymentEntry, err := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkId)
if err != nil {
svc.logger.Error("did not find collector entry for sink", zap.String("sink-id", sinkId))
deploymentName := "otel-" + sinkId
err = svc.kubecontrol.KillOtelCollector(ctx, deploymentName, sinkId)
if err != nil {
svc.logger.Error("error removing otel collector, manual intervention required", zap.Error(err))
}
continue
}
err = svc.kubecontrol.DeleteOtelCollector(ctx, "", sinkId, deploymentEntry)
@@ -186,52 +190,38 @@ func (svc *monitorService) monitorSinks(ctx context.Context) {
}
data.SinkID = sink.Id
data.OwnerID = sink.OwnerID
// only analyze logs if current status is "active" or "warning"
var logsErr error
var status string
if sink.GetState() == "active" || sink.GetState() == "warning" {
logs, err := svc.getPodLogs(ctx, collector)
if err != nil {
svc.logger.Error("error on getting logs, skipping", zap.Error(err))
continue
}
status, logsErr = svc.analyzeLogs(logs)
if status == "fail" {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
}
logs, err := svc.getPodLogs(ctx, collector)
if err != nil {
svc.logger.Error("error on getting logs, skipping", zap.Error(err))
continue
}
status, logsErr = svc.analyzeLogs(logs)
if status == "fail" {
svc.logger.Error("error during analyze logs", zap.Error(logsErr))
continue
}
var lastActivity int64
var activityErr error
// if log analysis return active or warning we should check if have activity on collector
if status == "active" || status == "warning" {
lastActivity, activityErr = svc.eventStore.GetActivity(sink.Id)
// if logs reported 'active' status
// here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle
var idleLimit int64 = 0
if activityErr != nil || lastActivity == 0 {
svc.logger.Error("error on getting last collector activity", zap.Error(activityErr))
lastActivity, activityErr := svc.eventStore.GetActivity(sink.Id)
// if logs reported 'active' status
// here we should check if LastActivity is up-to-date, otherwise we need to set sink as idle
idleLimit := time.Now().Unix() - idleTimeSeconds // within 10 minutes
if idleLimit >= lastActivity {
//changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err)
//changing state on redis sinker
data.State.SetFromString("idle")
svc.eventStore.UpdateSinkStateCache(ctx, data)
deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr))
continue
} else {
idleLimit = time.Now().Unix() - idleTimeSeconds // within 10 minutes
}
if idleLimit >= lastActivity {
//changing state on sinks
svc.eventStore.PublishSinkStateChange(sink, "idle", logsErr, err)
//changing state on redis sinker
data.State.SetFromString("idle")
svc.eventStore.UpdateSinkStateCache(ctx, data)
deploymentEntry, errDeploy := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sink.Id)
if errDeploy != nil {
svc.logger.Error("Remove collector: error on getting collector deployment from redis", zap.Error(activityErr))
continue
}
err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deploymentEntry)
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
continue
err = svc.kubecontrol.DeleteOtelCollector(ctx, sink.OwnerID, sink.Id, deploymentEntry)
if err != nil {
svc.logger.Error("error removing otel collector", zap.Error(err))
}
continue
}
//set the new sink status if changed during checks
if sink.GetState() != status && status != "" {
@@ -283,7 +273,7 @@ func (svc *monitorService) analyzeLogs(logEntry []string) (status string, err er
if strings.Contains(logLine, "400 Bad Request") {
errorMessage := "error: remote write returned HTTP status 400 Bad Request"
return "warning", errors.New(errorMessage)
}
}
// other generic errors
if strings.Contains(logLine, "error") {
errStringLog := strings.TrimRight(logLine, "error")
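The simplified idle handling above reduces to a single comparison: if the last recorded activity is older than the idle window, the sink is published as idle and its collector deleted. A small sketch of that check, assuming idleTimeSeconds is 600 (the diff only says "within 10 minutes"):

```go
package example

import "time"

// idleTimeSeconds is assumed to be 600 based on the "within 10 minutes"
// comment in monitorSinks; the real constant lives elsewhere in the package.
const idleTimeSeconds = 600

// shouldMoveToIdle mirrors the simplified check in monitorSinks: a sink goes
// idle when its last activity (a Unix timestamp recorded by the sinker) is
// older than the idle window.
func shouldMoveToIdle(lastActivity int64) bool {
	idleLimit := time.Now().Unix() - idleTimeSeconds
	return idleLimit >= lastActivity
}
```

For example, a sink whose last activity was recorded 15 minutes ago satisfies idleLimit >= lastActivity and gets moved to idle, after which its collector deployment is deleted.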
33 changes: 15 additions & 18 deletions maestro/redis/consumer/hashset.go
@@ -35,6 +35,14 @@ func (es eventStore) GetDeploymentEntryFromSinkId(ctx context.Context, sinkId st
// handleSinksDeleteCollector will delete Deployment Entry and force delete otel collector
func (es eventStore) handleSinksDeleteCollector(ctx context.Context, event redis.SinksUpdateEvent) error {
es.logger.Info("Received maestro DELETE event from sinks ID", zap.String("sinkID", event.SinkID), zap.String("owner", event.Owner))
err := es.sinkerKeyRedisClient.HDel(ctx, deploymentKey, event.SinkID).Err()
if err != nil {
return err
}
err = es.RemoveSinkActivity(ctx, event.SinkID)
if err != nil {
return err
}
deploymentEntry, err := es.GetDeploymentEntryFromSinkId(ctx, event.SinkID)
if err != nil {
es.logger.Error("did not find collector entry for sink", zap.String("sink-id", event.SinkID))
@@ -97,48 +105,37 @@ func (es eventStore) handleSinksUpdateCollector(ctx context.Context, event redis
if err != nil {
es.logger.Error("could not fetch info for sink", zap.String("sink-id", event.SinkID), zap.Error(err))
}
var cfg types.Metadata
if err := json.Unmarshal(sinkData.Config, &cfg); err != nil {
var metadata types.Metadata
if err := json.Unmarshal(sinkData.Config, &metadata); err != nil {
return err
}
data := config.SinkData{
SinkID: sinkData.Id,
OwnerID: sinkData.OwnerID,
Backend: sinkData.Backend,
Config: cfg,
Config: metadata,
}
_ = data.State.SetFromString(sinkData.State)

deploy, err := config.GetDeploymentJson(es.kafkaUrl, data)

if err != nil {
es.logger.Error("error trying to get deployment json for sink ID", zap.String("sinkId", event.SinkID), zap.Error(err))
return err
}
es.sinkerKeyRedisClient.HSet(ctx, deploymentKey, event.SinkID, deploy)
err = es.kubecontrol.UpdateOtelCollector(ctx, event.Owner, event.SinkID, deploy)
err = es.sinkerKeyRedisClient.HSet(ctx, deploymentKey, event.SinkID, deploy).Err()
if err != nil {
es.logger.Error("error trying to update deployment json for sink ID", zap.String("sinkId", event.SinkID), zap.Error(err))
return err
}
// changing state on updated sink to unknown
sinkData.OwnerID = event.Owner
es.PublishSinkStateChange(sinkData, "unknown", err, err)
data.SinkID = sinkData.Id
data.OwnerID = sinkData.OwnerID
err = data.State.SetFromString("unknown")
if err != nil {
es.logger.Error("error setting state as unknown", zap.Error(err))
return err
}
err = es.UpdateSinkStateCache(ctx, data)
err = es.kubecontrol.UpdateOtelCollector(ctx, event.Owner, event.SinkID, deploy)
if err != nil {
es.logger.Error("error update sink cache state as unknown", zap.Error(err))
return err
}
return nil
}

func (es eventStore) UpdateSinkCache(ctx context.Context, data config.SinkData) (err error) {
data.State = config.Unknown
keyPrefix := "sinker_key"
skey := fmt.Sprintf("%s-%s:%s", keyPrefix, data.OwnerID, data.SinkID)
bytes, err := json.Marshal(data)
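A detail worth noting in the updated handler: go-redis v8 commands return a command object, and a write only counts as successful after checking .Err(), which is why the bare HSet call was replaced. A minimal sketch of the pattern; the hash key name is illustrative, not the real deploymentKey constant:

```go
package example

import (
	"context"

	"github.com/go-redis/redis/v8"
)

// storeDeploymentEntry shows the go-redis v8 call shape used above: HSet
// returns an *IntCmd, and the write is only known to have succeeded after
// checking .Err(). "maestro.deployments" is an illustrative key name.
func storeDeploymentEntry(ctx context.Context, client *redis.Client, sinkID, deployJSON string) error {
	return client.HSet(ctx, "maestro.deployments", sinkID, deployJSON).Err()
}
```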
7 changes: 5 additions & 2 deletions maestro/service.go
@@ -12,6 +12,7 @@ import (
"context"
"encoding/json"
"github.com/orb-community/orb/maestro/monitor"
"github.com/orb-community/orb/pkg/types"
"strings"

"github.com/go-redis/redis/v8"
@@ -81,15 +82,17 @@ func (svc *maestroService) Start(ctx context.Context, cancelFunction context.Can

for _, sinkRes := range sinksRes.Sinks {
sinkContext := context.WithValue(loadCtx, "sink-id", sinkRes.Id)
var data maestroconfig.SinkData
if err := json.Unmarshal(sinkRes.Config, &data); err != nil {
var metadata types.Metadata
if err := json.Unmarshal(sinkRes.Config, &metadata); err != nil {
svc.logger.Warn("failed to unmarshal sink, skipping", zap.String("sink-id", sinkRes.Id))
continue
}
if val, _ := svc.eventStore.GetDeploymentEntryFromSinkId(ctx, sinkRes.Id); val != "" {
svc.logger.Info("Skipping deploymentEntry because it is already created")
} else {
var data maestroconfig.SinkData
data.SinkID = sinkRes.Id
data.Config = metadata
err := svc.eventStore.CreateDeploymentEntry(sinkContext, data)
if err != nil {
svc.logger.Warn("failed to create deploymentEntry for sink, skipping", zap.String("sink-id", sinkRes.Id))
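The bootstrap loop now decodes the raw sink config into types.Metadata before filling in SinkData. A minimal illustration of that decode step, using a local stand-in for types.Metadata and a made-up config payload:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Metadata is a local stand-in for pkg/types.Metadata; treating it as a JSON
// object is an assumption drawn from how the loop above uses it.
type Metadata map[string]interface{}

func main() {
	// illustrative sink config payload, not taken from the repo
	raw := []byte(`{"authentication":{"type":"basicauth"},"exporter":{"remote_host":"https://prometheus.example.com"}}`)
	var metadata Metadata
	if err := json.Unmarshal(raw, &metadata); err != nil {
		fmt.Println("failed to unmarshal sink, skipping")
		return
	}
	fmt.Println(metadata["authentication"]) // map[type:basicauth]
}
```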
4 changes: 1 addition & 3 deletions sinker/otel/bridgeservice/bridge.go
@@ -97,15 +97,13 @@ func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwner
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
}
} else if cfgRepo.State == config.Active || cfgRepo.State == config.Warning {
} else {
err = bs.sinkerCache.AddActivity(mfOwnerId, sinkId)
if err != nil {
bs.logger.Error("error during update last remote write", zap.String("sinkId", sinkId), zap.Error(err))
return err
}
bs.logger.Info("registering sink activity", zap.String("sinkID", sinkId), zap.String("newState", newState), zap.Any("currentState", cfgRepo.State))
} else if cfgRepo.State == config.Error {
cfgRepo.Msg = message
}

return nil
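With the else-if collapsed into a plain else, NotifyActiveSink now records activity on this path for any sink state instead of only active or warning, which is what feeds the monitor's GetActivity comparison above. A sketch of what that activity record implies from the caller's side, assuming a per-owner/sink Unix timestamp in Redis; the key layout is a guess, not the sinker cache's real schema:

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/go-redis/redis/v8"
)

// recordActivity sketches what AddActivity implies from the caller's point of
// view: store "now" as a Unix timestamp for this owner/sink pair so the
// maestro monitor can later compare it against its idle window. The key
// format below is an assumption.
func recordActivity(ctx context.Context, client *redis.Client, ownerID, sinkID string) error {
	key := fmt.Sprintf("orb.sinker.activity-%s:%s", ownerID, sinkID)
	return client.Set(ctx, key, time.Now().Unix(), 0).Err()
}
```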
10 changes: 5 additions & 5 deletions sinker/redis/consumer/events.go
@@ -14,9 +14,9 @@ import (
"github.com/orb-community/orb/pkg/types"
)

type updateSinkEvent struct {
sinkID string
owner string
config types.Metadata
timestamp time.Time
type UpdateSinkEvent struct {
SinkID string
Owner string
Config types.Metadata
Timestamp time.Time
}
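Exporting the event struct's fields is more than a style change: reflection-based codecs such as encoding/json can only populate exported fields, and other packages cannot read unexported ones at all, so the old lowercase version would decode to zero values. A minimal illustration with a made-up payload (requires the orb module for types.Metadata):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/orb-community/orb/pkg/types"
)

// UpdateSinkEvent mirrors the exported struct above.
type UpdateSinkEvent struct {
	SinkID    string
	Owner     string
	Config    types.Metadata
	Timestamp time.Time
}

func main() {
	payload := []byte(`{"SinkID":"sink-1","Owner":"owner-1","Config":{"opentelemetry":"enabled"}}`)
	var evt UpdateSinkEvent
	if err := json.Unmarshal(payload, &evt); err != nil {
		panic(err)
	}
	fmt.Println(evt.SinkID, evt.Owner) // prints: sink-1 owner-1
}
```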