Skip to content

Commit

Permalink
[dagster-azure] default behavior testing (#27041)
Browse files Browse the repository at this point in the history
## Summary & Motivation
This puts the default behavior, in which the log URLs are not available
from the captured-log context, under test.

This is to guard against regressions in default behavior w.r.t. captured
logs, which we have seen previously.

## How I Tested These Changes
New test
  • Loading branch information
dpeng817 authored Jan 13, 2025
1 parent 579cccd commit 390efce
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ def delete_blobs_with_prefix(prefix: str) -> None:
container_client.delete_blob(blob.name)


REQUIRES_ENV_CREDENTIALS = ["default-credential.yaml", "default-capture-behavior.yaml"]


@pytest.fixture(name="dagster_yaml")
def dagster_yaml_path(request) -> Generator[Path, None, None]:
additional_env_vars = {}
if request.param == "default-credential.yaml":
if request.param in REQUIRES_ENV_CREDENTIALS:
additional_env_vars = {
"AZURE_CLIENT_ID": os.environ["TEST_AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["TEST_AZURE_CLIENT_SECRET"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
compute_logs:
module: dagster_azure.blob.compute_log_manager
class: AzureBlobComputeLogManager
config:
storage_account:
env: TEST_AZURE_STORAGE_ACCOUNT_ID
container:
env: TEST_AZURE_CONTAINER_ID
default_azure_credential:
exclude_environment_credential: false
prefix:
env: TEST_AZURE_LOG_PREFIX
local_dir: "/tmp/cool"
upload_interval: 30
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ compute_logs:
env: TEST_AZURE_LOG_PREFIX
local_dir: "/tmp/cool"
upload_interval: 30
show_url_only: true
show_url_only: true
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ compute_logs:
env: TEST_AZURE_LOG_PREFIX
local_dir: "/tmp/cool"
upload_interval: 30
show_url_only: true
show_url_only: true
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@
EventRecordsFilter,
_check as check,
)
from dagster._core.event_api import EventLogRecord

YAMLS_NOT_CAPTURED = ["default-capture-behavior.yaml"]


@pytest.mark.parametrize(
"dagster_yaml",
["secret-credential.yaml", "default-credential.yaml", "access-key-credential.yaml"],
[
"secret-credential.yaml",
"default-credential.yaml",
"access-key-credential.yaml",
"default-capture-behavior.yaml",
],
indirect=True,
)
def test_compute_log_manager(
Expand All @@ -28,18 +36,38 @@ def test_compute_log_manager(
["dagster", "asset", "materialize", "--select", "my_asset", "-m", "azure_test_proj.defs"],
check=True,
)
logs_captured_record = DagsterInstance.get().get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.LOGS_CAPTURED,
)
)[0]

stdout, stderr = (
get_captured_logs_from_urls(
logs_captured_record,
credentials,
)
if dagster_yaml.name not in YAMLS_NOT_CAPTURED
else get_captured_logs_from_run(
logs_captured_record,
prefix_env,
container_client,
)
)

assert stdout.count("Printing without context") == 10
assert stderr.count("Logging using context") == 10


def get_captured_logs_from_urls(
captured_logs_event: EventLogRecord, credentials: ClientSecretCredential
) -> tuple[str, str]:
logs_captured_data = check.not_none(
DagsterInstance.get()
.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.LOGS_CAPTURED,
)
)[0]
.event_log_entry.dagster_event
captured_logs_event.event_log_entry.dagster_event
).logs_captured_data

assert logs_captured_data.external_stderr_url
assert logs_captured_data.external_stdout_url
assert logs_captured_data.external_stderr_url is not None
assert logs_captured_data.external_stdout_url is not None

stderr = (
BlobClient.from_blob_url(logs_captured_data.external_stderr_url, credential=credentials)
Expand All @@ -54,6 +82,23 @@ def test_compute_log_manager(
.readall()
.decode()
)
return stdout, stderr

assert stdout.count("Printing without context") == 10
assert stderr.count("Logging using context") == 10

def get_captured_logs_from_run(
    captured_logs_event: EventLogRecord, prefix_env: str, container_client: ContainerClient
) -> tuple[str, str]:
    """Download a run's captured stdout/stderr straight from the blob container.

    Used when the captured-logs event does not carry external URLs, so the
    blobs have to be located by their expected path instead.

    Args:
        captured_logs_event: Event record whose ``run_id`` identifies the run.
        prefix_env: Blob prefix under which the logs are stored.
        container_client: Client for the container holding the logs.

    Returns:
        A ``(stdout, stderr)`` tuple of decoded log contents.

    Raises:
        AssertionError: If the log folder does not contain exactly two blobs,
            or not exactly one ``.out`` and one ``.err`` blob.
    """
    run_id = captured_logs_event.run_id
    expected_log_folder = f"{prefix_env}/storage/{run_id}/compute_logs"

    # List every blob stored under this run's compute-log folder.
    blob_names = [
        blob.name for blob in container_client.list_blobs(name_starts_with=expected_log_folder)
    ]
    assert (
        len(blob_names) == 2
    ), f"Expected 2 log blobs under {expected_log_folder!r}, found {blob_names!r}"

    # Match on the file suffix rather than a substring of the whole path: the
    # path embeds ``prefix_env`` and the file base name, either of which may
    # itself contain "out" or "err" and would make both blobs match.
    # NOTE(review): assumes blobs are named ``<key>.out`` / ``<key>.err`` - confirm
    # against the compute log manager's blob naming.
    stdout_names = [name for name in blob_names if name.endswith(".out")]
    stderr_names = [name for name in blob_names if name.endswith(".err")]
    assert len(stdout_names) == 1, f"Expected one '.out' blob, found {stdout_names!r}"
    assert len(stderr_names) == 1, f"Expected one '.err' blob, found {stderr_names!r}"

    # Download and decode the content of each log blob.
    stdout_blob = container_client.get_blob_client(stdout_names[0])
    stderr_blob = container_client.get_blob_client(stderr_names[0])
    stdout_content = stdout_blob.download_blob().readall().decode()
    stderr_content = stderr_blob.download_blob().readall().decode()
    return stdout_content, stderr_content

0 comments on commit 390efce

Please sign in to comment.