Skip to content

Commit

Permalink
finalizing auto responsive control for behave testing
Browse files Browse the repository at this point in the history
  • Loading branch information
damienjburks committed Apr 16, 2024
1 parent 6c78958 commit 01c34d3
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 19 deletions.
4 changes: 4 additions & 0 deletions src/control-catalog/behave_tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ click==8.1.7
google-api-core==2.18.0
google-auth==2.29.0
google-cloud-core==2.4.1
google-cloud-kms==2.21.3
google-cloud-storage==2.16.0
google-crc32c==1.5.0
google-resumable-media==2.7.0
googleapis-common-protos==1.63.0
grpc-google-iam-v1==0.13.0
grpcio==1.62.1
grpcio-status==1.62.1
idna==3.6
jmespath==1.0.1
mypy-extensions==1.0.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ Scenario: Test Control CCC.OS.C2
AND you own the object storage bucket in GCP
WHEN a data plane request with an untrusted KMS key is made to the AWS object storage bucket
AND a data plane request with an untrusted KMS key is made to the GCP object storage bucket
THEN the request should be denied
THEN the AWS request should be denied
AND the GCP storage object should have been deleted
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import logging
import time
import boto3

from google.cloud import storage
from google.cloud import kms_v1
from botocore.exceptions import ClientError
from behave import given, then, when

logging.basicConfig(level=logging.INFO)

BUCKET_OBJ_NAME = "test.txt"
STORAGE_BUCKET_NAME = "malicious-sb-ccc-os-c2"
UNTRUSTED_KEY_NAME = "malicious-sb-untrusted-ccc-os-c2"
UNTRUSTED_KEY_ALIAS = f"alias/{UNTRUSTED_KEY_NAME}"
Expand Down Expand Up @@ -51,19 +54,44 @@ def aws_upload_obj_with_untrusted_key(context):
"a data plane request with an untrusted KMS key is made to the GCP object storage bucket"
)
def gcp_upload_obj_with_untrusted_key(context):
# This control needs to be reviewed in more detail - we
# This control needs to be reviewed in more detail - we
# can upload to the bucket with an untrusted key.
key_name = f"projects/common-cloud-controls-testing/locations/us-central1/keyRings/{STORAGE_BUCKET_NAME}-keyring/cryptoKeys/{UNTRUSTED_KEY_NAME}"
client = kms_v1.KeyManagementServiceClient()
parent = "projects/common-cloud-controls-testing/locations/us-central1"
key_rings = client.list_key_rings(request={"parent": parent})

kms_key_id = None
for key_ring in key_rings:
kms_keys = kms_v1.ListCryptoKeysRequest(mapping={"parent": key_ring.name})
for kms_key in client.list_crypto_keys(kms_keys):
if (
UNTRUSTED_KEY_NAME in kms_key.name
and kms_key.primary.destroy_time is None
):
kms_key_id = kms_key.name
break

if kms_key_id is not None:
break

bucket = storage.Bucket(context.storage_client, STORAGE_BUCKET_NAME)
bucket.blob("test.txt", kms_key_name=key_name).upload_from_string(
bucket.blob(BUCKET_OBJ_NAME, kms_key_name=kms_key_id).upload_from_string(
"Hello, World"
)
time.sleep(10) # Sleep for 10 seconds


@then("the request should be denied")
@then("the AWS request should be denied")
def validate_request_denied(context):
print(context.s3_publish_error)
if "AccessDenied" in context.s3_publish_error:
assert True
else:
assert False


@then("the GCP storage object should have been deleted")
def validate_request_denied(context):
bucket = storage.Bucket(context.storage_client, STORAGE_BUCKET_NAME)
blob = bucket.blob(BUCKET_OBJ_NAME)
assert not blob.exists()
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,42 @@ resource "google_storage_bucket" "malicious_storage_bucket" {
}

uniform_bucket_level_access = true
}

