refactor to use the e2e test script #580

Open · wants to merge 2 commits into base: master
96 changes: 3 additions & 93 deletions dags/multipod/maxtext_gpu_end_to_end.py
@@ -16,17 +16,14 @@


import datetime
import tempfile

from airflow import models
from airflow.decorators import task
from airflow.hooks.subprocess import SubprocessHook
from airflow.utils.task_group import TaskGroup
from dags import composer_env
from dags.common import test_owner
from dags.common.vm_resource import XpkClusters, CpuVersion, DockerImage, GpuVersion, Project, TpuVersion, Zone
from dags.common.vm_resource import XpkClusters, DockerImage
from dags.multipod.configs import gke_config
from xlml.utils import gke
from xlml.utils.gpu import scale_up_a3_cluster, scale_down_a3_cluster

# Run once a day at 4 am UTC (8 pm PST)
SCHEDULED_TIME = "0 4 * * *" if composer_env.is_prod_env() else None
@@ -35,93 +32,6 @@
A3_NUM_NODES = 10


def configure_project_and_cluster(project: str, cluster_name: str, zone: str):
region = gke.zone_to_region(zone)

gcloud_command = (
f"gcloud config set project {project}",
"sudo chown -R airflow:airflow /home/airflow/composer_kube_config",
f"gcloud container clusters get-credentials {cluster_name}"
f" --region {region}",
)
return gcloud_command


def resize_a3_cluster(cluster_name: str, zone: str, num_nodes: int):
region = gke.zone_to_region(zone)
node_pool = f"{cluster_name}-np-0"

gcloud_command = (
f"gcloud container clusters resize {cluster_name}"
f" --quiet --region {region}"
f" --node-pool {node_pool}"
f" --num-nodes {num_nodes}",
)
return gcloud_command


def wait_for_cluster_ready():
kubectl_command = (
"kubectl wait --for=condition=Ready nodes --all --timeout=5m",
)
return kubectl_command


@task
def scale_up_a3_cluster():
with tempfile.TemporaryDirectory() as tmpdir:
hook = SubprocessHook()

result = hook.run_command(
[
"bash",
"-c",
";".join(
configure_project_and_cluster(
Project.SUPERCOMPUTER_TESTING.value,
XpkClusters.GPU_A3_CLUSTER.name,
XpkClusters.GPU_A3_CLUSTER.zone,
)
+ resize_a3_cluster(
XpkClusters.GPU_A3_CLUSTER.name,
XpkClusters.GPU_A3_CLUSTER.zone,
A3_NUM_NODES,
)
+ wait_for_cluster_ready()
),
],
cwd=tmpdir,
)
assert result.exit_code == 0, f"Command failed with code {result.exit_code}"


@task
def scale_down_a3_cluster():
with tempfile.TemporaryDirectory() as tmpdir:
hook = SubprocessHook()

result = hook.run_command(
[
"bash",
"-c",
";".join(
configure_project_and_cluster(
Project.SUPERCOMPUTER_TESTING.value,
XpkClusters.GPU_A3_CLUSTER.name,
XpkClusters.GPU_A3_CLUSTER.zone,
)
+ resize_a3_cluster(
XpkClusters.GPU_A3_CLUSTER.name,
XpkClusters.GPU_A3_CLUSTER.zone,
0,
)
),
],
cwd=tmpdir,
)
assert result.exit_code == 0, f"Command failed with code {result.exit_code}"


def run_maxtext_tests(dag: models.DAG):
test_name_prefix = "maxtext"

@@ -247,7 +157,7 @@ def run_maxtext_tests(dag: models.DAG):
catchup=False,
) as dag:
with TaskGroup(group_id="scale_up", dag=dag) as scale_up:
scale_up_a3_cluster()
scale_up_a3_cluster(A3_NUM_NODES)

with TaskGroup(
group_id="run_tests", dag=dag, prefix_group_id=False
102 changes: 74 additions & 28 deletions dags/sparsity_diffusion_devx/maxtext_moe_gpu_e2e.py
@@ -16,61 +16,107 @@


import datetime

from airflow import models
from airflow.utils.task_group import TaskGroup
from dags import composer_env
from dags.common import test_owner
from dags.common.vm_resource import XpkClusters, DockerImage
from dags.multipod.configs import gke_config
from xlml.utils.gpu import scale_up_a3_cluster, scale_down_a3_cluster


# Run once a day at 11 am UTC (3 am PST)
SCHEDULED_TIME = "0 11 * * *" if composer_env.is_prod_env() else None

# Number of nodes on A3 cluster to be scaled up to
A3_NUM_NODES = 3

with models.DAG(
dag_id="maxtext_moe_gpu_e2e",
schedule=SCHEDULED_TIME,
tags=[
"sparsity_diffusion_devx",
"multipod_team",
"maxtext",
"gpu",
"stable",
"nightly",
"mlscale_onduty",
],
start_date=datetime.datetime(2024, 12, 11),
catchup=False,
) as dag:
SCANNED_CHECKPOINT = "gs://ml-auto-solutions/output/sparsity_diffusion_devx/maxtext/chained_tests_mixtral-8x7b_nightly-2025-01-09-05-00-18//8x7b/scanned_ckpt/0/items"
UNSCANNED_CKPT_PATH = "gs://ml-auto-solutions/output/sparsity_diffusion_devx/maxtext/chained_tests_mixtral-8x7b_nightly-2025-01-09-05-00-18//unscanned_ckpt/checkpoints/0/items"


def run_maxtext_tests():
test_name_prefix = "maxtext"

timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M")
train_base = (
"python3 MaxText/train.py MaxText/configs/base.yml model_name=mixtral-8x7b "
"base_output_directory=gs://runner-maxtext-logs dataset_path=gs://maxtext-dataset "
"steps=2 per_device_batch_size=1 hardware=gpu dataset_type=synthetic attention=cudnn_flash_te "
"remat_policy=full use_iota_embed=True capacity_factor=1.0 "
"reuse_example_batch=1 enable_checkpointing=False megablox=False "
"weight_dtype=bfloat16 ici_expert_parallelism=-1 ici_fsdp_parallelism=1"
)
test_models_gpu = {
"mixtral-8x7b-1node": (
f"{train_base} run_name=runner-{timestamp}-1",
f"SCANNED_CHECKPOINT={SCANNED_CHECKPOINT} \
UNSCANNED_CKPT_PATH={UNSCANNED_CKPT_PATH} \
bash end_to_end/gpu/mixtral/test_8x7b.sh",
1,
),
"mixtral-8x7b-2node": (
f"{train_base} run_name=runner-{timestamp}-2",
f"SCANNED_CHECKPOINT={SCANNED_CHECKPOINT} \
UNSCANNED_CKPT_PATH={UNSCANNED_CKPT_PATH} \
bash end_to_end/gpu/mixtral/test_8x7b.sh",
2,
),
}

for model, (test_script, nnodes) in test_models_gpu.items():
pinned_a3_gpu = gke_config.get_maxtext_end_to_end_gpu_gke_test_config(
time_out_in_min=60,
time_out_in_min=90,
test_name=f"{test_name_prefix}-pinned-{model}",
run_model_cmds=(test_script,),
num_slices=nnodes,
cluster=XpkClusters.GPU_A3_CLUSTER,
docker_image=DockerImage.MAXTEXT_GPU_JAX_PINNED.value,
test_owner=test_owner.MICHELLE_Y,
).run()
pinned_a3_gpu
pinned_a3plus_gpu = gke_config.get_maxtext_end_to_end_gpu_gke_test_config(
time_out_in_min=90,
test_name=f"{test_name_prefix}-pinned-{model}",
run_model_cmds=(test_script,),
num_slices=nnodes,
cluster=XpkClusters.GPU_A3PLUS_CLUSTER,
docker_image=DockerImage.MAXTEXT_GPU_JAX_PINNED.value,
test_owner=test_owner.MICHELLE_Y,
).run()
stable_a3_gpu = gke_config.get_maxtext_end_to_end_gpu_gke_test_config(
time_out_in_min=90,
test_name=f"{test_name_prefix}-stable-{model}",
run_model_cmds=(test_script,),
num_slices=nnodes,
cluster=XpkClusters.GPU_A3_CLUSTER,
docker_image=DockerImage.MAXTEXT_GPU_JAX_STABLE_STACK.value,
test_owner=test_owner.MICHELLE_Y,
).run()
stable_a3plus_gpu = gke_config.get_maxtext_end_to_end_gpu_gke_test_config(
time_out_in_min=90,
test_name=f"{test_name_prefix}-stable-{model}",
run_model_cmds=(test_script,),
num_slices=nnodes,
cluster=XpkClusters.GPU_A3PLUS_CLUSTER,
docker_image=DockerImage.MAXTEXT_GPU_JAX_STABLE_STACK.value,
test_owner=test_owner.MICHELLE_Y,
).run()
pinned_a3_gpu >> pinned_a3plus_gpu >> stable_a3_gpu >> stable_a3plus_gpu


with models.DAG(
dag_id="maxtext_moe_gpu_e2e",
schedule=SCHEDULED_TIME,
tags=[
"sparsity_diffusion_devx",
"multipod_team",
"maxtext",
"gpu",
"stable",
"nightly",
],
start_date=datetime.datetime(2024, 12, 11),
catchup=False,
) as dag:
with TaskGroup(group_id="scale_up", dag=dag) as scale_up:
scale_up_a3_cluster(A3_NUM_NODES)

with TaskGroup(
group_id="run_tests", dag=dag, prefix_group_id=False
) as run_tests:
run_maxtext_tests()

with TaskGroup(group_id="scale_down", dag=dag) as scale_down:
scale_down_a3_cluster()

scale_up >> run_tests >> scale_down
79 changes: 78 additions & 1 deletion xlml/utils/gpu.py
@@ -20,17 +20,21 @@
from absl import logging
import airflow
from airflow.decorators import task, task_group
from airflow.hooks.subprocess import SubprocessHook
import datetime
import fabric
from google.cloud import compute_v1
import io
import paramiko
import re
import tempfile
import time
from typing import Dict, Iterable
import uuid
from xlml.apis import gcp_config, test_config
from xlml.utils import ssh
from xlml.utils import ssh, gke
from dags.common.vm_resource import XpkClusters
from dags.map_reproducibility.utils.common_utils import configure_project_and_cluster


def get_image_from_family(project: str, family: str) -> compute_v1.Image:
@@ -553,3 +557,76 @@ def wait_for_resource_deletion(operation_name: airflow.XComArg):

op = delete_resource_request(instance_name, project_id, zone)
wait_for_resource_deletion(op)


def resize_a3_cluster(cluster_name: str, zone: str, num_nodes: int):
region = gke.zone_to_region(zone)
node_pool = f"{cluster_name}-np-0"

gcloud_command = (
f"gcloud container clusters resize {cluster_name}"
f" --quiet --region {region}"
f" --node-pool {node_pool}"
f" --num-nodes {num_nodes}",
)
return gcloud_command


def wait_for_cluster_ready():
kubectl_command = (
"kubectl wait --for=condition=Ready nodes --all --timeout=5m",
)
return kubectl_command


@task
def scale_up_a3_cluster(num_nodes):
with tempfile.TemporaryDirectory() as tmpdir:
hook = SubprocessHook()

result = hook.run_command(
[
"bash",
"-c",
";".join(
configure_project_and_cluster(
XpkClusters.GPU_A3_CLUSTER.name,
gke.zone_to_region(XpkClusters.GPU_A3_CLUSTER.zone),
)
+ resize_a3_cluster(
XpkClusters.GPU_A3_CLUSTER.name,
parambole (Collaborator) commented on Feb 3, 2025:
@michelle-yooh I believe there is an additional problem: running the A3 tests affects how the cluster is set up. Given that this is the same approach used in the MaxText code, there could be a race condition where the cluster is brought down while tests are still running.

@yangyuwei correct me if I'm wrong, but the solution would be either to bring a separate cluster up and down for these tests, or to consolidate the tests into an existing DAG, which avoids rescaling the shared cluster.

Hence, I asked about the importance of running the tests on A3.

michelle-yooh (Collaborator, author) replied:
@parambole That's a valid concern. I had ignored it since the scheduled times of the two DAGs are quite far apart, but I agree there is still a chance that other jobs are brought down midway.

@yangyuwei What were the reasons behind the resizing approach, rather than keeping a fixed capacity for the A3 cluster?

XpkClusters.GPU_A3_CLUSTER.zone,
num_nodes,
)
+ wait_for_cluster_ready()
),
],
cwd=tmpdir,
)
assert result.exit_code == 0, f"Command failed with code {result.exit_code}"


@task
def scale_down_a3_cluster():
with tempfile.TemporaryDirectory() as tmpdir:
hook = SubprocessHook()

result = hook.run_command(
[
"bash",
"-c",
";".join(
configure_project_and_cluster(
XpkClusters.GPU_A3_CLUSTER.name,
gke.zone_to_region(XpkClusters.GPU_A3_CLUSTER.zone),
)
+ resize_a3_cluster(
XpkClusters.GPU_A3_CLUSTER.name,
XpkClusters.GPU_A3_CLUSTER.zone,
0,
)
),
],
cwd=tmpdir,
)
assert result.exit_code == 0, f"Command failed with code {result.exit_code}"
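
As a purely illustrative follow-up to the race condition raised in the review thread above (not part of this diff): one possible mitigation is a drain guard that blocks the scale-down until no workloads are running on the shared cluster. The sketch below follows the same command-tuple pattern as resize_a3_cluster and wait_for_cluster_ready; the helper name wait_for_workloads_to_drain, the "default" namespace, and the 60-second poll interval are assumptions, not existing code.

def wait_for_workloads_to_drain(namespace: str = "default"):
  # Hypothetical guard, not part of this PR: poll until no pods are in the
  # Running phase, so a scale-down to zero nodes cannot tear down tests that
  # another DAG is still executing on the shared A3 cluster.
  kubectl_command = (
      f'until [ -z "$(kubectl get pods -n {namespace}'
      ' --field-selector=status.phase=Running -o name)" ];'
      ' do echo "waiting for workloads to drain"; sleep 60; done',
  )
  return kubectl_command

If adopted, scale_down_a3_cluster would chain this in before the resize, e.g. configure_project_and_cluster(...) + wait_for_workloads_to_drain() + resize_a3_cluster(..., 0); whether that is preferable to a dedicated per-DAG cluster, as suggested in the review, is left open here.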