gsutil command to gcs client library changes #186

Merged
2 changes: 1 addition & 1 deletion dataproc_jupyter_plugin/commons/constants.py
@@ -51,7 +51,7 @@
# DAG run IDs are largely free-form, but we still enforce some sanity checking
# on them in case the generated ID might cause issues with how we generate
# output file names.
DAG_RUN_ID_REGEXP = re.compile("[a-zA-Z0-9_:\\+-]+")
DAG_RUN_ID_REGEXP = re.compile("[a-zA-Z0-9_:\\+.-]+")

# This matches the requirements set by the scheduler form.
AIRFLOW_JOB_REGEXP = re.compile("[a-zA-Z0-9_-]+")
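
For illustration, a quick check of the widened character class. The run ID below is hypothetical, but run IDs carrying a fractional-seconds timestamp are presumably the case the added `.` is meant to cover:

```python
import re

OLD_DAG_RUN_ID_REGEXP = re.compile("[a-zA-Z0-9_:\\+-]+")   # before this change
NEW_DAG_RUN_ID_REGEXP = re.compile("[a-zA-Z0-9_:\\+.-]+")  # after this change

# Hypothetical run ID; the dot comes from the fractional-seconds part.
run_id = "manual__2024-05-21T07:30:00.123456+00:00"

print(bool(OLD_DAG_RUN_ID_REGEXP.fullmatch(run_id)))  # False: '.' was not allowed
print(bool(NEW_DAG_RUN_ID_REGEXP.fullmatch(run_id)))  # True
```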
15 changes: 11 additions & 4 deletions dataproc_jupyter_plugin/services/airflow.py
@@ -15,6 +15,7 @@
import re
import subprocess
import urllib
from google.cloud import storage

from dataproc_jupyter_plugin import urls
from dataproc_jupyter_plugin.commons.commands import async_run_gsutil_subcommand
@@ -87,19 +88,25 @@ async def list_jobs(self, composer_name):
return {"error": str(e)}

async def delete_job(self, composer_name, dag_id, from_page):
airflow_uri, bucket = await self.get_airflow_uri(composer_name)
airflow_uri, bucket_name = await self.get_airflow_uri(composer_name)
try:
api_endpoint = f"{airflow_uri}/api/v1/dags/{dag_id}"
# Delete the DAG via the Airflow API if from_page is None
if from_page is None:
async with self.client_session.delete(
api_endpoint, headers=self.create_headers()
) as response:
self.log.info(response)
cmd = f"gsutil rm gs://{bucket}/dags/dag_{dag_id}.py"
await async_run_gsutil_subcommand(cmd)
bucket = storage.Client().bucket(bucket_name)
blob_name = f"dags/dag_{dag_id}.py"
blob = bucket.blob(blob_name)
blob.delete()

self.log.info(f"Deleted {blob_name} from bucket {bucket_name}")

return 0
except Exception as e:
self.log.exception(f"Error deleting dag: {str(e)}")
self.log.exception(f"Error deleting DAG: {str(e)}")
return {"error": str(e)}

async def update_job(self, composer_name, dag_id, status):
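
The net effect of the delete_job change above: the `gsutil rm` subprocess is replaced by a direct google-cloud-storage call. A minimal standalone sketch of the equivalent operation (the helper name is hypothetical, and the NotFound guard is an extra safety net that the PR itself does not add):

```python
from google.api_core.exceptions import NotFound
from google.cloud import storage


def delete_dag_file(bucket_name: str, dag_id: str) -> None:
    """Delete gs://<bucket_name>/dags/dag_<dag_id>.py via the client library."""
    blob = storage.Client().bucket(bucket_name).blob(f"dags/dag_{dag_id}.py")
    try:
        blob.delete()  # raises NotFound when the object is absent
    except NotFound:
        pass  # already gone; nothing to do
```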
100 changes: 58 additions & 42 deletions dataproc_jupyter_plugin/services/executor.py
@@ -19,6 +19,8 @@
from datetime import datetime, timedelta
from google.cloud import storage
from google.api_core.exceptions import NotFound
import google.oauth2.credentials as oauth2
import aiofiles

import aiohttp
import pendulum
@@ -99,30 +101,33 @@ async def check_file_exists(self, bucket_name, file_path):
self.log.exception(f"Error checking file: {error}")
raise IOError(f"Error creating dag: {error}")

async def upload_papermill_to_gcs(self, gcs_dag_bucket):
env = Environment(
loader=PackageLoader(PACKAGE_NAME, "dagTemplates"),
autoescape=select_autoescape(["py"]),
)
wrapper_papermill_path = env.get_template("wrapper_papermill.py").filename
async def upload_to_gcs(
self, gcs_dag_bucket, file_path=None, template_name=None, destination_dir=None
):
try:
cmd = f"gsutil cp '{wrapper_papermill_path}' gs://{gcs_dag_bucket}/dataproc-notebooks/"
await async_run_gsutil_subcommand(cmd)
self.log.info("Papermill file uploaded to gcs successfully")
except subprocess.CalledProcessError as error:
self.log.exception(
f"Error uploading papermill file to gcs: {error.decode()}"
)
raise IOError(error.decode)
storage_client = storage.Client()
bucket = storage_client.bucket(gcs_dag_bucket)
if template_name:
env = Environment(
loader=PackageLoader(PACKAGE_NAME, TEMPLATES_FOLDER_PATH),
autoescape=select_autoescape(["py"]),
)
file_path = env.get_template(template_name).filename

async def upload_input_file_to_gcs(self, input, gcs_dag_bucket, job_name):
try:
cmd = f"gsutil cp './{input}' gs://{gcs_dag_bucket}/dataproc-notebooks/{job_name}/input_notebooks/"
await async_run_gsutil_subcommand(cmd)
self.log.info("Input file uploaded to gcs successfully")
except subprocess.CalledProcessError as error:
self.log.exception(f"Error uploading input file to gcs: {error.decode()}")
raise IOError(error.decode)
if not file_path:
raise ValueError("No file path or template name provided for upload.")
if destination_dir:
blob_name = f"{destination_dir}/{file_path.split('/')[-1]}"
else:
blob_name = f"{file_path.split('/')[-1]}"

blob = bucket.blob(blob_name)
blob.upload_from_filename(file_path)
self.log.info(f"File {file_path} uploaded to gcs successfully")

except Exception as error:
self.log.exception(f"Error uploading file to GCS: {str(error)}")
raise IOError(str(error))

def prepare_dag(self, job, gcs_dag_bucket, dag_file):
self.log.info("Generating dag file")
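
The two gsutil-based helpers above (upload_papermill_to_gcs and upload_input_file_to_gcs) are folded into a single upload_to_gcs that calls Blob.upload_from_filename, either on a caller-supplied path or on a template resolved through Jinja's PackageLoader. A trimmed-down sketch of the same pattern (the function name and return value are illustrative, not part of the PR):

```python
from typing import Optional

from google.cloud import storage


def upload_file(bucket_name: str, local_path: str, destination_dir: Optional[str] = None) -> str:
    """Upload local_path into bucket_name, optionally under destination_dir."""
    blob_name = local_path.split("/")[-1]
    if destination_dir:
        blob_name = f"{destination_dir}/{blob_name}"
    blob = storage.Client().bucket(bucket_name).blob(blob_name)
    blob.upload_from_filename(local_path)  # reads the local file and writes it to the object
    return f"gs://{bucket_name}/{blob_name}"
```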
@@ -239,17 +244,7 @@ def prepare_dag(self, job, gcs_dag_bucket, dag_file):
)
wrapper_papermill_path = env.get_template("wrapper_papermill.py").filename
shutil.copy2(wrapper_papermill_path, LOCAL_DAG_FILE_LOCATION)

async def upload_dag_to_gcs(self, job, dag_file, gcs_dag_bucket):
LOCAL_DAG_FILE_LOCATION = f"./scheduled-jobs/{job.name}"
file_path = os.path.join(LOCAL_DAG_FILE_LOCATION, dag_file)
try:
cmd = f"gsutil cp '{file_path}' gs://{gcs_dag_bucket}/dags/"
await async_run_gsutil_subcommand(cmd)
self.log.info("Dag file uploaded to gcs successfully")
except subprocess.CalledProcessError as error:
self.log.exception(f"Error uploading dag file to gcs: {error.decode()}")
raise IOError(error.decode)
return file_path

async def execute(self, input_data):
try:
@@ -269,16 +264,24 @@ async def execute(self, input_data):
f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} exists."
)
else:
await self.upload_papermill_to_gcs(gcs_dag_bucket)
await self.upload_to_gcs(
gcs_dag_bucket,
template_name=WRAPPER_PAPPERMILL_FILE,
destination_dir="dataproc-notebooks",
)
print(
f"The file gs://{gcs_dag_bucket}/{wrapper_pappermill_file_path} does not exist."
)
if not job.input_filename.startswith(GCS):
await self.upload_input_file_to_gcs(
job.input_filename, gcs_dag_bucket, job_name
await self.upload_to_gcs(
gcs_dag_bucket,
file_path=f"./{job.input_filename}",
destination_dir=f"dataproc-notebooks/{job_name}/input_notebooks",
)
self.prepare_dag(job, gcs_dag_bucket, dag_file)
await self.upload_dag_to_gcs(job, dag_file, gcs_dag_bucket)
file_path = self.prepare_dag(job, gcs_dag_bucket, dag_file)
await self.upload_to_gcs(
gcs_dag_bucket, file_path=file_path, destination_dir="dags"
)
return {"status": 0}
except Exception as e:
return {"error": str(e)}
@@ -292,11 +295,24 @@ async def download_dag_output(
)
except Exception as ex:
return {"error": f"Invalid DAG run ID {dag_run_id}"}

try:
cmd = f"gsutil cp 'gs://{bucket_name}/dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb' ./"
await async_run_gsutil_subcommand(cmd)
self.log.info("Output notebook file downloaded successfully")
credentials = oauth2.Credentials(self._access_token)
storage_client = storage.Client(credentials=credentials)
blob_name = (
f"dataproc-output/{dag_id}/output-notebooks/{dag_id}_{dag_run_id}.ipynb"
)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
original_file_name = os.path.basename(blob_name)
destination_file_name = os.path.join(".", original_file_name)
async with aiofiles.open(destination_file_name, "wb") as f:
file_data = blob.download_as_bytes()
await f.write(file_data)
self.log.info(
f"Output notebook file '{original_file_name}' downloaded successfully"
)
return 0
except subprocess.CalledProcessError as error:
except Exception as error:
self.log.exception(f"Error downloading output notebook file: {str(error)}")
return {"error": str(error)}
66 changes: 32 additions & 34 deletions dataproc_jupyter_plugin/tests/test_airflow.py
@@ -14,6 +14,7 @@

import json
import subprocess
from unittest.mock import AsyncMock, MagicMock

import aiohttp

@@ -80,41 +81,38 @@ async def mock_credentials():
payload = json.loads(response.body)
assert payload == {"error": "Missing required credentials"}


@pytest.mark.parametrize("returncode, expected_result", [(0, 0)])
async def test_delete_job(monkeypatch, returncode, expected_result, jp_fetch):

async def mock_async_command_executor(cmd):
if cmd is None:
raise ValueError("Received None for cmd parameter")
if returncode == 0:
return b"output", b""
else:
raise subprocess.CalledProcessError(
returncode, cmd, output=b"output", stderr=b"error in executing command"
)

monkeypatch.setattr(airflow.Client, "get_airflow_uri", mock_get_airflow_uri)
monkeypatch.setattr(
airflow, "async_run_gsutil_subcommand", mock_async_command_executor
@pytest.mark.asyncio
@pytest.mark.parametrize(
"from_page, expected_status", [(None, 0), ("some_page", 0)]
)
monkeypatch.setattr(aiohttp, "ClientSession", MockClientSession)
mock_composer = "mock-composer"
mock_dag_id = "mock_dag_id"
mock_from_page = "mock_from_page"
response = await jp_fetch(
"dataproc-plugin",
"dagDelete",
params={
"composer": mock_composer,
"dag_id": mock_dag_id,
"from_page": mock_from_page,
},
method="DELETE",
)
assert response.code == 200
payload = json.loads(response.body)
assert payload["status"] == 0
async def test_delete_job(monkeypatch, from_page, expected_status, jp_fetch):
monkeypatch.setattr(airflow.Client, "get_airflow_uri", mock_get_airflow_uri)
mock_delete = AsyncMock()
mock_delete.return_value.__aenter__.return_value.status = 200
mock_client_session = MagicMock()
mock_client_session.delete = mock_delete
monkeypatch.setattr(
"dagDelete.aiohttp.ClientSession", lambda: mock_client_session
)
mock_bucket = MagicMock()
mock_blob = MagicMock()
mock_bucket.blob.return_value = mock_blob
mock_storage_client = MagicMock()
mock_storage_client.bucket.return_value = mock_bucket
monkeypatch.setattr("dagDelete.storage.Client", lambda: mock_storage_client)
response = await jp_fetch(
"dataproc-plugin",
"dagDelete",
method="DELETE",
params={
"composer": "mock-composer",
"dag_id": "mock_dag_id",
"from_page": from_page,
},
)
assert response.code == 200
payload = json.loads(response.body)
assert payload["status"] == expected_status


class MockClientSession:
27 changes: 13 additions & 14 deletions dataproc_jupyter_plugin/tests/test_executor.py
@@ -19,7 +19,9 @@

import aiohttp
import pytest
from google.cloud import storage

from dataproc_jupyter_plugin import credentials
from dataproc_jupyter_plugin.commons import commands
from dataproc_jupyter_plugin.services import airflow
from dataproc_jupyter_plugin.services import executor
@@ -96,30 +98,27 @@ async def test_execute_success(
@pytest.mark.parametrize("returncode, expected_result", [(0, 0)])
async def test_download_dag_output(monkeypatch, returncode, expected_result, jp_fetch):

async def mock_async_command_executor(cmd):
if cmd is None:
raise ValueError("Received None for cmd parameter")
if returncode == 0:
return b"output", b""
else:
raise subprocess.CalledProcessError(
returncode, cmd, output=b"output", stderr=b"error in executing command"
)

async def mock_list_dag_run_task(*args, **kwargs):
return None

monkeypatch.setattr(airflow.Client, "list_dag_run_task", mock_list_dag_run_task)
monkeypatch.setattr(
executor, "async_run_gsutil_subcommand", mock_async_command_executor
)
monkeypatch.setattr(aiohttp, "ClientSession", MockClientSession)
mock_blob = MagicMock()
mock_blob.download_as_bytes.return_value = b"mock file content"

mock_bucket = MagicMock()
mock_bucket.blob.return_value = mock_blob

mock_storage_client = MagicMock()
mock_storage_client.bucket.return_value = mock_bucket
monkeypatch.setattr(credentials, "get_cached", mock_credentials)
monkeypatch.setattr(storage, "Client", lambda credentials=None: mock_storage_client)

mock_composer_name = "mock-composer"
mock_bucket_name = "mock_bucket"
mock_dag_id = "mock-dag-id"
mock_dag_run_id = "258"
command = f"gsutil cp 'gs://{mock_bucket_name}/dataproc-output/{mock_dag_id}/output-notebooks/{mock_dag_id}_{mock_dag_run_id}.ipynb' ./"

response = await jp_fetch(
"dataproc-plugin",
"downloadOutput",
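
Both updated tests drop the async_run_gsutil_subcommand mocks in favor of an in-memory fake of the storage client, so no real GCS call is made. The wiring, reduced to its core (the factory name is illustrative):

```python
from unittest.mock import MagicMock


def make_fake_storage_client(download_bytes: bytes = b"mock file content") -> MagicMock:
    """Client().bucket(...).blob(...) chain backed entirely by MagicMocks."""
    blob = MagicMock()
    blob.download_as_bytes.return_value = download_bytes
    bucket = MagicMock()
    bucket.blob.return_value = blob
    client = MagicMock()
    client.bucket.return_value = bucket
    return client


# In a test, patch the constructor so the service code receives the fake, e.g.:
# monkeypatch.setattr(storage, "Client", lambda credentials=None: make_fake_storage_client())
```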
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -30,7 +30,8 @@ dependencies = [
"pydantic~=1.10.0",
"bigframes~=0.22.0",
"aiohttp~=3.9.5",
"google-cloud-storage~=2.18.2"
"google-cloud-storage~=2.18.2",
"aiofiles>=22.1.0,<23"
]
dynamic = ["version", "description", "authors", "urls", "keywords"]
