diff --git a/README.md b/README.md
index edde276a8..07a1fd078 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 
 > Utility belt to handle data on AWS.
 
-[![Release](https://img.shields.io/badge/release-0.0.20-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Release](https://img.shields.io/badge/release-0.0.21-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Downloads](https://img.shields.io/pypi/dm/awswrangler.svg)](https://pypi.org/project/awswrangler/)
 [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/en/latest/?badge=latest)
diff --git a/awswrangler/__version__.py b/awswrangler/__version__.py
index 8fb50e86f..1ce12f8b2 100644
--- a/awswrangler/__version__.py
+++ b/awswrangler/__version__.py
@@ -1,4 +1,4 @@
 __title__ = "awswrangler"
 __description__ = "Utility belt to handle data on AWS."
-__version__ = "0.0.20"
+__version__ = "0.0.21"
 __license__ = "Apache License 2.0"
diff --git a/awswrangler/s3.py b/awswrangler/s3.py
index ec8e1c42d..d834aeafa 100644
--- a/awswrangler/s3.py
+++ b/awswrangler/s3.py
@@ -70,7 +70,9 @@ def parse_path(path):
     def parse_object_path(path):
         return path.replace("s3://", "").split("/", 1)
 
-    def delete_objects(self, path):
+    def delete_objects(self, path: str, procs_io_bound: Optional[int] = None) -> None:
+        if not procs_io_bound:
+            procs_io_bound = self._session.procs_io_bound
         bucket, path = self.parse_path(path=path)
         client = self._session.boto3_session.client(service_name="s3", config=self._session.botocore_config)
         procs = []
@@ -93,7 +95,7 @@ def delete_objects(self, path):
                 proc.daemon = False
                 proc.start()
                 procs.append(proc)
-                if len(procs) == self._session.procs_io_bound:
+                if len(procs) == procs_io_bound:
                     wait_process_release(procs)
             else:
                 logger.debug(f"Starting last delete call...")
diff --git a/awswrangler/spark.py b/awswrangler/spark.py
index 27731dbfd..58be7768f 100644
--- a/awswrangler/spark.py
+++ b/awswrangler/spark.py
@@ -1,4 +1,4 @@
-from typing import List, Tuple, Dict, Any
+from typing import List, Tuple, Dict, Any, Optional
 import logging
 import os
 
@@ -18,11 +18,7 @@ class Spark:
 
     def __init__(self, session):
         self._session = session
-        cpus: int = os.cpu_count()
-        if cpus == 1:
-            self._procs_io_bound: int = 1
-        else:
-            self._procs_io_bound = int(cpus / 2)
+        self._procs_io_bound: int = 1
         logging.info(f"_procs_io_bound: {self._procs_io_bound}")
 
     def read_csv(self, **args) -> DataFrame:
@@ -61,9 +57,9 @@ def to_redshift(
             table: str,
             iam_role: str,
             diststyle: str = "AUTO",
-            distkey=None,
+            distkey: Optional[str] = None,
             sortstyle: str = "COMPOUND",
-            sortkey=None,
+            sortkey: Optional[str] = None,
             min_num_partitions: int = 200,
             mode: str = "append",
     ) -> None:
@@ -87,7 +83,7 @@ def to_redshift(
         logger.debug(f"Minimum number of partitions : {min_num_partitions}")
         if path[-1] != "/":
             path += "/"
-        self._session.s3.delete_objects(path=path)
+        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
         spark: SparkSession = self._session.spark_session
         casts: Dict[str, str] = Spark._extract_casts(dataframe.dtypes)
         dataframe = Spark.date2timestamp(dataframe)
@@ -125,9 +121,9 @@ def write(pandas_dataframe: pd.DataFrame) -> pd.DataFrame:
                 cast_columns=casts)
             return pd.DataFrame.from_dict({"objects_paths": paths})
 
-        df_objects_paths = dataframe.repartition(numPartitions=num_partitions)  # type: ignore
-        df_objects_paths = df_objects_paths.withColumn(par_col_name, spark_partition_id())  # type: ignore
-        df_objects_paths = df_objects_paths.groupby(par_col_name).apply(write)  # type: ignore
+        df_objects_paths: DataFrame = dataframe.repartition(numPartitions=num_partitions)  # type: ignore
+        df_objects_paths: DataFrame = df_objects_paths.withColumn(par_col_name, spark_partition_id())  # type: ignore
+        df_objects_paths: DataFrame = df_objects_paths.groupby(par_col_name).apply(write)  # type: ignore
 
         objects_paths: List[str] = list(df_objects_paths.toPandas()["objects_paths"])
         dataframe.unpersist()
@@ -155,7 +151,7 @@ def write(pandas_dataframe: pd.DataFrame) -> pd.DataFrame:
             sortkey=sortkey,
             mode=mode,
             cast_columns=casts)
-        self._session.s3.delete_objects(path=path)
+        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
 
     def create_glue_table(self,
                           database,
diff --git a/building/publish.sh b/building/publish.sh
index 1bc47356d..8afcbd1b1 100755
--- a/building/publish.sh
+++ b/building/publish.sh
@@ -3,6 +3,8 @@ set -e
 
 cd ..
 rm -fr build dist .egg awswrangler.egg-info
-python setup.py sdist
+python3.6 setup.py bdist_egg
+python3.6 setup.py bdist_wheel
+python3.6 setup.py sdist
 twine upload dist/*
 rm -fr build dist .egg awswrangler.egg-info
diff --git a/testing/test_awswrangler/test_redshift.py b/testing/test_awswrangler/test_redshift.py
index a6ccc1eac..4a7dfb1ff 100644
--- a/testing/test_awswrangler/test_redshift.py
+++ b/testing/test_awswrangler/test_redshift.py
@@ -281,12 +281,7 @@ def test_to_redshift_spark_big(session, bucket, redshift_parameters):
 
 
 def test_to_redshift_spark_bool(session, bucket, redshift_parameters):
-    dataframe = session.spark_session.createDataFrame(
-        pd.DataFrame({
-            "A": [1, 2, 3],
-            "B": [True, False, True]
-        })
-    )
+    dataframe = session.spark_session.createDataFrame(pd.DataFrame({"A": [1, 2, 3], "B": [True, False, True]}))
     print(dataframe)
     print(dataframe.dtypes)
     con = Redshift.generate_connection(
@@ -318,12 +313,11 @@ def test_to_redshift_spark_bool(session, bucket, redshift_parameters):
 
 
 def test_stress_to_redshift_spark_big(session, bucket, redshift_parameters):
-    dataframe = session.spark_session.createDataFrame(
-        pd.DataFrame({
-            "A": list(range(1_000_000)),
-            "B": list(range(1_000_000)),
-            "C": list(range(1_000_000))
-        }))
+    print("Creating DataFrame...")
+    dataframe = session.spark_session.createDataFrame(pd.DataFrame({
+        "A": list(range(10_000)),
+        "B": list(range(10_000))
+    }))
     dataframe.cache()
     for i in range(10):
         print(f"Run number: {i}")
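Note (not part of the patch): a minimal sketch of how the new optional procs_io_bound argument of delete_objects introduced above might be exercised. The Session construction and the bucket path are hypothetical placeholders, not values taken from this changeset.

    import awswrangler

    # Hypothetical setup; the Session is assumed to expose the same `s3`
    # attribute that spark.py uses via self._session.s3.
    session = awswrangler.Session()

    # Omitting procs_io_bound falls back to the session-wide default,
    # per the new guard at the top of delete_objects().
    session.s3.delete_objects(path="s3://my-bucket/my-prefix/")

    # Passing procs_io_bound=1 caps the delete fan-out at one process,
    # mirroring what Spark.to_redshift now does through _procs_io_bound.
    session.s3.delete_objects(path="s3://my-bucket/my-prefix/", procs_io_bound=1)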