Skip to content

Commit

Permalink
To support Step Functions customized log groups
Browse files Browse the repository at this point in the history
  • Loading branch information
nine5two7 committed Nov 5, 2024
1 parent 4fda68f commit e64a486
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 18 deletions.
5 changes: 5 additions & 0 deletions aws/logs_monitoring/customized_log_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def is_lambda_customized_log_group(logstream_name):
)


# For both default and customzied Step Functions log groups, the log_stream starts with "states/"
def is_step_functions_log_group(logstream_name):
return logstream_name.startswith("states/")


def get_lambda_function_name_from_logstream_name(logstream_name):
try:
# Not match the pattern for customized Lambda log group
Expand Down
1 change: 0 additions & 1 deletion aws/logs_monitoring/steps/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def __init__(self, string, event_source):
RDS = ("/aws/rds", AwsEventSource.RDS)
# e.g. sns/us-east-1/123456779121/SnsTopicX
SNS = ("sns/", AwsEventSource.SNS)
STEPFUNCTION = ("/aws/vendedlogs/states", AwsEventSource.STEPFUNCTION)
TRANSITGATEWAY = ("tgw-attach", AwsEventSource.TRANSITGATEWAY)

def __str__(self):
Expand Down
14 changes: 8 additions & 6 deletions aws/logs_monitoring/steps/handlers/awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from customized_log_group import (
is_lambda_customized_log_group,
is_step_functions_log_group,
get_lambda_function_name_from_logstream_name,
)
from steps.handlers.aws_attributes import AwsAttributes
Expand Down Expand Up @@ -102,11 +103,15 @@ def set_source(self, event):
source = str(AwsEventSource.BEDROCK)
self.metadata[DD_SOURCE] = parse_event_source(event, source)

# Special handling for customized log group of Lambda functions
# Multiple Lambda functions can share one single customized log group
# Need to parse logStream name to determine whether it is a Lambda function
# Special handling for customized log group of Lambda Functions and Step Functions
# Prefer to place the handling of customized log group at the bottom so that it can correct the source in some edge cases
# Multiple functions can share one single customized log group
# Need to parse logStream name to determine
if is_lambda_customized_log_group(log_stream):
self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA)
# Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'."
if is_step_functions_log_group(log_stream):
self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION)

def add_cloudwatch_tags_from_cache(self):
log_group_arn = self.aws_attributes.get_log_group_arn()
Expand Down Expand Up @@ -159,9 +164,6 @@ def handle_rds_source(self):
)

def handle_step_function_source(self):
if not self.aws_attributes.get_log_stream().startswith("states/"):
return

state_machine_arn = self.get_state_machine_arn()
if not state_machine_arn:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"aws": {
"awslogs": {
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
}
},
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"timestamp": 1668095539607
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"ddsource": "stepfunction",
"ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
"service": "stepfunction"
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
}
},
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction1:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction3:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"timestamp": 1668095539607
}
]
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"ddsource": "stepfunction",
"ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction3",
"service": "stepfunction"
}
70 changes: 69 additions & 1 deletion aws/logs_monitoring/tests/test_awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from approvaltests.approvals import verify_as_json
from approvaltests.namer import NamerFactory

from aws.logs_monitoring.steps.enums import AwsEventSource

sys.modules["trace_forwarder.connection"] = MagicMock()
sys.modules["datadog_lambda.wrapper"] = MagicMock()
sys.modules["datadog_lambda.metric"] = MagicMock()
Expand All @@ -22,6 +24,7 @@
},
)
env_patch.start()
from aws.logs_monitoring.settings import DD_HOST, DD_SOURCE
from steps.handlers.awslogs_handler import AwsLogsHandler
from steps.handlers.aws_attributes import AwsAttributes
from caching.cache_layer import CacheLayer
Expand Down Expand Up @@ -93,7 +96,7 @@ def test_awslogs_handler_step_functions_tags_added_properly(
{
"id": "37199773595581154154810589279545129148442535997644275712",
"timestamp": 1668095539607,
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction1:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction3:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
}
],
}
Expand All @@ -115,8 +118,73 @@ def test_awslogs_handler_step_functions_tags_added_properly(
cache_layer._cloudwatch_log_group_cache.get = MagicMock()

awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
# awslogs_handler.handle(event)
verify_as_json(list(awslogs_handler.handle(event)))
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
self.assertEqual(
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
)
self.assertEqual(
awslogs_handler.metadata[DD_HOST],
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction3",
)

@patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
@patch("caching.cloudwatch_log_group_cache.send_forwarder_internal_metrics")
@patch.dict("os.environ", {"DD_STEP_FUNCTIONS_TRACE_ENABLED": "true"})
def test_awslogs_handler_step_functions_customized_log_group(
self,
mock_forward_metrics,
mock_cache_init,
):
# SF customized log group
eventFromCustomizedLogGroup = {
"awslogs": {
"data": base64.b64encode(
gzip.compress(
bytes(
json.dumps(
{
"messageType": "DATA_MESSAGE",
"owner": "425362996713",
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"subscriptionFilters": ["testFilter"],
"logEvents": [
{
"id": "37199773595581154154810589279545129148442535997644275712",
"timestamp": 1668095539607,
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
}
],
}
),
"utf-8",
)
)
)
}
}
context = None
metadata = {"ddsource": "postgresql", "ddtags": "env:dev"}
mock_forward_metrics.side_effect = MagicMock()
mock_cache_init.return_value = None
cache_layer = CacheLayer("")
cache_layer._step_functions_cache.get = MagicMock(
return_value=["test_tag_key:test_tag_value"]
)
cache_layer._cloudwatch_log_group_cache.get = MagicMock()

awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
verify_as_json(list(awslogs_handler.handle(eventFromCustomizedLogGroup)))
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
self.assertEqual(
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
)
self.assertEqual(
awslogs_handler.metadata[DD_HOST],
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
)

def test_process_lambda_logs(self):
# Non Lambda log
Expand Down
14 changes: 14 additions & 0 deletions aws/logs_monitoring/tests/test_customized_log_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from customized_log_group import (
is_lambda_customized_log_group,
get_lambda_function_name_from_logstream_name,
is_step_functions_log_group,
)


Expand Down Expand Up @@ -58,3 +59,16 @@ def get_lambda_function_name_from_logstream_name(self):
get_lambda_function_name_from_logstream_name(stepfunction_log_stream_name),
None,
)

def test_is_step_functions_log_group(self):
# Lambda logstream is false
lambda_log_stream_name = "2023/11/04/[$LATEST]4426346c2cdf4c54a74d3bd2b929fc44"
self.assertEqual(is_step_functions_log_group(lambda_log_stream_name), False)

# SF logstream is true
step_functions_log_stream_name = (
"states/selfmonit-statemachine/2024-11-04-15-30/00000000"
)
self.assertEqual(
is_step_functions_log_group(step_functions_log_stream_name), True
)
8 changes: 0 additions & 8 deletions aws/logs_monitoring/tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,6 @@ def test_carbon_black_event(self):
str(AwsEventSource.CARBONBLACK),
)

def test_step_function_event(self):
self.assertEqual(
parse_event_source(
{"awslogs": "logs"}, "/aws/vendedlogs/states/MyStateMachine-Logs"
),
str(AwsEventSource.STEPFUNCTION),
)

def test_cloudwatch_source_if_none_found(self):
self.assertEqual(
parse_event_source({"awslogs": "logs"}, ""), str(AwsEventSource.CLOUDWATCH)
Expand Down

0 comments on commit e64a486

Please sign in to comment.