Skip to content

Commit

Permalink
Fix #1088: Refactor AWS Inspector to new model (#1241)
Browse files Browse the repository at this point in the history
Refactors AWS Inspector sync to new data model so that we minimize the
risk of write errors as seen in #1088.

Also fixes a bug where we attempted to attach packages to inspector findings
before the findings existed, because we ingested the packages before the findings.

---------

Co-authored-by: Ramon Petgrave <[email protected]>
  • Loading branch information
achantavy and ramonpetgrave64 authored Sep 19, 2023
1 parent 5c1aa11 commit 779bde5
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 284 deletions.
4 changes: 0 additions & 4 deletions cartography/data/indexes.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ CREATE INDEX IF NOT EXISTS FOR (n:AWSDNSZone) ON (n.zoneid);
CREATE INDEX IF NOT EXISTS FOR (n:AWSDNSZone) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSGroup) ON (n.arn);
CREATE INDEX IF NOT EXISTS FOR (n:AWSGroup) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorFinding) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorFinding) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorPackage) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInspectorPackage) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInternetGateway) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:AWSInternetGateway) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:AWSIpv4CidrBlock) ON (n.id);
Expand Down
35 changes: 0 additions & 35 deletions cartography/data/jobs/cleanup/aws_import_inspector_cleanup.json

This file was deleted.

186 changes: 44 additions & 142 deletions cartography/intel/aws/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import boto3
import neo4j

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.models.aws.inspector.findings import AWSInspectorFindingSchema
from cartography.models.aws.inspector.packages import AWSInspectorPackageSchema
from cartography.util import aws_handle_regions
from cartography.util import aws_paginate
from cartography.util import batch
from cartography.util import run_cleanup_job
from cartography.util import timeit