data "google_iam_policy" "policy" {
binding {
role = "roles/storage.objectCreator"
members = ["user:*"]
condition {
title = "Deny unencrypted uploads"
description = "Only allow objects to be uploaded with a specific KMS key"
expression = "resource.name.startsWith(\"projects/common-cloud-controls-testing/buckets/${google_storage_bucket.malicious_storage_bucket.name}/objects\") && !request.resource.labels.kms_key_name.startsWith(\"projects/common-cloud-controls-testing/locations/us-central1/keyRings/${google_kms_key_ring.keyring.id}/cryptoKeys/${google_kms_crypto_key.trusted_cmek.name}\")"
}
}
depends_on = [ google_kms_crypto_key_iam_binding.trusted_kms_key_binding ]
}

resource "google_storage_bucket_iam_policy" "name" {
data "archive_file" "my_function_src" {
type = "zip"
source_dir = "${path.module}/src"
output_file_mode = "0666"
output_path = "${path.module}/example_src.zip"
}
resource "google_storage_bucket_object" "src" {
name = "example_src_${data.archive_file.my_function_src.output_md5}.zip"
bucket = google_storage_bucket.malicious_storage_bucket.name
policy_data = data.google_iam_policy.policy.policy_data
source = data.archive_file.my_function_src.output_path
}
resource "google_cloudfunctions_function" "untrusted_enc_obj_deleter" {
name = "${var.bucket_name}-ccc-os-c2-autorem-control"
runtime = "python39"
entry_point = "delete_object"
source_archive_bucket = google_storage_bucket_object.src.bucket
source_archive_object = google_storage_bucket_object.src.name

event_trigger {
event_type = "google.storage.object.finalize"
resource = google_storage_bucket.malicious_storage_bucket.name
}

https_trigger_security_level = "SECURE_ALWAYS"
}

resource "random_string" "random" {
length = 5
special = false
}
resource "google_kms_key_ring" "keyring" {
name = "${var.bucket_name}-ccc-os-c2-keyring"
name = "${var.bucket_name}-ccc-os-c2-kr-${random_string.random.id}"
location = "us-central1"
}

Expand All @@ -45,6 +60,23 @@ resource "google_kms_crypto_key" "trusted_cmek" {
}
}

resource "google_kms_crypto_key_iam_binding" "trusted_kms_key_binding" {
crypto_key_id = google_kms_crypto_key.trusted_cmek.id
role = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
members = [
"serviceAccount:[email protected]" # Cloud Storage service account
]
}

# Malicious Threat Actor adds a key binding for the untrusted CMEK to the Default Service Account
resource "google_kms_crypto_key_iam_binding" "untrusted_kms_key_binding" {
crypto_key_id = google_kms_crypto_key.untrusted_cmek.id
role = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
members = [
"serviceAccount:[email protected]" # Cloud Storage service account
]
}

resource "google_kms_crypto_key" "untrusted_cmek" {
name = "${var.bucket_name}-untrusted-ccc-os-c2"
key_ring = google_kms_key_ring.keyring.id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import logging
import functions_framework
from google.cloud import storage

logging.basicConfig(level=logging.INFO)

@functions_framework.cloud_event
def delete_object(event):
logging.info("Function triggered: %s", event.data)

bucket_name = event.data['bucket']
object_name = event.data['name']
kms_key_name = event.data['kmsKeyName']

# Initialize the client
client = storage.Client()

# Get the bucket
bucket = client.get_bucket(bucket_name)

# Get the object
blob = bucket.blob(object_name)

# Check if the object is not encrypted with the default CMEK
# or if the object is not encrypted with a CMEK
if bucket.default_kms_key_name not in kms_key_name:
blob.delete()
logging.info("Object %s deleted successfully.", object_name)
return f"Object {object_name} deleted successfully.", 200
else:
logging.info("Object %s is already encrypted with the default CMEK.", object_name)
return f"Object {object_name} is already encrypted with the default CMEK.", 200

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
google-cloud-storage

0 comments on commit 01c34d3

Please sign in to comment.