Bumping version to 0.0.24
igorborgest committed Dec 5, 2019
1 parent 24d315e commit a755c71
Showing 16 changed files with 262 additions and 270 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -2,7 +2,7 @@

> Utility belt to handle data on AWS.
-[![Release](https://img.shields.io/badge/release-0.0.23-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Release](https://img.shields.io/badge/release-0.0.24-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Downloads](https://img.shields.io/pypi/dm/awswrangler.svg)](https://pypi.org/project/awswrangler/)
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/en/latest/?badge=latest)
2 changes: 1 addition & 1 deletion awswrangler/__version__.py
@@ -1,4 +1,4 @@
__title__ = "awswrangler"
__description__ = "Utility belt to handle data on AWS."
-__version__ = "0.0.23"
+__version__ = "0.0.24"
__license__ = "Apache License 2.0"
47 changes: 38 additions & 9 deletions awswrangler/athena.py
@@ -1,4 +1,4 @@
-from typing import Dict, List, Tuple, Optional, Any, Iterator, Union
+from typing import Dict, List, Tuple, Optional, Any, Iterator
from time import sleep
import logging
import re
@@ -25,7 +25,6 @@ def get_query_columns_metadata(self, query_execution_id: str) -> Dict[str, str]:
"""
response: Dict = self._client_athena.get_query_results(QueryExecutionId=query_execution_id, MaxResults=1)
col_info: List[Dict[str, str]] = response["ResultSet"]["ResultSetMetadata"]["ColumnInfo"]
-logger.debug(f"col_info: {col_info}")
return {x["Name"]: x["Type"] for x in col_info}

def create_athena_bucket(self):
@@ -42,7 +41,13 @@ def create_athena_bucket(self):
s3_resource.Bucket(s3_output)
return s3_output

-def run_query(self, query: str, database: Optional[str] = None, s3_output: Optional[str] = None, workgroup: Optional[str] = None, encryption: Optional[str] = None, kms_key: Optional[str] = None) -> str:
+def run_query(self,
+              query: str,
+              database: Optional[str] = None,
+              s3_output: Optional[str] = None,
+              workgroup: Optional[str] = None,
+              encryption: Optional[str] = None,
+              kms_key: Optional[str] = None) -> str:
"""
Run a SQL Query against AWS Athena
P.S All default values will be inherited from the Session()
@@ -55,7 +60,7 @@ def run_query(self, query: str, database: Optional[str] = None, s3_output: Optio
:param kms_key: For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID.
:return: Query execution ID
"""
-args: Dict[str, Union[str, Dict[str, Union[str, Dict[str, str]]]]] = {"QueryString": query}
+args: Dict[str, Any] = {"QueryString": query}

# s3_output
if s3_output is None:
@@ -71,7 +76,9 @@ def run_query(self, query: str, database: Optional[str] = None, s3_output: Optio
if kms_key is not None:
args["ResultConfiguration"]["EncryptionConfiguration"]["KmsKey"] = kms_key
elif self._session.athena_encryption is not None:
-args["ResultConfiguration"]["EncryptionConfiguration"] = {"EncryptionOption": self._session.athena_encryption}
+args["ResultConfiguration"]["EncryptionConfiguration"] = {
+    "EncryptionOption": self._session.athena_encryption
+}
if self._session.athena_kms_key is not None:
args["ResultConfiguration"]["EncryptionConfiguration"]["KmsKey"] = self._session.athena_kms_key

@@ -113,7 +120,13 @@ def wait_query(self, query_execution_id):
raise QueryCancelled(response["QueryExecution"]["Status"].get("StateChangeReason"))
return response

-def repair_table(self, table: str, database: Optional[str] = None, s3_output: Optional[str] = None, workgroup: Optional[str] = None, encryption: Optional[str] = None, kms_key: Optional[str] = None):
+def repair_table(self,
+                 table: str,
+                 database: Optional[str] = None,
+                 s3_output: Optional[str] = None,
+                 workgroup: Optional[str] = None,
+                 encryption: Optional[str] = None,
+                 kms_key: Optional[str] = None):
"""
Hive's metastore consistency check
"MSCK REPAIR TABLE table;"
@@ -133,7 +146,12 @@ def repair_table(self, table: str, database: Optional[str] = None, s3_output: Op
:return: Query execution ID
"""
query = f"MSCK REPAIR TABLE {table};"
-query_id = self.run_query(query=query, database=database, s3_output=s3_output, workgroup=workgroup, encryption=encryption, kms_key=kms_key)
+query_id = self.run_query(query=query,
+                          database=database,
+                          s3_output=s3_output,
+                          workgroup=workgroup,
+                          encryption=encryption,
+                          kms_key=kms_key)
self.wait_query(query_execution_id=query_id)
return query_id

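Since repair_table just wraps the MSCK REPAIR TABLE statement quoted in its docstring, a caller can register freshly written Hive-style partitions in one call. A hedged sketch (table and database names are placeholders):

```python
import awswrangler

session = awswrangler.Session()
# Runs "MSCK REPAIR TABLE my_table;" and blocks until Athena finishes.
session.athena.repair_table(table="my_table", database="my_database")
```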
@@ -174,7 +192,13 @@ def get_results(self, query_execution_id: str) -> Iterator[Dict[str, Any]]:
yield row
next_token = res.get("NextToken")

-def query(self, query: str, database: Optional[str] = None, s3_output: Optional[str] = None, workgroup: Optional[str] = None, encryption: Optional[str] = None, kms_key: Optional[str] = None) -> Iterator[Dict[str, Any]]:
+def query(self,
+          query: str,
+          database: Optional[str] = None,
+          s3_output: Optional[str] = None,
+          workgroup: Optional[str] = None,
+          encryption: Optional[str] = None,
+          kms_key: Optional[str] = None) -> Iterator[Dict[str, Any]]:
"""
Run a SQL Query against AWS Athena and return the result as a Iterator of lists
P.S All default values will be inherited from the Session()
@@ -187,7 +211,12 @@ def query(self, query: str, database: Optional[str] = None, s3_output: Optional[
:param kms_key: For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID.
:return: Query execution ID
"""
-query_id: str = self.run_query(query=query, database=database, s3_output=s3_output, workgroup=workgroup, encryption=encryption, kms_key=kms_key)
+query_id: str = self.run_query(query=query,
+                               database=database,
+                               s3_output=s3_output,
+                               workgroup=workgroup,
+                               encryption=encryption,
+                               kms_key=kms_key)
self.wait_query(query_execution_id=query_id)
return self.get_results(query_execution_id=query_id)

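Because query returns the iterator produced by get_results, rows can be streamed as plain dictionaries without collecting the full result set. A minimal sketch under the same assumptions as the examples above:

```python
import awswrangler

session = awswrangler.Session()
# Each yielded item maps column name to value, per get_results above.
for row in session.athena.query(query="SELECT id, name FROM my_table",  # hypothetical
                                database="my_database"):
    print(row["id"], row["name"])
```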
3 changes: 2 additions & 1 deletion awswrangler/data_types.py
@@ -304,7 +304,8 @@ def convert_schema(func: Callable, schema: List[Tuple[str, str]]) -> Dict[str, s
return {name: func(dtype) for name, dtype in schema}


-def extract_pyarrow_schema_from_pandas(dataframe: pd.DataFrame, preserve_index: bool,
+def extract_pyarrow_schema_from_pandas(dataframe: pd.DataFrame,
+                                       preserve_index: bool,
                                        indexes_position: str = "right") -> List[Tuple[str, str]]:
"""
Extract the related Pyarrow schema from any Pandas DataFrame
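extract_pyarrow_schema_from_pandas maps a DataFrame to (column name, pyarrow type) tuples. A hedged sketch of a direct call (the sample frame is illustrative, and the exact type strings depend on the installed pyarrow version):

```python
import pandas as pd
from awswrangler.data_types import extract_pyarrow_schema_from_pandas

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
schema = extract_pyarrow_schema_from_pandas(dataframe=df, preserve_index=False)
print(schema)  # e.g. [("id", "int64"), ("name", "string")]
```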
5 changes: 4 additions & 1 deletion awswrangler/emr.py
@@ -480,7 +480,10 @@ def submit_step(self,
logger.info(f"response: \n{json.dumps(response, default=str, indent=4)}")
return response["StepIds"][0]

-def build_step(self, name: str, command: str, action_on_failure: str = "CONTINUE",
+def build_step(self,
+               name: str,
+               command: str,
+               action_on_failure: str = "CONTINUE",
                script: bool = False) -> Dict[str, Collection[str]]:
"""
Build the Step dictionary
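build_step assembles the Step dictionary that EMR expects, and the submit_step method shown in the context above submits it. A hedged sketch that pairs build_step with plain boto3 instead, since submit_step's full signature is not visible in this hunk (cluster ID, bucket, and job path are placeholders):

```python
import boto3
import awswrangler

session = awswrangler.Session()
step = session.emr.build_step(name="my-step",  # hypothetical step name
                              command="spark-submit s3://my-bucket/job.py",
                              action_on_failure="CONTINUE")

# The resulting dict is a standard EMR Step, so boto3 can submit it directly.
emr_client = boto3.client("emr")
response = emr_client.add_job_flow_steps(JobFlowId="j-XXXXXXXXXXXX",  # hypothetical cluster
                                         Steps=[step])
```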
166 changes: 93 additions & 73 deletions awswrangler/pandas.py
@@ -7,6 +7,7 @@
import csv
from datetime import datetime
from decimal import Decimal
+from ast import literal_eval

from botocore.exceptions import ClientError, HTTPClientError # type: ignore
import pandas as pd # type: ignore
@@ -46,24 +47,24 @@ def _parse_path(path):
return parts[0], parts[2]

def read_csv(
-self,
-path,
-max_result_size=None,
-header="infer",
-names=None,
-usecols=None,
-dtype=None,
-sep=",",
-thousands=None,
-decimal=".",
-lineterminator="\n",
-quotechar='"',
-quoting=csv.QUOTE_MINIMAL,
-escapechar=None,
-parse_dates: Union[bool, Dict, List] = False,
-infer_datetime_format=False,
-encoding="utf-8",
-converters=None,
+self,
+path,
+max_result_size=None,
+header="infer",
+names=None,
+usecols=None,
+dtype=None,
+sep=",",
+thousands=None,
+decimal=".",
+lineterminator="\n",
+quotechar='"',
+quoting=csv.QUOTE_MINIMAL,
+escapechar=None,
+parse_dates: Union[bool, Dict, List] = False,
+infer_datetime_format=False,
+encoding="utf-8",
+converters=None,
):
"""
Read CSV file from AWS S3 using optimized strategies.
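read_csv returns a single DataFrame by default, or an iterator of DataFrames when max_result_size is set (handled by _read_csv_iterator below). A minimal, hedged sketch (bucket and key are placeholders):

```python
import awswrangler

session = awswrangler.Session()

# Whole object at once: one DataFrame.
df = session.pandas.read_csv(path="s3://my-bucket/data.csv")

# Bounded memory: an iterator of DataFrames, each built from at most
# ~200 MB of raw CSV.
for chunk in session.pandas.read_csv(path="s3://my-bucket/data.csv",
                                     max_result_size=200_000_000):
    print(len(chunk))
```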
@@ -137,25 +138,25 @@ def read_csv(

@staticmethod
def _read_csv_iterator(
-client_s3,
-bucket_name,
-key_path,
-max_result_size=200_000_000,  # 200 MB
-header="infer",
-names=None,
-usecols=None,
-dtype=None,
-sep=",",
-thousands=None,
-decimal=".",
-lineterminator="\n",
-quotechar='"',
-quoting=csv.QUOTE_MINIMAL,
-escapechar=None,
-parse_dates: Union[bool, Dict, List] = False,
-infer_datetime_format=False,
-encoding="utf-8",
-converters=None,
+client_s3,
+bucket_name,
+key_path,
+max_result_size=200_000_000,  # 200 MB
+header="infer",
+names=None,
+usecols=None,
+dtype=None,
+sep=",",
+thousands=None,
+decimal=".",
+lineterminator="\n",
+quotechar='"',
+quoting=csv.QUOTE_MINIMAL,
+escapechar=None,
+parse_dates: Union[bool, Dict, List] = False,
+infer_datetime_format=False,
+encoding="utf-8",
+converters=None,
):
"""
Read CSV file from AWS S3 using optimized strategies.
@@ -350,24 +351,24 @@ def _find_terminator(body, sep, quoting, quotechar, lineterminator):

@staticmethod
def _read_csv_once(
-client_s3,
-bucket_name,
-key_path,
-header="infer",
-names=None,
-usecols=None,
-dtype=None,
-sep=",",
-thousands=None,
-decimal=".",
-lineterminator="\n",
-quotechar='"',
-quoting=0,
-escapechar=None,
-parse_dates: Union[bool, Dict, List] = False,
-infer_datetime_format=False,
-encoding=None,
-converters=None,
+client_s3,
+bucket_name,
+key_path,
+header="infer",
+names=None,
+usecols=None,
+dtype=None,
+sep=",",
+thousands=None,
+decimal=".",
+lineterminator="\n",
+quotechar='"',
+quoting=0,
+escapechar=None,
+parse_dates: Union[bool, Dict, List] = False,
+infer_datetime_format=False,
+encoding=None,
+converters=None,
):
"""
Read CSV file from AWS S3 using optimized strategies.
@@ -420,9 +421,17 @@ def _read_csv_once(

@staticmethod
def _list_parser(value: str) -> List[Union[int, float, str, None]]:
+# try resolve with a simple literal_eval
+try:
+    return literal_eval(value)
+except ValueError:
+    pass  # keep trying
+
# sanity check
if len(value) <= 1:
return []
-items: List[None, str] = [None if x == "null" else x for x in value[1:-1].split(", ")]
+
+items: List[Union[None, str]] = [None if x == "null" else x for x in value[1:-1].split(", ")]
array_type: Optional[type] = None

# check if all values are integers
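The new literal_eval fast path accepts well-formed Python-style lists immediately, while Athena's null token still falls through to the manual parser. A self-contained sketch of the behavior this change targets (a simplified stand-in, not the actual method):

```python
from ast import literal_eval

def parse_list(value: str):
    """Simplified stand-in for the _list_parser fast path above."""
    try:
        return literal_eval(value)  # "[1, 2, 3]" is valid Python
    except ValueError:
        # "[1, 2, null]" is not valid Python ("null" is not a literal),
        # so fall back to manual splitting with null -> None.
        return [None if x == "null" else x for x in value[1:-1].split(", ")]

print(parse_list("[1, 2, 3]"))     # [1, 2, 3]
print(parse_list("[1, 2, null]"))  # ['1', '2', None]
```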
@@ -481,8 +490,14 @@ def _get_query_dtype(self, query_execution_id: str) -> Tuple[Dict[str, str], Lis
logger.debug(f"converters: {converters}")
return dtype, parse_timestamps, parse_dates, converters

-def read_sql_athena(self, sql, database=None, s3_output=None, max_result_size=None, workgroup=None,
-                    encryption=None, kms_key=None):
+def read_sql_athena(self,
+                    sql,
+                    database=None,
+                    s3_output=None,
+                    max_result_size=None,
+                    workgroup=None,
+                    encryption=None,
+                    kms_key=None):
"""
Executes any SQL query on AWS Athena and return a Dataframe of the result.
P.S. If max_result_size is passed, then a iterator of Dataframes is returned.
@@ -499,7 +514,12 @@ def read_sql_athena(self, sql, database=None, s3_output=None, max_result_size=No
"""
if not s3_output:
s3_output = self._session.athena.create_athena_bucket()
-query_execution_id = self._session.athena.run_query(query=sql, database=database, s3_output=s3_output, workgroup=workgroup, encryption=encryption, kms_key=kms_key)
+query_execution_id = self._session.athena.run_query(query=sql,
+                                                    database=database,
+                                                    s3_output=s3_output,
+                                                    workgroup=workgroup,
+                                                    encryption=encryption,
+                                                    kms_key=kms_key)
query_response = self._session.athena.wait_query(query_execution_id=query_execution_id)
if query_response["QueryExecution"]["Status"]["State"] in ["FAILED", "CANCELLED"]:
reason = query_response["QueryExecution"]["Status"]["StateChangeReason"]
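read_sql_athena is the main user-facing entry point touched by this reformat. A hedged usage sketch (query and database are placeholders; passing max_result_size would yield an iterator of DataFrames instead of a single one):

```python
import awswrangler

session = awswrangler.Session()
df = session.pandas.read_sql_athena(sql="SELECT * FROM my_table",  # hypothetical
                                    database="my_database")
print(df.head())
```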
@@ -532,19 +552,19 @@ def _apply_dates_to_generator(generator, parse_dates):
yield df

def to_csv(
-self,
-dataframe,
-path,
-sep=",",
-serde="OpenCSVSerDe",
-database: Optional[str] = None,
-table=None,
-partition_cols=None,
-preserve_index=True,
-mode="append",
-procs_cpu_bound=None,
-procs_io_bound=None,
-inplace=True,
+self,
+dataframe,
+path,
+sep=",",
+serde="OpenCSVSerDe",
+database: Optional[str] = None,
+table=None,
+partition_cols=None,
+preserve_index=True,
+mode="append",
+procs_cpu_bound=None,
+procs_io_bound=None,
+inplace=True,
):
"""
Write a Pandas Dataframe as CSV files on S3
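to_csv is the write-side counterpart, optionally registering the output as a Glue/Athena table and partitioning it on the way out. A hedged sketch (paths and names are placeholders):

```python
import pandas as pd
import awswrangler

session = awswrangler.Session()
df = pd.DataFrame({"year": [2019, 2019], "value": [1.0, 2.0]})
session.pandas.to_csv(dataframe=df,
                      path="s3://my-bucket/my-table/",  # hypothetical path
                      database="my_database",           # hypothetical Glue database
                      table="my_table",
                      partition_cols=["year"],
                      preserve_index=False,
                      mode="append")
```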
@@ -806,7 +826,7 @@ def _data_to_s3_dataset_writer(dataframe,
for keys, subgroup in dataframe.groupby(partition_cols):
subgroup = subgroup.drop(partition_cols, axis="columns")
if not isinstance(keys, tuple):
-keys = (keys,)
+keys = (keys, )
subdir = "/".join([f"{name}={val}" for name, val in zip(partition_cols, keys)])
prefix = "/".join([path, subdir])
object_path = Pandas._data_to_s3_object_writer(dataframe=subgroup,
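The surrounding loop shows how partition directories are derived: each groupby key tuple becomes a Hive-style key=value path segment. A small self-contained sketch of that naming scheme:

```python
partition_cols = ["year", "month"]
keys = (2019, 12)

subdir = "/".join([f"{name}={val}" for name, val in zip(partition_cols, keys)])
prefix = "/".join(["s3://my-bucket/my-table", subdir])  # hypothetical base path
print(prefix)  # s3://my-bucket/my-table/year=2019/month=12
```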
7 changes: 5 additions & 2 deletions awswrangler/redshift.py
@@ -117,8 +117,11 @@ def get_connection(self, glue_connection):
conn = self.generate_connection(database=database, host=host, port=int(port), user=user, password=password)
return conn

-def write_load_manifest(self, manifest_path: str, objects_paths: List[str], procs_io_bound: Optional[int] = None
-                        ) -> Dict[str, List[Dict[str, Union[str, bool, Dict[str, int]]]]]:
+def write_load_manifest(
+        self,
+        manifest_path: str,
+        objects_paths: List[str],
+        procs_io_bound: Optional[int] = None) -> Dict[str, List[Dict[str, Union[str, bool, Dict[str, int]]]]]:
objects_sizes: Dict[str, int] = self._session.s3.get_objects_sizes(objects_paths=objects_paths,
procs_io_bound=procs_io_bound)
manifest: Dict[str, List[Dict[str, Union[str, bool, Dict[str, int]]]]] = {"entries": []}
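The return annotation describes a standard Redshift COPY manifest, which write_load_manifest fills in from the object paths and their sizes. A hedged sketch of the resulting shape (URL and size are illustrative):

```python
# Matches Dict[str, List[Dict[str, Union[str, bool, Dict[str, int]]]]]:
manifest = {
    "entries": [
        {
            "url": "s3://my-bucket/part-0000.parquet",  # hypothetical object
            "mandatory": True,
            "meta": {"content_length": 1048576},
        },
    ],
}
```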