diff --git a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/conftest.py b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/conftest.py index cf5b8dd0b1a31..98380aab384a6 100644 --- a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/conftest.py +++ b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/conftest.py @@ -42,10 +42,13 @@ def delete_blobs_with_prefix(prefix: str) -> None: container_client.delete_blob(blob.name) +REQUIRES_ENV_CREDENTIALS = ["default-credential.yaml", "default-capture-behavior.yaml"] + + @pytest.fixture(name="dagster_yaml") def dagster_yaml_path(request) -> Generator[Path, None, None]: additional_env_vars = {} - if request.param == "default-credential.yaml": + if request.param in REQUIRES_ENV_CREDENTIALS: additional_env_vars = { "AZURE_CLIENT_ID": os.environ["TEST_AZURE_CLIENT_ID"], "AZURE_CLIENT_SECRET": os.environ["TEST_AZURE_CLIENT_SECRET"], diff --git a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-capture-behavior.yaml b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-capture-behavior.yaml new file mode 100644 index 0000000000000..5d1f6942087b8 --- /dev/null +++ b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-capture-behavior.yaml @@ -0,0 +1,14 @@ +compute_logs: + module: dagster_azure.blob.compute_log_manager + class: AzureBlobComputeLogManager + config: + storage_account: + env: TEST_AZURE_STORAGE_ACCOUNT_ID + container: + env: TEST_AZURE_CONTAINER_ID + default_azure_credential: + exclude_environment_credential: false + prefix: + env: TEST_AZURE_LOG_PREFIX + local_dir: "/tmp/cool" + upload_interval: 30 \ No newline at end of file diff --git a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-credential.yaml b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-credential.yaml index be792bf8ee979..589ecc9be498b 100644 --- a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-credential.yaml +++ b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/default-credential.yaml @@ -12,4 +12,4 @@ compute_logs: env: TEST_AZURE_LOG_PREFIX local_dir: "/tmp/cool" upload_interval: 30 - show_url_only: true \ No newline at end of file + show_url_only: true diff --git a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/secret-credential.yaml b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/secret-credential.yaml index 076bbd85abd4a..22113de5bd68b 100644 --- a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/secret-credential.yaml +++ b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/dagster-yamls/secret-credential.yaml @@ -17,4 +17,4 @@ compute_logs: env: TEST_AZURE_LOG_PREFIX local_dir: "/tmp/cool" upload_interval: 30 - show_url_only: true \ No newline at end of file + show_url_only: true diff --git a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py index d2879eaaf7604..fd5f4360578a7 100644 --- a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py +++ b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py @@ -10,11 +10,19 @@ EventRecordsFilter, _check as check, ) +from dagster._core.event_api import EventLogRecord + +YAMLS_NOT_CAPTURED = ["default-capture-behavior.yaml"] @pytest.mark.parametrize( "dagster_yaml", - ["secret-credential.yaml", "default-credential.yaml", "access-key-credential.yaml"], + [ + "secret-credential.yaml", + "default-credential.yaml", + "access-key-credential.yaml", + "default-capture-behavior.yaml", + ], indirect=True, ) def test_compute_log_manager( @@ -28,18 +36,38 @@ def test_compute_log_manager( ["dagster", "asset", "materialize", "--select", "my_asset", "-m", "azure_test_proj.defs"], check=True, ) + logs_captured_record = DagsterInstance.get().get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.LOGS_CAPTURED, + ) + )[0] + + stdout, stderr = ( + get_captured_logs_from_urls( + logs_captured_record, + credentials, + ) + if dagster_yaml.name not in YAMLS_NOT_CAPTURED + else get_captured_logs_from_run( + logs_captured_record, + prefix_env, + container_client, + ) + ) + + assert stdout.count("Printing without context") == 10 + assert stderr.count("Logging using context") == 10 + + +def get_captured_logs_from_urls( + captured_logs_event: EventLogRecord, credentials: ClientSecretCredential +) -> tuple[str, str]: logs_captured_data = check.not_none( - DagsterInstance.get() - .get_event_records( - EventRecordsFilter( - event_type=DagsterEventType.LOGS_CAPTURED, - ) - )[0] - .event_log_entry.dagster_event + captured_logs_event.event_log_entry.dagster_event ).logs_captured_data - assert logs_captured_data.external_stderr_url - assert logs_captured_data.external_stdout_url + assert logs_captured_data.external_stderr_url is not None + assert logs_captured_data.external_stdout_url is not None stderr = ( BlobClient.from_blob_url(logs_captured_data.external_stderr_url, credential=credentials) @@ -54,6 +82,23 @@ def test_compute_log_manager( .readall() .decode() ) + return stdout, stderr - assert stdout.count("Printing without context") == 10 - assert stderr.count("Logging using context") == 10 + +def get_captured_logs_from_run( + captured_logs_event: EventLogRecord, prefix_env: str, container_client: ContainerClient +) -> tuple[str, str]: + run_id = captured_logs_event.run_id + expected_log_folder = f"{prefix_env}/storage/{run_id}/compute_logs" + # list all blobs coming from this log folder + blob_list = list(container_client.list_blobs(name_starts_with=expected_log_folder)) + assert len(blob_list) == 2 + stdout = next(iter([blob for blob in blob_list if "out" in blob.name])) + stderr = next(iter([blob for blob in blob_list if "err" in blob.name])) + # get blob for each log + stdout_blob = container_client.get_blob_client(stdout.name) + stderr_blob = container_client.get_blob_client(stderr.name) + # download content + stdout_content = stdout_blob.download_blob().readall().decode() + stderr_content = stderr_blob.download_blob().readall().decode() + return stdout_content, stderr_content