Skip to content

Commit

Permalink
Merge pull request #66 from fivexl/cw_metrics
Browse files Browse the repository at this point in the history
Export CloudWatch metrics
  • Loading branch information
EreminAnton authored Nov 1, 2024
2 parents 170aa25 + b84244b commit a512d3a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 64 deletions.
10 changes: 10 additions & 0 deletions main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ data "aws_iam_policy_document" "s3" {
"arn:${data.aws_partition.current.partition}:dynamodb:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/${var.dynamodb_table_name}"
]
}
statement {
sid = "AllowLambdaToPushCloudWatchMetrics"

actions = [
"cloudwatch:PutMetricData",
]
resources = [
"*"
]
}
dynamic "statement" {
for_each = length(aws_sns_topic.events_to_sns) > 0 ? [1] : []
content {
Expand Down
153 changes: 89 additions & 64 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
from typing import Any, Dict, List, NamedTuple

import boto3
from slack_sdk.web.slack_response import SlackResponse

from config import Config, SlackAppConfig, SlackWebhookConfig, get_logger, get_slack_config
from dynamodb import get_thread_ts_from_dynamodb, put_event_to_dynamodb
from slack_helpers import (
Expand All @@ -28,7 +30,6 @@
message_for_slack_error_notification,
post_message,
)
from slack_sdk.web.slack_response import SlackResponse
from sns import send_message_to_sns

cfg = Config()
Expand All @@ -38,6 +39,7 @@
s3_client = boto3.client("s3")
dynamodb_client = boto3.client("dynamodb")
sns_client = boto3.client("sns")
cloudwatch_client = boto3.client("cloudwatch")


def lambda_handler(s3_notification_event: Dict[str, List[Any]], _) -> int: # noqa: ANN001
Expand All @@ -50,53 +52,53 @@ def lambda_handler(s3_notification_event: Dict[str, List[Any]], _) -> int: # no

if event_name.startswith("ObjectRemoved"):
handle_removed_object_record(
record = record,
record=record,
)
continue

elif event_name.startswith("ObjectCreated"):
handle_created_object_record(
record = record,
cfg = cfg,
record=record,
cfg=cfg,
)
continue

except Exception as e:
post_message(
message = message_for_slack_error_notification(e, s3_notification_event),
account_id = None,
slack_config = slack_config,
message=message_for_slack_error_notification(e, s3_notification_event),
account_id=None,
slack_config=slack_config,
)
logger.exception({"Failed to process event": e})
return 200


def handle_removed_object_record(
record: dict,
record: dict,
) -> None:
logger.info({"s3:ObjectRemoved event": record})
account_id = record["userIdentity"]["accountId"] if "accountId" in record["userIdentity"] else ""
message = event_to_slack_message(
event = record,
source_file = record["s3"]["object"]["key"],
account_id_from_event = account_id,
event=record,
source_file=record["s3"]["object"]["key"],
account_id_from_event=account_id,
)
post_message(message = message, account_id = account_id, slack_config = slack_config)
post_message(message=message, account_id=account_id, slack_config=slack_config)


def handle_created_object_record(
record: dict,
cfg: Config,
record: dict,
cfg: Config,
) -> None:
logger.debug({"s3_notification_event": record})
cloudtrail_log_record = get_cloudtrail_log_records(record)
if cloudtrail_log_record:
for cloudtrail_log_event in cloudtrail_log_record["events"]:
handle_event(
event = cloudtrail_log_event,
source_file_object_key = cloudtrail_log_record["key"],
rules = cfg.rules,
ignore_rules = cfg.ignore_rules
event=cloudtrail_log_event,
source_file_object_key=cloudtrail_log_record["key"],
rules=cfg.rules,
ignore_rules=cfg.ignore_rules,
)


Expand All @@ -107,7 +109,7 @@ def get_cloudtrail_log_records(record: Dict) -> Dict | None:
if "s3" not in record:
raise AssertionError(f"recieved record does not contain s3 section: {record}")
bucket = record["s3"]["bucket"]["name"]
key = urllib.parse.unquote_plus(record["s3"]["object"]["key"], encoding="utf-8") # type: ignore # noqa: PGH003, E501
key = urllib.parse.unquote_plus(record["s3"]["object"]["key"], encoding="utf-8") # type: ignore # noqa: PGH003, E501
# Do not process digest files
try:
response = s3_client.get_object(Bucket=bucket, Key=key)
Expand Down Expand Up @@ -144,26 +146,51 @@ def should_message_be_processed(
errors = []
for ignore_rule in ignore_rules:
try:
if eval(ignore_rule, {}, {"event": flat_event}) is True: # noqa: PGH001
logger.info({"Event matched ignore rule and will not be processed": {"ignore_rule": ignore_rule, "flat_event": flat_event}}) # noqa: E501
if eval(ignore_rule, {}, {"event": flat_event}) is True: # noqa: PGH001
logger.info(
{"Event matched ignore rule and will not be processed": {"ignore_rule": ignore_rule, "flat_event": flat_event}}
) # noqa: E501
return ProcessingResult(False, errors)
except Exception as e:
logger.exception({"Event parsing failed": {"error": e, "ignore_rule": ignore_rule, "flat_event": flat_event}}) # noqa: E501
logger.exception({"Event parsing failed": {"error": e, "ignore_rule": ignore_rule, "flat_event": flat_event}}) # noqa: E501
errors.append({"error": e, "rule": ignore_rule})

for rule in rules:
try:
if eval(rule, {}, {"event": flat_event}) is True: # noqa: PGH001
logger.info({"Event matched rule and will be processed": {"rule": rule, "flat_event": flat_event}}) # noqa: E501
if eval(rule, {}, {"event": flat_event}) is True: # noqa: PGH001
logger.info({"Event matched rule and will be processed": {"rule": rule, "flat_event": flat_event}}) # noqa: E501
return ProcessingResult(True, errors)
except Exception as e:
logger.exception({"Event parsing failed": {"error": e, "rule": rule, "flat_event": flat_event}})
errors.append({"error": e, "rule": rule})

logger.info({"Event did not match any rules and will not be processed": {"event": event_name, "user": user}}) # noqa: E501
logger.info({"Event did not match any rules and will not be processed": {"event": event_name, "user": user}}) # noqa: E501
return ProcessingResult(False, errors)


def push_cloudwatch_metrics(deny_type: str, event_name: str) -> None:
    """Pushes CloudWatch metrics: one for all AccessDenied events, and one grouped by event name."""

    def _count_datum(metric_name: str, dimensions: list) -> dict:
        # Every invocation records exactly one occurrence, so Value is a fixed count of 1.
        return {
            "MetricName": metric_name,
            "Dimensions": dimensions,
            "Value": 1,
            "Unit": "Count",
        }

    metric_data = [
        # Aggregate counter across every AccessDenied event.
        _count_datum(
            "TotalAccessDeniedEvents",
            [{"Name": "AccessDenied", "Value": "AccessDeniedTotal"}],
        ),
        # Per-event breakdown, dimensioned by the deny type and the originating event name.
        _count_datum(
            "AccessDeniedByEvent",
            [{"Name": "DenyType", "Value": deny_type}, {"Name": "EventName", "Value": event_name}],
        ),
    ]

    try:
        cloudwatch_client.put_metric_data(Namespace="CloudTrail/AccessDeniedEvents", MetricData=metric_data)
        logger.info("Pushed CloudWatch metrics", extra={"deny_type": deny_type, "event_name": event_name})
    except Exception as e:
        # Metric delivery is best-effort: a CloudWatch failure is logged but never
        # interrupts event processing in the caller.
        logger.exception("Failed to push CloudWatch metrics", extra={"error": e, "deny_type": deny_type, "event_name": event_name})


def handle_event(
event: Dict[str, Any],
source_file_object_key: str,
Expand All @@ -172,85 +199,82 @@ def handle_event(
) -> SlackResponse | None:

result = should_message_be_processed(event, rules, ignore_rules)
account_id = event["userIdentity"]["accountId"] if "accountId" in event["userIdentity"] else""
account_id = event["userIdentity"]["accountId"] if "accountId" in event["userIdentity"] else ""
if cfg.rule_evaluation_errors_to_slack:
for error in result.errors:
post_message(
message = message_for_rule_evaluation_error_notification(
error = error["error"],
object_key = source_file_object_key,
rule = error["rule"],
message=message_for_rule_evaluation_error_notification(
error=error["error"],
object_key=source_file_object_key,
rule=error["rule"],
),
account_id = account_id,
slack_config = slack_config,
account_id=account_id,
slack_config=slack_config,
)

if not result.should_be_processed:
return

# log full event if it is AccessDenied
if ("errorCode" in event and "AccessDenied" in event["errorCode"]):
if "errorCode" in event and "AccessDenied" in event["errorCode"]:
event_as_string = json.dumps(event, indent=4)
logger.info({"errorCode": "AccessDenied", "log full event": event_as_string})
# Push CloudWatch metrics
push_cloudwatch_metrics(deny_type=event["errorCode"], event_name=event.get("eventName", "UnknownEvent"))

if not result.should_be_processed:
return

message = event_to_slack_message(event, source_file_object_key, account_id)

send_message_to_sns(
event = event,
source_file = source_file_object_key,
account_id = account_id,
cfg = cfg,
sns_client = sns_client,
event=event,
source_file=source_file_object_key,
account_id=account_id,
cfg=cfg,
sns_client=sns_client,
)

if isinstance(slack_config, SlackWebhookConfig):
return post_message(
message = message,
account_id = account_id,
slack_config = slack_config,
message=message,
account_id=account_id,
slack_config=slack_config,
)

if isinstance(slack_config, SlackAppConfig):
thread_ts = get_thread_ts_from_dynamodb(
cfg = cfg,
event = event,
cfg=cfg,
event=event,
dynamodb_client=dynamodb_client,
)
if thread_ts is not None:
# If we have a thread_ts, we can post the message to the thread
logger.info({"Posting message to thread": {"thread_ts": thread_ts}})
return post_message(
message = message,
account_id = account_id,
thread_ts = thread_ts,
slack_config = slack_config,
message=message,
account_id=account_id,
thread_ts=thread_ts,
slack_config=slack_config,
)
else:
# If we don't have a thread_ts, we need to post the message to the channel
logger.info({"Posting message to channel"})
slack_response = post_message(
message = message,
account_id = account_id,
slack_config = slack_config
)
slack_response = post_message(message=message, account_id=account_id, slack_config=slack_config)
if slack_response is not None:
logger.info({"Saving thread_ts to DynamoDB"})
thread_ts = slack_response.get("ts")
if thread_ts is not None:
put_event_to_dynamodb(
cfg = cfg,
event = event,
thread_ts = thread_ts,
cfg=cfg,
event=event,
thread_ts=thread_ts,
dynamodb_client=dynamodb_client,
)



# Flatten json
def flatten_json(y: dict) -> dict:
out = {}

def flatten(x, name=""): # noqa: ANN001, ANN202
def flatten(x, name=""): # noqa: ANN001, ANN202
if type(x) is dict:
for a in x:
flatten(x[a], name + a + ".")
Expand All @@ -265,12 +289,14 @@ def flatten(x, name=""): # noqa: ANN001, ANN202
flatten(y)
return out


# For local testing
if __name__ == "__main__":
#Before running this script, set environment variables below
#On top of this file add region to boto3 clients
#and remove cfg = Config() slack_config = get_slack_config() from top of this file.
# Before running this script, set environment variables below
# On top of this file add region to boto3 clients
# and remove cfg = Config() slack_config = get_slack_config() from top of this file.
import os

os.environ["SLACK_BOT_TOKEN"] = ""
os.environ["DEFAULT_SLACK_CHANNEL_ID"] = ""
os.environ["SLACK_APP_CONFIGURATION"] = ""
Expand All @@ -291,7 +317,6 @@ def flatten(x, name=""): # noqa: ANN001, ANN202
os.environ["DEFAULT_SNS_TOPIC_ARN"] = ""
os.environ["SNS_CONFIGURATION"] = '[{""}]'


cfg = Config()
slack_config = get_slack_config()

Expand Down

0 comments on commit a512d3a

Please sign in to comment.