From cffdefc220812264c646b1c52dc7f5d3e15b8ec7 Mon Sep 17 00:00:00 2001 From: pereiramarco011 Date: Thu, 7 Mar 2024 11:34:46 +0000 Subject: [PATCH 1/2] analyzer using parallelScan flag --- internal/console/analyze.go | 3 + pkg/analyzer/analyzer.go | 119 ++++++++++++++++++------------- pkg/engine/inspector.go | 13 +--- pkg/utils/adjust_workers.go | 12 ++++ pkg/utils/adjust_workers_test.go | 19 +++++ 5 files changed, 104 insertions(+), 62 deletions(-) create mode 100644 pkg/utils/adjust_workers.go create mode 100644 pkg/utils/adjust_workers_test.go diff --git a/internal/console/analyze.go b/internal/console/analyze.go index d0e7150d56d..7d2325d09fc 100644 --- a/internal/console/analyze.go +++ b/internal/console/analyze.go @@ -11,6 +11,7 @@ import ( "github.com/Checkmarx/kics/pkg/analyzer" "github.com/Checkmarx/kics/pkg/engine/source" "github.com/Checkmarx/kics/pkg/model" + "github.com/Checkmarx/kics/pkg/utils" "github.com/rs/zerolog/log" "github.com/spf13/cobra" ) @@ -68,6 +69,7 @@ func getAnalyzeParameters() *analyzer.Parameters { Path: flags.GetMultiStrFlag(flags.AnalyzePath), Results: flags.GetStrFlag(flags.AnalyzeResults), MaxFileSize: flags.GetIntFlag(flags.MaxFileSizeFlag), + NumWorkers: flags.GetIntFlag(flags.ParallelScanFile), } return &analyzeParams @@ -92,6 +94,7 @@ func executeAnalyze(analyzeParams *analyzer.Parameters) error { ExcludeGitIgnore: false, GitIgnoreFileName: "", MaxFileSize: analyzeParams.MaxFileSize, + NumWorkers: utils.AdjustNumWorkers(analyzeParams.NumWorkers), } analyzedPaths, err := analyzer.Analyze(analyzerStruct) diff --git a/pkg/analyzer/analyzer.go b/pkg/analyzer/analyzer.go index 0d4a0db48bb..78fc416cb3e 100644 --- a/pkg/analyzer/analyzer.go +++ b/pkg/analyzer/analyzer.go @@ -132,6 +132,7 @@ type Parameters struct { Results string Path []string MaxFileSize int + NumWorkers int } // regexSlice is a struct to contain a slice of regex @@ -154,6 +155,7 @@ type Analyzer struct { GitIgnoreFileName string ExcludeGitIgnore bool MaxFileSize int + NumWorkers int } // types is a map that contains the regex by type @@ -289,6 +291,7 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) { // results is the channel shared by the workers that contains the types found results := make(chan string) locCount := make(chan int) + jobsChannel := make(chan analyzerInfo) ignoreFiles := make([]string, 0) projectConfigFiles := make([]string, 0) done := make(chan bool) @@ -328,18 +331,20 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) { a.Types, a.ExcludeTypes = typeLower(a.Types, a.ExcludeTypes) - // Start the workers - for _, file := range files { + // Start a goroutine for each worker + for w := 0; w < a.NumWorkers; w++ { wg.Add(1) - // analyze the files concurrently - a := &analyzerInfo{ - typesFlag: a.Types, - excludeTypesFlag: a.ExcludeTypes, - filePath: file, - } - go a.worker(results, unwanted, locCount, &wg) + + go func() { + // Decrement the counter when the goroutine completes + defer wg.Done() + analyzeFileWorker(results, unwanted, locCount, jobsChannel) + }() } + // create the jobs for the workers + go createAnalyzeJobs(files, a, jobsChannel) + go func() { // close channel results when the worker has finished writing into it defer func() { @@ -363,52 +368,64 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) { return returnAnalyzedPaths, nil } +// createAnalyzeJobs for parallel worker processing +func createAnalyzeJobs(files []string, a *Analyzer, jobsChannel chan<- analyzerInfo) { + defer close(jobsChannel) + for _, file := range files { + jobsChannel <- analyzerInfo{ + typesFlag: a.Types, + excludeTypesFlag: a.ExcludeTypes, + filePath: file, + } + } +} + // worker determines the type of the file by ext (dockerfile and terraform)/content and // writes the answer to the results channel // if no types were found, the worker will write the path of the file in the unwanted channel -func (a *analyzerInfo) worker(results, unwanted chan<- string, locCount chan<- int, wg *sync.WaitGroup) { - defer wg.Done() - - ext := utils.GetExtension(a.filePath) - linesCount, _ := utils.LineCounter(a.filePath) - - switch ext { - // Dockerfile (direct identification) - case ".dockerfile", "Dockerfile": - if a.isAvailableType(dockerfile) { - results <- dockerfile - locCount <- linesCount - } - // Dockerfile (indirect identification) - case "possibleDockerfile", ".ubi8", ".debian": - if a.isAvailableType(dockerfile) && isDockerfile(a.filePath) { - results <- dockerfile - locCount <- linesCount - } else { - unwanted <- a.filePath - } - // Terraform - case ".tf", "tfvars": - if a.isAvailableType(terraform) { - results <- terraform - locCount <- linesCount - } - // GRPC - case ".proto": - if a.isAvailableType(grpc) { - results <- grpc - locCount <- linesCount - } - // It could be Ansible Config or Ansible Inventory - case ".cfg", ".conf", ".ini": - if a.isAvailableType(ansible) { - results <- ansible - locCount <- linesCount +func analyzeFileWorker(results, unwanted chan<- string, locCount chan<- int, jobsChannel <-chan analyzerInfo) { + for a := range jobsChannel { + ext := utils.GetExtension(a.filePath) + linesCount, _ := utils.LineCounter(a.filePath) + + switch ext { + // Dockerfile (direct identification) + case ".dockerfile", "Dockerfile": + if a.isAvailableType(dockerfile) { + results <- dockerfile + locCount <- linesCount + } + // Dockerfile (indirect identification) + case "possibleDockerfile", ".ubi8", ".debian": + if a.isAvailableType(dockerfile) && isDockerfile(a.filePath) { + results <- dockerfile + locCount <- linesCount + } else { + unwanted <- a.filePath + } + // Terraform + case ".tf", "tfvars": + if a.isAvailableType(terraform) { + results <- terraform + locCount <- linesCount + } + // GRPC + case ".proto": + if a.isAvailableType(grpc) { + results <- grpc + locCount <- linesCount + } + // It could be Ansible Config or Ansible Inventory + case ".cfg", ".conf", ".ini": + if a.isAvailableType(ansible) { + results <- ansible + locCount <- linesCount + } + /* It could be Ansible, Buildah, CICD, CloudFormation, Crossplane, OpenAPI, Azure Resource Manager + Docker Compose, Knative, Kubernetes, Pulumi, ServerlessFW or Google Deployment Manager*/ + case yaml, yml, json, sh: + a.checkContent(results, unwanted, locCount, linesCount, ext) } - /* It could be Ansible, Buildah, CICD, CloudFormation, Crossplane, OpenAPI, Azure Resource Manager - Docker Compose, Knative, Kubernetes, Pulumi, ServerlessFW or Google Deployment Manager*/ - case yaml, yml, json, sh: - a.checkContent(results, unwanted, locCount, linesCount, ext) } } diff --git a/pkg/engine/inspector.go b/pkg/engine/inspector.go index 0d876bdba20..ecd4bbecced 100644 --- a/pkg/engine/inspector.go +++ b/pkg/engine/inspector.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" - "runtime" "strings" "sync" "time" @@ -18,6 +17,7 @@ import ( "github.com/Checkmarx/kics/pkg/detector/helm" "github.com/Checkmarx/kics/pkg/engine/source" "github.com/Checkmarx/kics/pkg/model" + "github.com/Checkmarx/kics/pkg/utils" "github.com/open-policy-agent/opa/ast" "github.com/open-policy-agent/opa/cover" "github.com/open-policy-agent/opa/rego" @@ -100,15 +100,6 @@ var ( } ) -func adjustNumWorkers(workers int) int { - // for the case in which the end user decides to use num workers as "auto-detected" - // we will set the number of workers to the number of CPUs available based on GOMAXPROCS value - if workers == 0 { - return runtime.GOMAXPROCS(-1) - } - return workers -} - // NewInspector initializes a inspector, compiling and loading queries for scan and its tracker func NewInspector( ctx context.Context, @@ -170,7 +161,7 @@ func NewInspector( excludeResults: excludeResults, detector: lineDetector, queryExecTimeout: queryExecTimeout, - numWorkers: adjustNumWorkers(numWorkers), + numWorkers: utils.AdjustNumWorkers(numWorkers), }, nil } diff --git a/pkg/utils/adjust_workers.go b/pkg/utils/adjust_workers.go new file mode 100644 index 00000000000..030983103b0 --- /dev/null +++ b/pkg/utils/adjust_workers.go @@ -0,0 +1,12 @@ +package utils + +import "runtime" + +func AdjustNumWorkers(workers int) int { + // for the case in which the end user decides to use num workers as "auto-detected" + // we will set the number of workers to the number of CPUs available based on GOMAXPROCS value + if workers == 0 { + return runtime.GOMAXPROCS(-1) + } + return workers +} diff --git a/pkg/utils/adjust_workers_test.go b/pkg/utils/adjust_workers_test.go new file mode 100644 index 00000000000..c01f2f604e6 --- /dev/null +++ b/pkg/utils/adjust_workers_test.go @@ -0,0 +1,19 @@ +package utils + +import ( + "runtime" + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_AdjustWorkers(t *testing.T) { + workers0 := AdjustNumWorkers(0) + require.Equal(t, workers0, runtime.GOMAXPROCS(-1)) + workers1 := AdjustNumWorkers(1) + require.Equal(t, workers1, 1) + workersnegative1 := AdjustNumWorkers(-1) + require.Equal(t, workersnegative1, -1) + workers100 := AdjustNumWorkers(100) + require.Equal(t, workers100, 100) +} From 0c5528bb1305b653148b5454d804448839713095 Mon Sep 17 00:00:00 2001 From: pereiramarco011 Date: Thu, 7 Mar 2024 12:03:34 +0000 Subject: [PATCH 2/2] add analyze cmd help and flag --- e2e/fixtures/assets/analyze_help | 11 ++++++----- internal/console/assets/analyze-flags.json | 7 +++++++ pkg/scan/utils.go | 1 + 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/e2e/fixtures/assets/analyze_help b/e2e/fixtures/assets/analyze_help index 840508a76ee..d88f4579d2a 100644 --- a/e2e/fixtures/assets/analyze_help +++ b/e2e/fixtures/assets/analyze_help @@ -1,11 +1,12 @@ Usage: - kics remediate [flags] + kics analyze [flags] Flags: - -h, --help help for analyze - --analyze-path strings paths or directories to scan - example: "./somepath,somefile.txt" - --analyze-results string points to the JSON results file of analyzer (default "platforms.json") + --analyze-path strings paths or directories to scan + example: "./somepath,somefile.txt" + --analyze-results string points to the JSON results file of analyzer (default "platforms.json") + -h, --help help for analyze + --parallel int number of workers used for parallel analyzing, set 0 to auto-detect parallelism (default 1) Global Flags: --ci display only log messages to CLI output (mutually exclusive with silent) diff --git a/internal/console/assets/analyze-flags.json b/internal/console/assets/analyze-flags.json index da64f49da87..4915a3c7ee6 100644 --- a/internal/console/assets/analyze-flags.json +++ b/internal/console/assets/analyze-flags.json @@ -10,5 +10,12 @@ "shorthandFlag": "", "defaultValue": "platforms.json", "usage": "points to the JSON results file of analyzer" + }, + "parallel": { + "flagType": "int", + "shorthandFlag": "", + "defaultValue": "1", + "usage": "number of workers used for parallel analyzing, set 0 to auto-detect parallelism", + "validation": "validateWorkersFlag" } } diff --git a/pkg/scan/utils.go b/pkg/scan/utils.go index 49c81ffc69f..d8e03f1ed08 100644 --- a/pkg/scan/utils.go +++ b/pkg/scan/utils.go @@ -60,6 +60,7 @@ func (c *Client) prepareAndAnalyzePaths(ctx context.Context) (provider.Extracted GitIgnoreFileName: ".gitignore", ExcludeGitIgnore: c.ScanParams.ExcludeGitIgnore, MaxFileSize: c.ScanParams.MaxFileSizeFlag, + NumWorkers: utils.AdjustNumWorkers(c.ScanParams.ParallelScanFlag), } pathTypes, errAnalyze := analyzePaths(a)