Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-azure] default behavior testing #27041

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ def delete_blobs_with_prefix(prefix: str) -> None:
container_client.delete_blob(blob.name)


# Config file names for which the `dagster_yaml` fixture populates the
# AZURE_CLIENT_ID / AZURE_CLIENT_SECRET variables from their TEST_AZURE_* counterparts.
REQUIRES_ENV_CREDENTIALS = ["default-credential.yaml", "default-capture-behavior.yaml"]


@pytest.fixture(name="dagster_yaml")
def dagster_yaml_path(request) -> Generator[Path, None, None]:
additional_env_vars = {}
if request.param == "default-credential.yaml":
if request.param in REQUIRES_ENV_CREDENTIALS:
additional_env_vars = {
"AZURE_CLIENT_ID": os.environ["TEST_AZURE_CLIENT_ID"],
"AZURE_CLIENT_SECRET": os.environ["TEST_AZURE_CLIENT_SECRET"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Compute-log configuration backed by AzureBlobComputeLogManager, authenticating
# with a default Azure credential (environment credential not excluded).
# Storage account, container and prefix are read from TEST_AZURE_* environment variables.
# No `show_url_only` key is set in this config.
# NOTE(review): presumably this is default-capture-behavior.yaml — confirm against the PR file list.
compute_logs:
module: dagster_azure.blob.compute_log_manager
class: AzureBlobComputeLogManager
config:
storage_account:
env: TEST_AZURE_STORAGE_ACCOUNT_ID
container:
env: TEST_AZURE_CONTAINER_ID
default_azure_credential:
exclude_environment_credential: false
prefix:
env: TEST_AZURE_LOG_PREFIX
local_dir: "/tmp/cool"
upload_interval: 30
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ compute_logs:
env: TEST_AZURE_LOG_PREFIX
local_dir: "/tmp/cool"
upload_interval: 30
show_url_only: true
show_url_only: true
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ compute_logs:
env: TEST_AZURE_LOG_PREFIX
local_dir: "/tmp/cool"
upload_interval: 30
show_url_only: true
show_url_only: true
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@
EventRecordsFilter,
_check as check,
)
from dagster._core.event_api import EventLogRecord

# Config file names for which the test reads the logs by listing the run's blobs in the
# container instead of following the URLs recorded on the LOGS_CAPTURED event.
YAMLS_NOT_CAPTURED = ["default-capture-behavior.yaml"]


@pytest.mark.parametrize(
"dagster_yaml",
["secret-credential.yaml", "default-credential.yaml", "access-key-credential.yaml"],
[
"secret-credential.yaml",
"default-credential.yaml",
"access-key-credential.yaml",
"default-capture-behavior.yaml",
],
indirect=True,
)
def test_compute_log_manager(
Expand All @@ -28,18 +36,38 @@ def test_compute_log_manager(
["dagster", "asset", "materialize", "--select", "my_asset", "-m", "azure_test_proj.defs"],
check=True,
)
logs_captured_record = DagsterInstance.get().get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.LOGS_CAPTURED,
)
)[0]

stdout, stderr = (
get_captured_logs_from_urls(
logs_captured_record,
credentials,
)
if dagster_yaml.name not in YAMLS_NOT_CAPTURED
else get_captured_logs_from_run(
logs_captured_record,
prefix_env,
container_client,
)
)

assert stdout.count("Printing without context") == 10
assert stderr.count("Logging using context") == 10


def get_captured_logs_from_urls(
captured_logs_event: EventLogRecord, credentials: ClientSecretCredential
) -> tuple[str, str]:
logs_captured_data = check.not_none(
DagsterInstance.get()
.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.LOGS_CAPTURED,
)
)[0]
.event_log_entry.dagster_event
captured_logs_event.event_log_entry.dagster_event
).logs_captured_data

assert logs_captured_data.external_stderr_url
assert logs_captured_data.external_stdout_url
assert logs_captured_data.external_stderr_url is not None
mlarose marked this conversation as resolved.
Show resolved Hide resolved
assert logs_captured_data.external_stdout_url is not None

stderr = (
BlobClient.from_blob_url(logs_captured_data.external_stderr_url, credential=credentials)
Expand All @@ -54,6 +82,23 @@ def test_compute_log_manager(
.readall()
.decode()
)
return stdout, stderr

assert stdout.count("Printing without context") == 10
assert stderr.count("Logging using context") == 10

def get_captured_logs_from_run(
    captured_logs_event: EventLogRecord, prefix_env: str, container_client: ContainerClient
) -> tuple[str, str]:
    """Read a run's stdout/stderr compute logs directly from the blob container.

    The run id is taken from ``captured_logs_event`` and the blobs are looked up
    under ``{prefix_env}/storage/{run_id}/compute_logs``.

    Args:
        captured_logs_event: Event record whose ``run_id`` identifies the run.
        prefix_env: Prefix under which the logs are stored in the container.
        container_client: Client used to list and download the blobs.

    Returns:
        A ``(stdout, stderr)`` tuple with the decoded content of each log blob.

    Raises:
        AssertionError: If the log folder does not hold exactly two blobs.
        StopIteration: If no blob name contains ``"out"`` (or ``"err"``).
    """
    log_folder = f"{prefix_env}/storage/{captured_logs_event.run_id}/compute_logs"
    run_blobs = list(container_client.list_blobs(name_starts_with=log_folder))
    # Exactly one stdout blob and one stderr blob are expected for the run.
    assert len(run_blobs) == 2

    # Resolve both blob names before downloading anything, so a missing stream
    # fails before any content is fetched.
    stdout_name = next(blob.name for blob in run_blobs if "out" in blob.name)
    stderr_name = next(blob.name for blob in run_blobs if "err" in blob.name)

    stdout_client = container_client.get_blob_client(stdout_name)
    stderr_client = container_client.get_blob_client(stderr_name)

    return (
        stdout_client.download_blob().readall().decode(),
        stderr_client.download_blob().readall().decode(),
    )