diff --git a/pkg/exporter/api/controller_test.go b/pkg/exporter/api/controller_test.go index f9bd08521..409684015 100644 --- a/pkg/exporter/api/controller_test.go +++ b/pkg/exporter/api/controller_test.go @@ -134,6 +134,8 @@ func TestNewExporter(t *testing.T) { baseURL := fmt.Sprintf(BaseURL, serverPort) servercConfig.HTTP.Port = serverPort servercConfig.BinaryType = "minimal" + servercConfig.Storage.Dedupe = false + servercConfig.Storage.GC = false serverController := zotapi.NewController(servercConfig) So(serverController, ShouldNotBeNil) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 174559d45..aa2259312 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -60,7 +60,7 @@ func (pq *generatorsPriorityQueue) Pop() any { const ( rateLimiterScheduler = 400 - rateLimit = 5 * time.Second + rateLimit = 50 * time.Millisecond NumWorkersMultiplier = 4 sendMetricsInterval = 5 * time.Second ) @@ -241,7 +241,7 @@ func (scheduler *Scheduler) RunScheduler() { ctx, cancel := context.WithCancel(context.Background()) scheduler.cancelFunc = cancel - throttle := time.NewTicker(rateLimit).C + throttle := time.NewTicker(scheduler.RateLimit).C numWorkers := scheduler.NumWorkers @@ -278,10 +278,14 @@ func (scheduler *Scheduler) RunScheduler() { task := scheduler.getTask() - if task != nil { - // push tasks into worker pool until workerChan is full. - scheduler.workerChan <- task + if task == nil { + <-throttle + + continue } + + // push tasks into worker pool until workerChan is full. + scheduler.workerChan <- task } } }() diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9980cd310..defcf6b15 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -130,10 +130,10 @@ func TestScheduler(t *testing.T) { genH := &shortGenerator{log: logger, priority: "high priority"} // interval has to be higher than throttle value to simulate - sch.SubmitGenerator(genH, 6*time.Second, scheduler.HighPriority) + sch.SubmitGenerator(genH, 1*time.Second, scheduler.HighPriority) sch.RunScheduler() - time.Sleep(7 * time.Second) + time.Sleep(2 * time.Second) sch.Shutdown() data, err := os.ReadFile(logFile.Name()) @@ -152,6 +152,7 @@ func TestScheduler(t *testing.T) { cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3} metrics := monitoring.NewMetricsServer(true, logger) sch := scheduler.NewScheduler(cfg, metrics, logger) + sch.RateLimit = 5 * time.Second genL := &generator{log: logger, priority: "low priority"} sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority) @@ -212,7 +213,7 @@ func TestScheduler(t *testing.T) { sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority) sch.RunScheduler() - time.Sleep(4 * time.Second) + time.Sleep(1 * time.Second) sch.Shutdown() data, err := os.ReadFile(logFile.Name()) @@ -275,7 +276,7 @@ func TestScheduler(t *testing.T) { sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.MediumPriority) sch.RunScheduler() - time.Sleep(4 * time.Second) + time.Sleep(1 * time.Second) sch.Shutdown() data, err := os.ReadFile(logFile.Name())