diff --git a/awswrangler/__version__.py b/awswrangler/__version__.py
index d215126ac..37148dd84 100644
--- a/awswrangler/__version__.py
+++ b/awswrangler/__version__.py
@@ -1,4 +1,4 @@
 __title__ = "awswrangler"
 __description__ = "Utility belt to handle data on AWS."
-__version__ = "0.0.2"
+__version__ = "0.0.3"
 __license__ = "Apache License 2.0"
diff --git a/awswrangler/glue.py b/awswrangler/glue.py
index c224a5ce0..c9c45ae67 100644
--- a/awswrangler/glue.py
+++ b/awswrangler/glue.py
@@ -203,17 +203,19 @@ def create_table(self,
                                        TableInput=table_input)
 
     def add_partitions(self, database, table, partition_paths, file_format,
-                       extra_args):
+                       compression, extra_args):
         if not partition_paths:
             return None
         partitions = list()
         for partition in partition_paths:
             if file_format == "parquet":
                 partition_def = Glue.parquet_partition_definition(
-                    partition=partition)
+                    partition=partition, compression=compression)
             elif file_format == "csv":
                 partition_def = Glue.csv_partition_definition(
-                    partition=partition, extra_args=extra_args)
+                    partition=partition,
+                    compression=compression,
+                    extra_args=extra_args)
             else:
                 raise UnsupportedFileFormat(file_format)
             partitions.append(partition_def)
@@ -225,8 +227,12 @@ def add_partitions(self, database, table, partition_paths, file_format,
                 DatabaseName=database,
                 TableName=table,
                 PartitionInputList=page)
-            if len(res["Errors"]) > 0:
-                raise ApiError(f"{res['Errors'][0]}")
+            for error in res["Errors"]:
+                if "ErrorDetail" in error:
+                    if "ErrorCode" in error["ErrorDetail"]:
+                        if error["ErrorDetail"][
+                                "ErrorCode"] != "AlreadyExistsException":
+                            raise ApiError(f"{error}")
 
     def get_connection_details(self, name):
         return self._client_glue.get_connection(
@@ -355,7 +361,7 @@ def csv_table_definition(table, partition_cols_schema, schema, path,
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "OutputFormat":
                 "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
-                "Compressed": True,
+                "Compressed": compressed,
                 "NumberOfBuckets": -1,
                 "SerdeInfo": {
                     "Parameters": param,
@@ -375,7 +381,8 @@ def csv_table_definition(table, partition_cols_schema, schema, path,
         }
 
     @staticmethod
-    def csv_partition_definition(partition, extra_args):
+    def csv_partition_definition(partition, compression, extra_args):
+        compressed = False if compression is None else True
         sep = extra_args["sep"] if "sep" in extra_args else ","
         serde = extra_args.get("serde")
         if serde == "OpenCSVSerDe":
@@ -394,6 +401,7 @@ def csv_partition_definition(partition, extra_args):
             "StorageDescriptor": {
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
+                "Compressed": compressed,
                 "SerdeInfo": {
                     "Parameters": param,
                     "SerializationLibrary": serde_fullname,
@@ -454,11 +462,13 @@ def parquet_table_definition(table, partition_cols_schema, schema, path,
         }
 
     @staticmethod
-    def parquet_partition_definition(partition):
+    def parquet_partition_definition(partition, compression):
+        compressed = False if compression is None else True
         return {
             "StorageDescriptor": {
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
+                "Compressed": compressed,
                 "SerdeInfo": {
                     "Parameters": {
                         "serialization.format": "1"
diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py
index 152a10996..661f10cf3 100644
--- a/awswrangler/pandas.py
+++ b/awswrangler/pandas.py
@@ -433,10 +433,22 @@ def read_sql_athena(self,
                             parse_dates=parse_timestamps,
                             quoting=csv.QUOTE_ALL,
                             max_result_size=max_result_size)
-        if len(ret.index) > 0:
-            for col in parse_dates:
-                ret[col] = ret[col].dt.date
-        return ret
+        if max_result_size is None:
+            if len(ret.index) > 0:
+                for col in parse_dates:
+                    ret[col] = ret[col].dt.date
+            return ret
+        else:
+            return Pandas._apply_dates_to_generator(
+                generator=ret, parse_dates=parse_dates)
+
+    @staticmethod
+    def _apply_dates_to_generator(generator, parse_dates):
+        for df in generator:
+            if len(df.index) > 0:
+                for col in parse_dates:
+                    df[col] = df[col].dt.date
+            yield df
 
     def to_csv(
             self,
diff --git a/building/Dockerfile b/building/Dockerfile
index 9c6061d14..dc43c4b60 100644
--- a/building/Dockerfile
+++ b/building/Dockerfile
@@ -1,4 +1,4 @@
-FROM lambci/lambda:build-python3.7
+FROM lambci/lambda:build-python3.6
 
 RUN pip install --upgrade pip
 
diff --git a/testing/test_awswrangler/test_pandas.py b/testing/test_awswrangler/test_pandas.py
index f1bd57dde..f51c3e253 100644
--- a/testing/test_awswrangler/test_pandas.py
+++ b/testing/test_awswrangler/test_pandas.py
@@ -241,8 +241,6 @@ def test_to_s3(
         list(dataframe2.columns))
     else:
         assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
-    assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[
-        dataframe2["id"] == 0].iloc[0]["name"]
 
 
 def test_to_parquet_with_cast(
@@ -594,8 +592,6 @@ def test_to_csv_with_sep(
     sleep(2)
     assert len(dataframe.index) == len(dataframe2.index)
     assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
-    assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[
-        dataframe2["id"] == 0].iloc[0]["name"]
 
 
 def test_to_csv_serde_exception(
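
Note: the pandas.py change means that when max_result_size is set, read_sql_athena returns a generator of DataFrames rather than a single DataFrame, with _apply_dates_to_generator converting the parsed timestamp columns to dates on each chunk. A minimal usage sketch, assuming the 0.0.x Session-based entry point; the query, table, and database names are hypothetical:

import awswrangler

session = awswrangler.Session()  # credentials resolved from the environment

# With max_result_size set, read_sql_athena yields DataFrames chunk by
# chunk instead of returning one DataFrame, so result sets larger than
# memory can be streamed.
df_iter = session.pandas.read_sql_athena(
    sql="SELECT * FROM my_table",   # hypothetical query
    database="my_database",         # hypothetical database
    max_result_size=512_000_000,    # approximate bytes per chunk
)
for df in df_iter:
    print(len(df.index))  # each chunk already has date columns applied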
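
Likewise, the glue.py change makes add_partitions tolerant of partitions that already exist: batch_create_partition reports per-partition errors, and only error codes other than AlreadyExistsException now raise. A standalone sketch of that filtering, with an illustrative response payload and RuntimeError standing in for the library's internal ApiError:

# Illustrative BatchCreatePartition response: per-partition failures are
# reported under "Errors", each with an ErrorDetail carrying the code.
res = {
    "Errors": [
        {
            "PartitionValues": ["2019", "07"],
            "ErrorDetail": {
                "ErrorCode": "AlreadyExistsException",
                "ErrorMessage": "Partition already exists.",
            },
        },
    ],
}

for error in res["Errors"]:
    code = error.get("ErrorDetail", {}).get("ErrorCode")
    if code != "AlreadyExistsException":
        # Mirrors the patched behavior: anything other than a duplicate
        # partition is still treated as a hard failure.
        raise RuntimeError(f"{error}")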