From d254d5b03db6e842b1ba38e9f0d3f5f1a6bc5466 Mon Sep 17 00:00:00 2001 From: Nicolas Bock Date: Fri, 13 Sep 2024 11:26:53 -0600 Subject: [PATCH] Do not re-use connections Processor Edition In order to avoid stale connections to Salesforce and files.com, do not re-use those connections (the clients) and rather create new client connections when needed. Issue: https://github.com/canonical/athena-core/issues/177 Signed-off-by: Nicolas Bock --- Dockerfile-debug | 4 +- cmd/processor/main.go | 22 ++--- pkg/processor/processor.go | 142 ++++++++++++++++++-------------- pkg/processor/processor_test.go | 9 +- 4 files changed, 93 insertions(+), 84 deletions(-) diff --git a/Dockerfile-debug b/Dockerfile-debug index ca0f25e..2353359 100644 --- a/Dockerfile-debug +++ b/Dockerfile-debug @@ -1,7 +1,7 @@ FROM ubuntu:24.04 -LABEL maintainer="Canonical Sustaining Engineering " -LABEL org.opencontainers.image.description "Athena Monitor" +LABEL maintainer="Canonical Sustaining Engineering " +LABEL org.opencontainers.image.description "Athena Debug Container" RUN apt-get update RUN DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends apt-utils diff --git a/cmd/processor/main.go b/cmd/processor/main.go index 05bbd30..2a152d4 100644 --- a/cmd/processor/main.go +++ b/cmd/processor/main.go @@ -41,32 +41,28 @@ func main() { log.Debug(line) } - filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint) - if err != nil { - panic(err) - } - - sfClient, err := common.NewSalesforceClient(cfg) - if err != nil { - panic(err) - } - natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl)) if err != nil { panic(err) } - p, err := processor.NewProcessor(filesClient, sfClient, natsClient, cfg, nil) + salesforceClientFactory := &common.BaseSalesforceClientFactory{} + filesComClientFactory := &common.BaseFilesComClientFactory{} + + p, err := processor.NewProcessor(filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil) if err != nil { panic(err) } ctx, cancel := context.WithCancel(context.Background()) - if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, name, topic string, + if err := p.Run(ctx, func( + filesComClientFactory common.FilesComClientFactory, + salesforceClientFactory common.SalesforceClientFactory, + name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber { log.Infof("Subscribing: %s - to topic: %s", name, topic) - return processor.NewBaseSubscriber(fc, sf, name, topic, reports, cfg, dbConn) + return processor.NewBaseSubscriber(filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn) }); err != nil { panic(err) } diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index e23bb4f..6571054 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -23,22 +23,22 @@ import ( ) type Processor struct { - Config *config.Config - Db *gorm.DB - FilesClient common.FilesComClient - Hostname string - Provider pubsub.Provider - SalesforceClient common.SalesforceClient + Config *config.Config + Db *gorm.DB + FilesComClientFactory common.FilesComClientFactory + Hostname string + Provider pubsub.Provider + SalesforceClientFactory common.SalesforceClientFactory } type BaseSubscriber struct { - Config *config.Config - Db *gorm.DB - FilesComClient common.FilesComClient - Name string - Options pubsub.HandlerOptions - Reports map[string]config.Report - SalesforceClient common.SalesforceClient + Config *config.Config + Db *gorm.DB + FilesComClientFactory common.FilesComClientFactory + Name string + Options pubsub.HandlerOptions + Reports map[string]config.Report + SalesforceClientFactory common.SalesforceClientFactory } func (s *BaseSubscriber) Setup(c *pubsub.Client) { @@ -56,10 +56,10 @@ type ReportToExecute struct { type ReportRunner struct { Config *config.Config Db *gorm.DB - FilescomClient common.FilesComClient + FilesComClientFactory common.FilesComClientFactory Name, Subscriber, Basedir string Reports []ReportToExecute - SalesforceClient common.SalesforceClient + SalesforceClientFactory common.SalesforceClientFactory } func RunWithTimeout(baseDir string, timeout time.Duration, command string) ([]byte, error) { @@ -118,36 +118,30 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum log.Debugf("Fetching files for path '%s' from db", filePath) result := runner.Db.Where("path = ?", filePath).First(&file) if result.Error != nil { - return fmt.Errorf("File not found with path '%s' in database", filePath) + return fmt.Errorf("file not found with path '%s' in database", filePath) } log.Infof("Fetching case with number '%s' from Salesforce", caseNumber) - sfCase, err := runner.SalesforceClient.GetCaseByNumber(caseNumber) + salesforceClient, err := runner.SalesforceClientFactory.NewSalesforceClient(runner.Config) if err != nil { - // The SalesForce connection possibly died on us. Let's try to - // revive it and then try again. - log.Warn("Creating new SF client since current one is failing") - runner.SalesforceClient, err = common.NewSalesforceClient(runner.Config) - if err != nil { - log.Errorf("Failed to reconnect to salesforce: %s", err) - panic(err) - } - sfCase, err = runner.SalesforceClient.GetCaseByNumber(caseNumber) - if err != nil { - log.Error(err) - return err - } + log.Errorf("failed to get Salesforce connection: %s", err) + return err + } + sfCase, err := salesforceClient.GetCaseByNumber(caseNumber) + if err != nil { + log.Error(err) + return err } log.Debugf("Case %s successfully fetched from Salesforce", sfCase) var newReport = new(db.Report) - newReport.Created = time.Now() newReport.CaseID = sfCase.Id - newReport.FilePath = file.Path + newReport.Created = time.Now() + newReport.FileID = file.ID newReport.FileName = filepath.Base(file.Path) + newReport.FilePath = file.Path newReport.Name = report.Name - newReport.FileID = file.ID newReport.Subscriber = report.Subscriber if runner.Config.Processor.ReportsUploadPath == "" { @@ -156,13 +150,18 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum uploadPath = path.Join(runner.Config.Processor.ReportsUploadPath, newReport.FileName) } + filesComClient, err := runner.FilesComClientFactory.NewFilesComClient(runner.Config.FilesCom.Key, runner.Config.FilesCom.Endpoint) + if err != nil { + log.Errorf("failed to get new file.com client: %s", err) + return err + } log.Debugf("Uploading script output(s) to files.com") for scriptName, output := range scriptOutputs { dst_fname := fmt.Sprintf(DefaultReportOutputFormat, uploadPath, report.Name, scriptName) log.Debugf("Uploading script output %s", dst_fname) - uploadedFilePath, err := runner.FilescomClient.Upload(string(output), dst_fname) + uploadedFilePath, err := filesComClient.Upload(string(output), dst_fname) if err != nil { - return fmt.Errorf("Failed to upload file '%s': %s", dst_fname, err.Error()) + return fmt.Errorf("failed to upload file '%s': %s", dst_fname, err.Error()) } log.Debugf("Successfully uploaded file '%s'", uploadedFilePath.Path) @@ -229,7 +228,10 @@ func renderTemplate(ctx *pongo2.Context, data string) (string, error) { return out, nil } -func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceClient, fc common.FilesComClient, subscriber, name string, +func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, + salesforceClientFactory common.SalesforceClientFactory, + filesComClientFactory common.FilesComClientFactory, + subscriber, name string, file *db.File, reports map[string]config.Report) (*ReportRunner, error) { var reportRunner ReportRunner @@ -259,13 +261,13 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl } log.Debugf("Moved file to %s", dir) - reportRunner.Config = cfg - reportRunner.Subscriber = subscriber - reportRunner.Name = name reportRunner.Basedir = dir + reportRunner.Config = cfg reportRunner.Db = dbConn - reportRunner.SalesforceClient = sf - reportRunner.FilescomClient = fc + reportRunner.FilesComClientFactory = filesComClientFactory + reportRunner.Name = name + reportRunner.SalesforceClientFactory = salesforceClientFactory + reportRunner.Subscriber = subscriber //TODO: document the template variables tplContext := pongo2.Context{ @@ -277,7 +279,7 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl var scripts = make(map[string]string) for reportName, report := range reports { - log.Debugf("Running %s script(s) (num=%d)", reportName, len(report.Scripts)) + log.Debugf("running %d '%s' script(s)", len(report.Scripts), reportName) for scriptName, script := range report.Scripts { if script.Run == "" { log.Errorf("No script provided to run on '%s'", scriptName) @@ -313,13 +315,13 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl } reportToExecute := ReportToExecute{} - reportToExecute.Timeout = timeout reportToExecute.BaseDir = reportRunner.Basedir - reportToExecute.Subscriber = reportRunner.Subscriber - reportToExecute.Name = reportName reportToExecute.File = file reportToExecute.FileName = file.Path + reportToExecute.Name = reportName reportToExecute.Scripts = scripts + reportToExecute.Subscriber = reportRunner.Subscriber + reportToExecute.Timeout = timeout reportRunner.Reports = append(reportRunner.Reports, reportToExecute) } @@ -336,7 +338,7 @@ func (runner *ReportRunner) Clean() error { } func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.Msg) error { - runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClient, s.FilesComClient, s.Name, s.Options.Topic, file, s.Reports) + runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports) if err != nil { log.Errorf("Failed to get new runner: %s", err) msg.Ack() @@ -354,7 +356,8 @@ func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.M const defaultHandlerDeadline = 10 * time.Minute -func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, +func NewBaseSubscriber( + filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) *BaseSubscriber { var subscriber = BaseSubscriber{ Options: pubsub.HandlerOptions{ @@ -363,19 +366,21 @@ func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient commo AutoAck: false, JSON: true, Deadline: defaultHandlerDeadline, - }, Reports: reports, + }, + Reports: reports, } - subscriber.FilesComClient = filesClient - subscriber.SalesforceClient = salesforceClient - subscriber.Options.Handler = subscriber.Handler subscriber.Config = cfg - subscriber.Name = topic subscriber.Db = dbConn + subscriber.FilesComClientFactory = filesComClientFactory + subscriber.Name = topic + subscriber.Options.Handler = subscriber.Handler + subscriber.SalesforceClientFactory = salesforceClientFactory return &subscriber } -func NewProcessor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, +func NewProcessor( + filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory, provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB) (*Processor, error) { var err error if dbConn == nil { @@ -391,12 +396,12 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal } return &Processor{ - Config: cfg, - Db: dbConn, - FilesClient: filesClient, - Hostname: hostname, - Provider: provider, - SalesforceClient: salesforceClient, + Config: cfg, + Db: dbConn, + FilesComClientFactory: filesComClientFactory, + Hostname: hostname, + Provider: provider, + SalesforceClientFactory: salesforceClientFactory, }, nil } @@ -477,6 +482,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. reportMap[report.Subscriber][report.CaseID][report.Name] = append(reportMap[report.Subscriber][report.CaseID][report.Name], report) } + salesforceClient, err := p.SalesforceClientFactory.NewSalesforceClient(p.Config) + if err != nil { + log.Errorf("failed to get Salesforce client: %s", err) + return + } for subscriberName, caseMap := range reportMap { for caseId, reportsByType := range caseMap { for _, reports := range reportsByType { @@ -515,10 +525,10 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. } var comment *simpleforce.SObject if p.Config.Salesforce.EnableChatter { - comment = p.SalesforceClient.PostChatter(caseId, + comment = salesforceClient.PostChatter(caseId, chunkHeader+chunk, subscriber.SFCommentIsPublic) } else { - comment = p.SalesforceClient.PostComment(caseId, + comment = salesforceClient.PostComment(caseId, chunkHeader+chunk, subscriber.SFCommentIsPublic) } if comment == nil { @@ -538,8 +548,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. } } -func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient common.FilesComClient, - salesforceClient common.SalesforceClient, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error { +func (p *Processor) Run(ctx context.Context, newSubscriberFn func( + filesComClientFactory common.FilesComClientFactory, + salesforceClientFactory common.SalesforceClientFactory, + name, topic string, reports map[string]config.Report, + cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error { if ctx == nil { var cancel context.CancelFunc @@ -554,7 +567,8 @@ func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient co }) for event := range p.Config.Processor.SubscribeTo { - go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db)) + go pubsub.Subscribe(newSubscriberFn(p.FilesComClientFactory, p.SalesforceClientFactory, + p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db)) } interval, err := time.ParseDuration(p.Config.Processor.BatchCommentsEvery) diff --git a/pkg/processor/processor_test.go b/pkg/processor/processor_test.go index 45d6a11..859b1ec 100644 --- a/pkg/processor/processor_test.go +++ b/pkg/processor/processor_test.go @@ -48,11 +48,8 @@ func (s *MockSubscriber) Setup(c *pubsub.Client) { } func (s *ProcessorTestSuite) TestRunProcessor() { - filesComClient := test.FilesComClient{} - salesforceClient := test.SalesforceClient{} - provider := &memory.MemoryProvider{} - processor, _ := NewProcessor(&filesComClient, &salesforceClient, provider, s.config, s.db) + processor, _ := NewProcessor(&test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -67,7 +64,9 @@ func (s *ProcessorTestSuite) TestRunProcessor() { var called = 0 - _ = processor.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, + _ = processor.Run(ctx, func( + filesComClientFactory common.FilesComClientFactory, + salesforceClientFactory common.SalesforceClientFactory, name string, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber { var subscriber = MockSubscriber{Options: pubsub.HandlerOptions{ Topic: topic,