Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add init support for SF log stream #864

Closed
5 changes: 5 additions & 0 deletions aws/logs_monitoring/customized_log_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ def is_lambda_customized_log_group(logstream_name):
)


# For both default and customzied Step Functions log groups, the log_stream starts with "states/"
def is_step_functions_log_group(logstream_name):
return logstream_name.startswith("states/")


def get_lambda_function_name_from_logstream_name(logstream_name):
try:
# Not match the pattern for customized Lambda log group
Expand Down
1 change: 0 additions & 1 deletion aws/logs_monitoring/steps/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def __init__(self, string, event_source):
RDS = ("/aws/rds", AwsEventSource.RDS)
# e.g. sns/us-east-1/123456779121/SnsTopicX
SNS = ("sns/", AwsEventSource.SNS)
STEPFUNCTION = ("/aws/vendedlogs/states", AwsEventSource.STEPFUNCTION)
TRANSITGATEWAY = ("tgw-attach", AwsEventSource.TRANSITGATEWAY)

def __str__(self):
Expand Down
14 changes: 8 additions & 6 deletions aws/logs_monitoring/steps/handlers/awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)
from customized_log_group import (
is_lambda_customized_log_group,
is_step_functions_log_group,
get_lambda_function_name_from_logstream_name,
)
from steps.handlers.aws_attributes import AwsAttributes
Expand Down Expand Up @@ -102,11 +103,15 @@ def set_source(self, event):
source = str(AwsEventSource.BEDROCK)
self.metadata[DD_SOURCE] = parse_event_source(event, source)

# Special handling for customized log group of Lambda functions
# Multiple Lambda functions can share one single customized log group
# Need to parse logStream name to determine whether it is a Lambda function
# Special handling for customized log group of Lambda Functions and Step Functions
# Prefer to place the handling of customized log group at the bottom so that it can correct the source in some edge cases
# Multiple functions can share one single customized log group
# Need to parse logStream name to determine
if is_lambda_customized_log_group(log_stream):
self.metadata[DD_SOURCE] = str(AwsEventSource.LAMBDA)
# Regardless of whether the log group is customized, the corresponding log stream starts with 'states/'."
if is_step_functions_log_group(log_stream):
self.metadata[DD_SOURCE] = str(AwsEventSource.STEPFUNCTION)

def add_cloudwatch_tags_from_cache(self):
log_group_arn = self.aws_attributes.get_log_group_arn()
Expand Down Expand Up @@ -159,9 +164,6 @@ def handle_rds_source(self):
)

def handle_step_function_source(self):
if not self.aws_attributes.get_log_stream().startswith("states/"):
return

state_machine_arn = self.get_state_machine_arn()
if not state_machine_arn:
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"aws": {
"awslogs": {
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"owner": "425362996713"
}
},
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"timestamp": 1668095539607
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"ddsource": "stepfunction",
"ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
"service": "stepfunction"
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
}
},
"id": "37199773595581154154810589279545129148442535997644275712",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction1:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"message": "{\"id\": \"1\",\"type\": \"ExecutionStarted\",\"details\": {\"input\": \"{}\",\"inputDetails\": {\"truncated\": \"false\"},\"roleArn\": \"arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt\"},\"previous_event_id\": \"0\",\"event_timestamp\": \"1716992192441\",\"execution_arn\": \"arn:aws:states:us-east-1:12345678910:execution:StepFunction3:ccccccc-d1da-4c38-b32c-2b6b07d713fa\",\"redrive_count\": \"0\"}",
"timestamp": 1668095539607
}
]
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"ddsource": "stepfunction",
"ddtags": "env:dev,test_tag_key:test_tag_value,dd_step_functions_trace_enabled:true",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction1",
"host": "arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction3",
"service": "stepfunction"
}
70 changes: 69 additions & 1 deletion aws/logs_monitoring/tests/test_awslogs_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from approvaltests.approvals import verify_as_json
from approvaltests.namer import NamerFactory

from aws.logs_monitoring.steps.enums import AwsEventSource