Expand All @@ -20,17 +22,17 @@
@timeit
@aws_handle_regions
def get_inspector_findings(
session: boto3.session.Session,
region: str,
current_aws_account_id: str,
) -> List[Dict]:
'''
session: boto3.session.Session,
region: str,
current_aws_account_id: str,
) -> List[Dict[str, Any]]:
"""
We must list_findings by filtering the request, otherwise the request could time out.
First, we filter by account_id. And since there may be millions of CLOSED findings that may never go away,
we will only fetch those in ACTIVE or SUPPRESSED statuses.
list_members will get us all the accounts that
have delegated access to the account specified by current_aws_account_id.
'''
"""
client = session.client('inspector2', region_name=region)

members = aws_paginate(client, 'list_members', 'members')
Expand Down Expand Up @@ -61,7 +63,7 @@ def get_inspector_findings(
return findings


def transform_inspector_findings(results: List[Dict]) -> Tuple[List, List]:
def transform_inspector_findings(results: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
findings_list: List[Dict] = []
packages: Dict[str, Any] = {}

Expand Down Expand Up @@ -110,7 +112,7 @@ def transform_inspector_findings(results: List[Dict]) -> Tuple[List, List]:
return findings_list, packages_list


def transform_inspector_packages(packages: Dict[str, Any]) -> List[Dict]:
def transform_inspector_packages(packages: Dict[str, Any]) -> List[Dict[str, Any]]:
packages_list: List[Dict] = []
for package_id in packages.keys():
packages_list.append(packages[package_id])
Expand Down Expand Up @@ -146,153 +148,53 @@ def _process_packages(package_details: Dict[str, Any], aws_account_id: str, find
return packages


def _port_range_string(details: Dict) -> str:
def _port_range_string(details: Dict[str, Any]) -> str:
begin = details['openPortRange']['begin']
end = details['openPortRange']['end']
return f"{begin}-{end}"


def _load_findings_tx(
tx: neo4j.Transaction,
findings: List[Dict],
region: str,
aws_update_tag: int,
) -> None:
query = """
UNWIND $Findings as new_finding
MERGE (finding:AWSInspectorFinding{id: new_finding.id})
ON CREATE SET finding.firstseen = timestamp(),
finding.arn = new_finding.arn,
finding.region = $Region,
finding.awsaccount = new_finding.awsaccount
SET finding.lastupdated = $UpdateTag,
finding.name = new_finding.title,
finding.instanceid = new_finding.instanceid,
finding.ecrimageid = new_finding.ecrimageid,
finding.ecrrepositoryid = new_finding.ecrrepositoryid,
finding.severity = new_finding.severity,
finding.firstobservedat = new_finding.firstobservedat,
finding.updatedat = new_finding.updatedat,
finding.description = new_finding.description,
finding.type = new_finding.type,
finding.cvssscore = new_finding.cvssscore,
finding.protocol = new_finding.protocol,
finding.portrange = new_finding.portrange,
finding.portrangebegin = new_finding.portrangebegin,
finding.portrangeend = new_finding.portrangeend,
finding.vulnerabilityid = new_finding.vulnerabilityid,
finding.referenceurls = new_finding.referenceurls,
finding.relatedvulnerabilities = new_finding.relatedvulnerabilities,
finding.source = new_finding.source,
finding.sourceurl = new_finding.sourceurl,
finding.status = new_finding.status,
finding.vendorcreatedat = new_finding.vendorcreatedat,
finding.vendorseverity = new_finding.vendorseverity,
finding.vendorupdatedat = new_finding.vendorupdatedat,
finding.vulnerablepackageids = new_finding.vulnerablepackageids,
finding:Risk
WITH finding
MATCH (account:AWSAccount{id: finding.awsaccount})
MERGE (account)-[r:RESOURCE]->(finding)
ON CREATE SET r.firstseen = timestamp()
SET r.lastupdated = $UpdateTag
WITH finding
MATCH (instance:EC2Instance{id: finding.instanceid})
MERGE (instance)<-[r2:AFFECTS]-(finding)
ON CREATE SET r2.firstseen = timestamp()
SET r2.lastupdated = $UpdateTag
WITH finding
MATCH (repo:ECRRepository{id: finding.ecrrepositoryid})
MERGE (repo)<-[r3:AFFECTS]-(finding)
ON CREATE SET r3.firstseen = timestamp()
SET r3.lastupdated = $UpdateTag
WITH finding
MATCH (image:ECRImage{id: finding.ecrimageid})
MERGE (image)<-[r4:AFFECTS]-(finding)
ON CREATE SET r4.firstseen = timestamp()
SET r4.lastupdated = $UpdateTag
"""

tx.run(
query,
Findings=findings,
UpdateTag=aws_update_tag,
Region=region,
)


@timeit
def load_inspector_findings(
neo4j_session: neo4j.Session, findings: List[Dict], region: str,
aws_update_tag: int,
neo4j_session: neo4j.Session,
findings: List[Dict[str, Any]],
region: str,
aws_update_tag: int,
current_aws_account_id: str,
) -> None:
for i, findings_batch in enumerate(batch(findings), start=1):
logger.info(f'Loading batch number {i}')
neo4j_session.write_transaction(
_load_findings_tx,
findings=findings_batch,
region=region,
aws_update_tag=aws_update_tag,
)


def _load_packages_tx(
tx: neo4j.Transaction,
packages: List[Dict],
region: str,
aws_update_tag: int,
) -> None:
query = """
UNWIND $Packages as new_package
MERGE (package:AWSInspectorPackage{id: new_package.id})
ON CREATE SET package.firstseen = timestamp(),
package.region = $Region,
package.awsaccount = new_package.awsaccount,
package.findingarn = new_package.findingarn
SET package.lastupdated = $UpdateTag,
package.name = new_package.name,
package.arch = new_package.arch,
package.version = new_package.version,
package.release = new_package.release,
package.epoch = new_package.epoch,
package.manager = new_package.packageManager,
package.filepath = new_package.filePath,
package.fixedinversion = new_package.fixedInVersion,
package.sourcelayerhash = new_package.sourceLayerHash
WITH package
MATCH (finding:AWSInspectorFinding{id: package.findingarn})
MERGE (finding)-[r:HAS]->(package)
WITH package
MATCH (account:AWSAccount{id: package.awsaccount})
MERGE (account)-[r:RESOURCE]->(package)
ON CREATE SET r.firstseen = timestamp()
SET r.lastupdated = $UpdateTag
"""

tx.run(
query,
Packages=packages,
UpdateTag=aws_update_tag,
load(
neo4j_session,
AWSInspectorFindingSchema(),
findings,
Region=region,
AWS_ID=current_aws_account_id,
lastupdated=aws_update_tag,
)


@timeit
def load_inspector_packages(
neo4j_session: neo4j.Session, packages: List[Dict], region: str,
aws_update_tag: int,
neo4j_session: neo4j.Session,
packages: List[Dict[str, Any]],
region: str,
aws_update_tag: int,
current_aws_account_id: str,
) -> None:
neo4j_session.write_transaction(
_load_packages_tx,
packages=packages,
region=region,
aws_update_tag=aws_update_tag,
load(
neo4j_session,
AWSInspectorPackageSchema(),
packages,
Region=region,
AWS_ID=current_aws_account_id,
lastupdated=aws_update_tag,
)


@timeit
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
run_cleanup_job('aws_import_inspector_cleanup.json', neo4j_session, common_job_parameters)
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any]) -> None:
    """
    Build a GraphJob from each AWS Inspector node schema and run it on the session.

    :param neo4j_session: session the generated jobs are run against.
    :param common_job_parameters: parameters handed to GraphJob.from_node_schema.
    """
    logger.info("Running AWS Inspector cleanup")
    # Order is kept as-is: the finding schema job runs before the package schema job.
    for schema_cls in (AWSInspectorFindingSchema, AWSInspectorPackageSchema):
        cleanup_job = GraphJob.from_node_schema(schema_cls(), common_job_parameters)
        cleanup_job.run(neo4j_session)


@timeit
Expand All @@ -302,14 +204,14 @@ def sync(
regions: List[str],
current_aws_account_id: str,
update_tag: int,
common_job_parameters: Dict,
common_job_parameters: Dict[str, Any],
) -> None:
for region in regions:
logger.info(f"Syncing AWS Inspector findings for account {current_aws_account_id} and region {region}")
findings = get_inspector_findings(boto3_session, region, current_aws_account_id)
finding_data, package_data = transform_inspector_findings(findings)
logger.info(f"Loading {len(package_data)} packages")
load_inspector_packages(neo4j_session, package_data, region, update_tag)
logger.info(f"Loading {len(finding_data)} findings")
load_inspector_findings(neo4j_session, finding_data, region, update_tag)
load_inspector_findings(neo4j_session, finding_data, region, update_tag, current_aws_account_id)
logger.info(f"Loading {len(package_data)} packages")
load_inspector_packages(neo4j_session, package_data, region, update_tag, current_aws_account_id)
cleanup(neo4j_session, common_job_parameters)
Empty file.
Loading

0 comments on commit 779bde5

Please sign in to comment.