Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

To support Step Functions customized log groups #866

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aws/logs_monitoring/customized_log_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def is_lambda_customized_log_group(logstream_name):
)


# For both default and customized Step Functions log groups, the log stream name starts with "states/"
def is_step_functions_log_group(logstream_name):
    """Tell whether a CloudWatch log stream name belongs to Step Functions.

    The check is a plain prefix match on "states/", which is the log stream
    prefix used for both default and customized Step Functions log groups.

    Args:
        logstream_name: Name of the log stream. Any value that is not a
            string (for example None when no stream name is available) is
            treated as "not Step Functions".

    Returns:
        bool: True when logstream_name is a string starting with "states/",
        False otherwise.
    """
    # Guard first so a missing stream name yields False instead of raising
    # AttributeError on the .startswith() call below.
    if not isinstance(logstream_name, str):
        return False
    return logstream_name.startswith("states/")


def get_lambda_function_name_from_logstream_name(logstream_name):
try:
# Not match the pattern for customized Lambda log group
Expand Down
1 change: 0 additions & 1 deletion aws/logs_monitoring/steps/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def __init__(self, string, event_source):
RDS = ("/aws/rds", AwsEventSource.RDS)
# e.g. sns/us-east-1/123456779121/SnsTopicX
SNS = ("sns/", AwsEventSource.SNS)
STEPFUNCTION = ("/aws/vendedlogs/states", AwsEventSource.STEPFUNCTION)
TRANSITGATEWAY = ("tgw-attach", AwsEventSource.TRANSITGATEWAY)

def __str__(self):
Expand Down
13 changes: 7 additions & 6 deletions aws/logs_monitoring/steps/handlers/awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from customized_log_group import (
is_lambda_customized_log_group,
is_step_functions_log_group,
get_lambda_function_name_from_logstream_name,
)
from steps.handlers.aws_attributes import AwsAttributes
Expand Down Expand Up @@ -102,11 +103,14 @@ def set_source(self, event):
source = str(AwsEventSource.BEDROCK)
self.metadata[DD_SOURCE] = parse_event_source(event, source)

# Special handling for customized log group of Lambda functions
# Multiple Lambda functions can share one single customized log group
# Need to parse logStream name to determine whether it is a Lambda function
# Special handling for customized log groups of Lambda functions and Step Functions.
# Multiple functions can share a single customized log group, so the logStream name must be parsed to determine the source.
# This handling is placed at the bottom so that it can correct the source for some edge cases.
if is_lambda_customized_log_group(log_stream):
self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA)
# Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'.
if is_step_functions_log_group(log_stream):
self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION)

def add_cloudwatch_tags_from_cache(self):
log_group_arn = self.aws_attributes.get_log_group_arn()
Expand Down Expand Up @@ -159,9 +163,6 @@ def handle_rds_source(self):
)

def handle_step_function_source(self):
if not self.aws_attributes.get_log_stream().startswith("states/"):
return

state_machine_arn = self.get_state_machine_arn()
if not state_machine_arn:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"aws": {
"awslogs": {
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
}
},
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"timestamp": 1668095539607
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"ddsource": "stepfunction",
"ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
"service": "stepfunction"
}
69 changes: 69 additions & 0 deletions aws/logs_monitoring/tests/test_awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from approvaltests.approvals import verify_as_json
from approvaltests.namer import NamerFactory

from aws.logs_monitoring.steps.enums import AwsEventSource

sys.modules["trace_forwarder.connection"] = MagicMock()
sys.modules["datadog_lambda.wrapper"] = MagicMock()
sys.modules["datadog_lambda.metric"] = MagicMock()
Expand All @@ -22,6 +24,7 @@
},
)
env_patch.start()
from aws.logs_monitoring.settings import DD_HOST, DD_SOURCE
from steps.handlers.awslogs_handler import AwsLogsHandler
from steps.handlers.aws_attributes import AwsAttributes
from caching.cache_layer import CacheLayer
Expand Down Expand Up @@ -117,6 +120,72 @@ def test_awslogs_handler_step_functions_tags_added_properly(
awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
verify_as_json(list(awslogs_handler.handle(event)))
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
# Verify that Step Functions logs using the default log group naming are handled properly
self.assertEqual(
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
)
self.assertEqual(
awslogs_handler.metadata[DD_HOST],
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1",
)

@patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
@patch("caching.cloudwatch_log_group_cache.send_forwarder_internal_metrics")
@patch.dict("os.environ", {"DD_STEP_FUNCTIONS_TRACE_ENABLED": "true"})
def test_awslogs_handler_step_functions_customized_log_group(
self,
mock_forward_metrics,
mock_cache_init,
):
# SF customized log group
eventFromCustomizedLogGroup = {
"awslogs": {
"data": base64.b64encode(
gzip.compress(
bytes(
json.dumps(
{
"messageType": "DATA_MESSAGE",
"owner": "425362996713",
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"subscriptionFilters": ["testFilter"],
"logEvents": [
{
"id": "37199773595581154154810589279545129148442535997644275712",
"timestamp": 1668095539607,
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
}
],
}
),
"utf-8",
)
)
)
}
}
context = None
metadata = {"ddtags": "env:dev"}
mock_forward_metrics.side_effect = MagicMock()
mock_cache_init.return_value = None
cache_layer = CacheLayer("")
cache_layer._step_functions_cache.get = MagicMock(
return_value=["test_tag_key:test_tag_value"]
)
cache_layer._cloudwatch_log_group_cache.get = MagicMock()

awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
# For some reason, the two verify calls below are needed to update the context of the handler
verify_as_json(list(awslogs_handler.handle(eventFromCustomizedLogGroup)))
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
self.assertEqual(
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
)
self.assertEqual(
awslogs_handler.metadata[DD_HOST],
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
)

def test_process_lambda_logs(self):
# Non Lambda log
Expand Down
14 changes: 14 additions & 0 deletions aws/logs_monitoring/tests/test_customized_log_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from customized_log_group import (
is_lambda_customized_log_group,
get_lambda_function_name_from_logstream_name,
is_step_functions_log_group,
)


Expand Down Expand Up @@ -58,3 +59,16 @@ def get_lambda_function_name_from_logstream_name(self):
get_lambda_function_name_from_logstream_name(stepfunction_log_stream_name),
None,
)

def test_is_step_functions_log_group(self):
    """Only log stream names prefixed with "states/" count as Step Functions."""
    # A Lambda log stream name must not be classified as Step Functions
    lambda_stream = "2023/11/04/[$LATEST]4426346c2cdf4c54a74d3bd2b929fc44"
    lambda_verdict = is_step_functions_log_group(lambda_stream)
    self.assertEqual(lambda_verdict, False)

    # A Step Functions log stream name must be classified as Step Functions
    sfn_stream = "states/selfmonit-statemachine/2024-11-04-15-30/00000000"
    sfn_verdict = is_step_functions_log_group(sfn_stream)
    self.assertEqual(sfn_verdict, True)
8 changes: 0 additions & 8 deletions aws/logs_monitoring/tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,6 @@ def test_carbon_black_event(self):
str(AwsEventSource.CARBONBLACK),
)

def test_step_function_event(self):
self.assertEqual(
parse_event_source(
{"awslogs": "logs"}, "/aws/vendedlogs/states/MyStateMachine-Logs"
),
str(AwsEventSource.STEPFUNCTION),
)

def test_cloudwatch_source_if_none_found(self):
self.assertEqual(
parse_event_source({"awslogs": "logs"}, ""), str(AwsEventSource.CLOUDWATCH)
Expand Down
Loading