sys.modules["trace_forwarder.connection"] = MagicMock()
sys.modules["datadog_lambda.wrapper"] = MagicMock()
sys.modules["datadog_lambda.metric"] = MagicMock()
Expand All @@ -22,6 +24,7 @@
},
)
env_patch.start()
from aws.logs_monitoring.settings import DD_HOST, DD_SOURCE
from steps.handlers.awslogs_handler import AwsLogsHandler
from steps.handlers.aws_attributes import AwsAttributes
from caching.cache_layer import CacheLayer
Expand Down Expand Up @@ -93,7 +96,7 @@ def test_awslogs_handler_step_functions_tags_added_properly(
{
"id": "37199773595581154154810589279545129148442535997644275712",
"timestamp": 1668095539607,
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction1:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction3:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
}
],
}
Expand All @@ -115,8 +118,73 @@ def test_awslogs_handler_step_functions_tags_added_properly(
cache_layer._cloudwatch_log_group_cache.get = MagicMock()

awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
# awslogs_handler.handle(event)
verify_as_json(list(awslogs_handler.handle(event)))
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
self.assertEqual(
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
)
self.assertEqual(
awslogs_handler.metadata[DD_HOST],
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction3",
)

@patch("caching.cloudwatch_log_group_cache.CloudwatchLogGroupTagsCache.__init__")
@patch("caching.cloudwatch_log_group_cache.send_forwarder_internal_metrics")
@patch.dict("os.environ", {"DD_STEP_FUNCTIONS_TRACE_ENABLED": "true"})
def test_awslogs_handler_step_functions_customized_log_group(
self,
mock_forward_metrics,
mock_cache_init,
):
# SF customized log group
eventFromCustomizedLogGroup = {
"awslogs": {
"data": base64.b64encode(
gzip.compress(
bytes(
json.dumps(
{
"messageType": "DATA_MESSAGE",
"owner": "425362996713",
"logGroup": "test/logs",
"logStream": "states/logs-to-traces-sequential/2022-11-10-15-50/7851b2d9",
"subscriptionFilters": ["testFilter"],
"logEvents": [
{
"id": "37199773595581154154810589279545129148442535997644275712",
"timestamp": 1668095539607,
"message": '{"id": "1","type": "ExecutionStarted","details": {"input": "{}","inputDetails": {"truncated": "false"},"roleArn": "arn:aws:iam::12345678910:role/service-role/StepFunctions-test-role-a0iurr4pt"},"previous_event_id": "0","event_timestamp": "1716992192441","execution_arn": "arn:aws:states:us-east-1:12345678910:execution:StepFunction2:ccccccc-d1da-4c38-b32c-2b6b07d713fa","redrive_count": "0"}',
}
],
}
),
"utf-8",
)
)
)
}
}
context = None
metadata = {"ddsource": "postgresql", "ddtags": "env:dev"}
mock_forward_metrics.side_effect = MagicMock()
mock_cache_init.return_value = None
cache_layer = CacheLayer("")
cache_layer._step_functions_cache.get = MagicMock(
return_value=["test_tag_key:test_tag_value"]
)
cache_layer._cloudwatch_log_group_cache.get = MagicMock()

awslogs_handler = AwsLogsHandler(context, metadata, cache_layer)
verify_as_json(list(awslogs_handler.handle(eventFromCustomizedLogGroup)))
verify_as_json(metadata, options=NamerFactory.with_parameters("metadata"))
self.assertEqual(
awslogs_handler.metadata[DD_SOURCE], AwsEventSource.STEPFUNCTION.value
)
self.assertEqual(
awslogs_handler.metadata[DD_HOST],
"arn:aws:states:us-east-1:12345678910:stateMachine:StepFunction2",
)

def test_process_lambda_logs(self):
# Non Lambda log
Expand Down
14 changes: 14 additions & 0 deletions aws/logs_monitoring/tests/test_customized_log_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from customized_log_group import (
is_lambda_customized_log_group,
get_lambda_function_name_from_logstream_name,
is_step_functions_log_group,
)


Expand Down Expand Up @@ -58,3 +59,16 @@ def get_lambda_function_name_from_logstream_name(self):
get_lambda_function_name_from_logstream_name(stepfunction_log_stream_name),
None,
)

def test_is_step_functions_log_group(self):
# Lambda logstream is false
lambda_log_stream_name = "2023/11/04/[$LATEST]4426346c2cdf4c54a74d3bd2b929fc44"
self.assertEqual(is_step_functions_log_group(lambda_log_stream_name), False)

# SF logstream is true
step_functions_log_stream_name = (
"states/selfmonit-statemachine/2024-11-04-15-30/00000000"
)
self.assertEqual(
is_step_functions_log_group(step_functions_log_stream_name), True
)
8 changes: 0 additions & 8 deletions aws/logs_monitoring/tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,6 @@ def test_carbon_black_event(self):
str(AwsEventSource.CARBONBLACK),
)

def test_step_function_event(self):
self.assertEqual(
parse_event_source(
{"awslogs": "logs"}, "/aws/vendedlogs/states/MyStateMachine-Logs"
),
str(AwsEventSource.STEPFUNCTION),
)

def test_cloudwatch_source_if_none_found(self):
self.assertEqual(
parse_event_source({"awslogs": "logs"}, ""), str(AwsEventSource.CLOUDWATCH)
Expand Down
Loading