Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add azure upload and download client logic #27

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ html = ["beautifulsoup4", "html5lib"]
email = ["email_validator"]
test = ["pytest", "pytest-cov", "pytest-loguru"]
dev = ["pre-commit"]
# Distributions needed by hdx.utilities.azure_utils. BlobServiceClient and
# ContentSettings are classes inside azure-storage-blob, not separate packages.
azure = ["azure-storage-blob", "pandas"]


#########
Expand Down
197 changes: 197 additions & 0 deletions src/hdx/utilities/azure_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""All the logic around Azure blob uploads and downloads of files"""

import base64
import hashlib
import hmac
import io
import logging
from datetime import datetime, timezone
from os.path import exists
from typing import Any, Dict, Optional

try:
import pandas as pd
from azure.storage.blob import BlobServiceClient, ContentSettings
except ImportError:
BlobServiceClient = None
ContentSettings = None
pd = None

from .downloader import Download

logger = logging.getLogger(__name__)


class AzureBlobDownload(Download):
    """Wrapper for Azure Blob download logic.

    Signs a GET request for a blob with the storage account's Shared Key and
    streams the response to disk using the machinery inherited from
    ``Download``.
    """

    # REST API version sent as ``x-ms-version``; it is part of the signed string.
    _API_VERSION = "2018-03-28"

    @staticmethod
    def _shared_key_headers(
        account: str,
        container: str,
        blob: str,
        key: str,
        request_time: str,
        api_version: str,
    ) -> Dict[str, str]:
        """Build the headers for a Shared Key authorised GET of a blob.

        Args:
            account (str): Storage account name
            container (str): Container holding the blob
            blob (str): Blob name (may be empty)
            key (str): Base64 encoded storage account access key
            request_time (str): Value for the x-ms-date header
            api_version (str): Value for the x-ms-version header

        Returns:
            Dict[str, str]: x-ms-date, x-ms-version and Authorization headers
        """
        canonicalized_headers = (
            f"x-ms-date:{request_time}\nx-ms-version:{api_version}\n"
        )
        canonicalized_resource = f"/{account}/{container}/{blob}"
        # The string to sign is the verb followed by eleven standard header
        # values (Content-Encoding, Content-Language, Content-Length,
        # Content-MD5, Content-Type, Date, If-Modified-Since, If-Match,
        # If-None-Match, If-Unmodified-Since, Range), each terminated by a
        # newline. None of them is sent, so they are all empty.
        string_to_sign = (
            "\n".join(["GET"] + [""] * 11)
            + "\n"
            + canonicalized_headers
            + canonicalized_resource
        )
        digest = hmac.new(
            base64.b64decode(key),
            msg=string_to_sign.encode("utf-8"),
            digestmod=hashlib.sha256,
        ).digest()
        signature = base64.b64encode(digest).decode()
        return {
            "x-ms-date": request_time,
            "x-ms-version": api_version,
            # Must be a plain string: a stray trailing comma previously turned
            # this value into a one-element tuple.
            "Authorization": f"SharedKey {account}:{signature}",
        }

    def download_file(
        self,
        url: str,
        account: str,
        container: str,
        key: str,
        blob: Optional[str] = None,
        **kwargs: Any,
    ) -> str:
        """Download a blob file from an Azure Storage

        Args:
            url (str): Currently ignored. The request URL is always rebuilt
                from account, container and blob so that it matches the signed
                resource.
            account (str): Storage account to access the blob
            container (str): Container to download from
            key (str): Base64 encoded access key for the storage account
            blob (Optional[str]): Name of the blob to be downloaded. None is
                treated as an empty name, which produces a container-level URL.
                NOTE(review): whether the service returns the whole container
                for such a request is not verified here.
            **kwargs: See below
            path (str): Full path to use for downloaded file.
            keep (bool): Whether to keep already downloaded file. Defaults to False.
            post (bool): Whether to use POST instead of GET. Defaults to False.
            parameters (Dict): Parameters to pass. Defaults to None.
            timeout (float): Timeout for connecting to URL. Defaults to None (no timeout).
            encoding (str): Encoding to use for text response. Defaults to None (best guess).

        Returns:
            str: Path of downloaded file

        Note:
            The signature is always computed for a GET request without query
            parameters, and the request headers are always the generated
            Shared Key headers. Passing ``post`` or ``parameters`` will
            presumably invalidate the signature - TODO confirm.
        """
        path = kwargs.get("path")
        keep = kwargs.get("keep", False)
        blob_name = blob or ""

        # Timezone-aware replacement for the deprecated datetime.utcnow()
        request_time = datetime.now(timezone.utc).strftime(
            "%a, %d %b %Y %H:%M:%S GMT"
        )
        headers = self._shared_key_headers(
            account,
            container,
            blob_name,
            key,
            request_time,
            self._API_VERSION,
        )
        url = f"https://{account}.blob.core.windows.net/{container}/{blob_name}"

        # Reuse a previous download: the check has to look at the local path,
        # since os.path.exists() on a URL can never succeed.
        if keep and path and exists(path):
            logger.info("File already exists. Reusing: %s", path)
            return path
        self.setup(
            url=url,
            stream=True,
            post=kwargs.get("post", False),
            parameters=kwargs.get("parameters"),
            timeout=kwargs.get("timeout"),
            headers=headers,
            encoding=kwargs.get("encoding"),
        )
        return self.stream_path(
            path, f"Download of {url} failed in retrieval of stream!"
        )


class AzureBlobUpload:
    """Wrapper for Azure Blob upload logic"""

    def upload_file(
        self,
        dataset_name: str,
        filename: str,
        account: str,
        container: str,
        key: str,
        data: Optional[Dict[str, Any]],
    ) -> None:
        """Upload a dataset as a CSV blob to a container in an Azure storage account

        The dataset is converted to a pandas DataFrame, serialised as
        semicolon separated CSV and uploaded with content type text/csv,
        overwriting any existing blob of the same name. Failures during
        conversion or upload are logged (with traceback) and not re-raised.

        Args:
            dataset_name (str): Key of the dataset within data to upload
            filename (str): Name for the blob once it is uploaded to the container
            account (str): Storage account
            container (str): Name of the container where the file will be uploaded to.
            key (str): Access key for the storage account
            data (Optional[Dict[str, Any]]): Mapping of dataset name to content
                that pandas.DataFrame accepts. If None or lacking dataset_name,
                the failure is logged and nothing is uploaded.

        Returns:
            None

        Raises:
            ImportError: If azure-storage-blob or pandas is not installed
        """
        # The optional imports at the top of the module fall back to None.
        # Fail with an explicit message instead of an AttributeError on None.
        if BlobServiceClient is None or ContentSettings is None or pd is None:
            raise ImportError(
                "azure-storage-blob and pandas are required for Azure uploads"
            )

        # No whitespace is allowed around '=': a stray space previously ended
        # up as the first character of the account key.
        connection_string = (
            "DefaultEndpointsProtocol=https;"
            f"AccountName={account};"
            f"AccountKey={key};"
            "EndpointSuffix=core.windows.net"
        )
        blob_service = BlobServiceClient.from_connection_string(
            connection_string
        )
        blob_client = blob_service.get_blob_client(
            container=container, blob=filename
        )

        try:
            # to_csv() without a target returns the CSV text directly
            csv_text = pd.DataFrame(data[dataset_name]).to_csv(sep=";")
            blob_client.upload_blob(
                csv_text,
                overwrite=True,
                content_settings=ContentSettings(content_type="text/csv"),
            )
        except Exception:
            # Deliberately best effort: log the traceback and carry on
            logger.exception("Failed to upload dataset: %s", dataset_name)
        else:
            logger.info("Successfully uploaded: %s", dataset_name)