feat(kics): add usage of parallel flag for analyzer #6935

Open · wants to merge 3 commits into master
11 changes: 6 additions & 5 deletions e2e/fixtures/assets/analyze_help
@@ -1,11 +1,12 @@
Usage:
kics remediate [flags]
kics analyze [flags]

Flags:
-h, --help help for analyze
--analyze-path strings paths or directories to scan
example: "./somepath,somefile.txt"
--analyze-results string points to the JSON results file of analyzer (default "platforms.json")
--analyze-path strings paths or directories to scan
example: "./somepath,somefile.txt"
--analyze-results string points to the JSON results file of analyzer (default "platforms.json")
-h, --help help for analyze
--parallel int number of workers used for parallel analyzing, set 0 to auto-detect parallelism (default 1)

Global Flags:
--ci display only log messages to CLI output (mutually exclusive with silent)
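
With this change, kics analyze accepts a --parallel flag, as reflected in the help output above. Illustrative invocations (the paths are placeholders, not taken from this PR):

kics analyze --analyze-path "./somepath,somefile.txt" --parallel 4
kics analyze --analyze-path "./somepath" --parallel 0    # 0 auto-detects parallelism
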
3 changes: 3 additions & 0 deletions internal/console/analyze.go
@@ -11,6 +11,7 @@ import (
"github.com/Checkmarx/kics/pkg/analyzer"
"github.com/Checkmarx/kics/pkg/engine/source"
"github.com/Checkmarx/kics/pkg/model"
"github.com/Checkmarx/kics/pkg/utils"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
)
@@ -68,6 +69,7 @@ func getAnalyzeParameters() *analyzer.Parameters {
Path: flags.GetMultiStrFlag(flags.AnalyzePath),
Results: flags.GetStrFlag(flags.AnalyzeResults),
MaxFileSize: flags.GetIntFlag(flags.MaxFileSizeFlag),
NumWorkers: flags.GetIntFlag(flags.ParallelScanFile),
}

return &analyzeParams
@@ -92,6 +94,7 @@ func executeAnalyze(analyzeParams *analyzer.Parameters) error {
ExcludeGitIgnore: false,
GitIgnoreFileName: "",
MaxFileSize: analyzeParams.MaxFileSize,
NumWorkers: utils.AdjustNumWorkers(analyzeParams.NumWorkers),
}

analyzedPaths, err := analyzer.Analyze(analyzerStruct)
7 changes: 7 additions & 0 deletions internal/console/assets/analyze-flags.json
@@ -10,5 +10,12 @@
"shorthandFlag": "",
"defaultValue": "platforms.json",
"usage": "points to the JSON results file of analyzer"
},
"parallel": {
"flagType": "int",
"shorthandFlag": "",
"defaultValue": "1",
"usage": "number of workers used for parallel analyzing, set 0 to auto-detect parallelism",
"validation": "validateWorkersFlag"
}
}
119 changes: 68 additions & 51 deletions pkg/analyzer/analyzer.go
@@ -132,6 +132,7 @@ type Parameters struct {
Results string
Path []string
MaxFileSize int
NumWorkers int
}

// regexSlice is a struct to contain a slice of regex
@@ -154,6 +155,7 @@ type Analyzer struct {
GitIgnoreFileName string
ExcludeGitIgnore bool
MaxFileSize int
NumWorkers int
}

// types is a map that contains the regex by type
@@ -289,6 +291,7 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) {
// results is the channel shared by the workers that contains the types found
results := make(chan string)
locCount := make(chan int)
jobsChannel := make(chan analyzerInfo)
ignoreFiles := make([]string, 0)
projectConfigFiles := make([]string, 0)
done := make(chan bool)
@@ -328,18 +331,20 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) {

a.Types, a.ExcludeTypes = typeLower(a.Types, a.ExcludeTypes)

// Start the workers
for _, file := range files {
// Start a goroutine for each worker
for w := 0; w < a.NumWorkers; w++ {
wg.Add(1)
// analyze the files concurrently
a := &analyzerInfo{
typesFlag: a.Types,
excludeTypesFlag: a.ExcludeTypes,
filePath: file,
}
go a.worker(results, unwanted, locCount, &wg)

go func() {
// Decrement the counter when the goroutine completes
defer wg.Done()
analyzeFileWorker(results, unwanted, locCount, jobsChannel)
}()
}

// create the jobs for the workers
go createAnalyzeJobs(files, a, jobsChannel)

go func() {
// close channel results when the worker has finished writing into it
defer func() {
@@ -363,52 +368,64 @@ func Analyze(a *Analyzer) (model.AnalyzedPaths, error) {
return returnAnalyzedPaths, nil
}

// createAnalyzeJobs for parallel worker processing
func createAnalyzeJobs(files []string, a *Analyzer, jobsChannel chan<- analyzerInfo) {
defer close(jobsChannel)
for _, file := range files {
jobsChannel <- analyzerInfo{
typesFlag: a.Types,
excludeTypesFlag: a.ExcludeTypes,
filePath: file,
}
}
}

// worker determines the type of the file by ext (dockerfile and terraform)/content and
// writes the answer to the results channel
// if no types were found, the worker will write the path of the file in the unwanted channel
func (a *analyzerInfo) worker(results, unwanted chan<- string, locCount chan<- int, wg *sync.WaitGroup) {
defer wg.Done()

ext := utils.GetExtension(a.filePath)
linesCount, _ := utils.LineCounter(a.filePath)

switch ext {
// Dockerfile (direct identification)
case ".dockerfile", "Dockerfile":
if a.isAvailableType(dockerfile) {
results <- dockerfile
locCount <- linesCount
}
// Dockerfile (indirect identification)
case "possibleDockerfile", ".ubi8", ".debian":
if a.isAvailableType(dockerfile) && isDockerfile(a.filePath) {
results <- dockerfile
locCount <- linesCount
} else {
unwanted <- a.filePath
}
// Terraform
case ".tf", "tfvars":
if a.isAvailableType(terraform) {
results <- terraform
locCount <- linesCount
}
// GRPC
case ".proto":
if a.isAvailableType(grpc) {
results <- grpc
locCount <- linesCount
}
// It could be Ansible Config or Ansible Inventory
case ".cfg", ".conf", ".ini":
if a.isAvailableType(ansible) {
results <- ansible
locCount <- linesCount
func analyzeFileWorker(results, unwanted chan<- string, locCount chan<- int, jobsChannel <-chan analyzerInfo) {
for a := range jobsChannel {
ext := utils.GetExtension(a.filePath)
linesCount, _ := utils.LineCounter(a.filePath)

switch ext {
// Dockerfile (direct identification)
case ".dockerfile", "Dockerfile":
if a.isAvailableType(dockerfile) {
results <- dockerfile
locCount <- linesCount
}
// Dockerfile (indirect identification)
case "possibleDockerfile", ".ubi8", ".debian":
if a.isAvailableType(dockerfile) && isDockerfile(a.filePath) {
results <- dockerfile
locCount <- linesCount
} else {
unwanted <- a.filePath
}
// Terraform
case ".tf", "tfvars":
if a.isAvailableType(terraform) {
results <- terraform
locCount <- linesCount
}
// GRPC
case ".proto":
if a.isAvailableType(grpc) {
results <- grpc
locCount <- linesCount
}
// It could be Ansible Config or Ansible Inventory
case ".cfg", ".conf", ".ini":
if a.isAvailableType(ansible) {
results <- ansible
locCount <- linesCount
}
/* It could be Ansible, Buildah, CICD, CloudFormation, Crossplane, OpenAPI, Azure Resource Manager
Docker Compose, Knative, Kubernetes, Pulumi, ServerlessFW or Google Deployment Manager*/
case yaml, yml, json, sh:
a.checkContent(results, unwanted, locCount, linesCount, ext)
}
/* It could be Ansible, Buildah, CICD, CloudFormation, Crossplane, OpenAPI, Azure Resource Manager
Docker Compose, Knative, Kubernetes, Pulumi, ServerlessFW or Google Deployment Manager*/
case yaml, yml, json, sh:
a.checkContent(results, unwanted, locCount, linesCount, ext)
}
}
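
The refactor above replaces the previous one-goroutine-per-file approach with a fixed pool of workers fed through a jobs channel. Below is a minimal, self-contained sketch of the same producer/worker-pool pattern in Go; the names and payloads are simplified illustrations, not the KICS types:

package main

import (
	"fmt"
	"sync"
)

type job struct{ path string }

func main() {
	files := []string{"main.tf", "deploy.yaml", "Dockerfile"}
	numWorkers := 2

	jobs := make(chan job)
	results := make(chan string)
	var wg sync.WaitGroup

	// Start a fixed number of workers; each drains the jobs channel until it is closed.
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- "analyzed " + j.path
			}
		}()
	}

	// Producer: enqueue one job per file, then close the channel so the workers can exit.
	go func() {
		defer close(jobs)
		for _, f := range files {
			jobs <- job{path: f}
		}
	}()

	// Close results once every worker has finished writing into it.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}

Bounding the number of goroutines this way keeps memory and scheduler pressure roughly constant regardless of how many files are scanned, while the --parallel flag (via AdjustNumWorkers below) controls the pool size.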

13 changes: 2 additions & 11 deletions pkg/engine/inspector.go
@@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"

"runtime"
"strings"
"sync"
"time"
@@ -18,6 +17,7 @@ import (
"github.com/Checkmarx/kics/pkg/detector/helm"
"github.com/Checkmarx/kics/pkg/engine/source"
"github.com/Checkmarx/kics/pkg/model"
"github.com/Checkmarx/kics/pkg/utils"
"github.com/open-policy-agent/opa/ast"
"github.com/open-policy-agent/opa/cover"
"github.com/open-policy-agent/opa/rego"
@@ -100,15 +100,6 @@ var (
}
)

func adjustNumWorkers(workers int) int {
// for the case in which the end user decides to use num workers as "auto-detected"
// we will set the number of workers to the number of CPUs available based on GOMAXPROCS value
if workers == 0 {
return runtime.GOMAXPROCS(-1)
}
return workers
}

// NewInspector initializes a inspector, compiling and loading queries for scan and its tracker
func NewInspector(
ctx context.Context,
@@ -170,7 +161,7 @@ func NewInspector(
excludeResults: excludeResults,
detector: lineDetector,
queryExecTimeout: queryExecTimeout,
numWorkers: adjustNumWorkers(numWorkers),
numWorkers: utils.AdjustNumWorkers(numWorkers),
}, nil
}

1 change: 1 addition & 0 deletions pkg/scan/utils.go
@@ -54,6 +54,7 @@ func (c *Client) prepareAndAnalyzePaths(ctx context.Context) (provider.Extracted
GitIgnoreFileName: ".gitignore",
ExcludeGitIgnore: c.ScanParams.ExcludeGitIgnore,
MaxFileSize: c.ScanParams.MaxFileSizeFlag,
NumWorkers: utils.AdjustNumWorkers(c.ScanParams.ParallelScanFlag),
}

pathTypes, errAnalyze := analyzePaths(a)
12 changes: 12 additions & 0 deletions pkg/utils/adjust_workers.go
@@ -0,0 +1,12 @@
package utils

import "runtime"

func AdjustNumWorkers(workers int) int {
// for the case in which the end user decides to use num workers as "auto-detected"
// we will set the number of workers to the number of CPUs available based on GOMAXPROCS value
if workers == 0 {
return runtime.GOMAXPROCS(-1)
}
return workers
}
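
Note: runtime.GOMAXPROCS called with a value below 1 only reports the current setting without changing it, so --parallel 0 resolves to the number of OS threads the Go runtime will run simultaneously (by default, the number of available CPU cores).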
19 changes: 19 additions & 0 deletions pkg/utils/adjust_workers_test.go
@@ -0,0 +1,19 @@
package utils

import (
"runtime"
"testing"

"github.com/stretchr/testify/require"
)

func Test_AdjustWorkers(t *testing.T) {
workers0 := AdjustNumWorkers(0)
require.Equal(t, workers0, runtime.GOMAXPROCS(-1))
workers1 := AdjustNumWorkers(1)
require.Equal(t, workers1, 1)
workersnegative1 := AdjustNumWorkers(-1)
require.Equal(t, workersnegative1, -1)
workers100 := AdjustNumWorkers(100)
require.Equal(t, workers100, 100)
}