Merge pull request #30 from awslabs/spark-with-glue-table
Add Spark.create_glue_table() and Athena.repair_table()
igorborgest authored Sep 21, 2019
2 parents 54a0d2c + d33ba61 commit 74dced2
Showing 10 changed files with 259 additions and 20 deletions.
38 changes: 32 additions & 6 deletions README.md
@@ -22,11 +22,12 @@
* Pandas -> Redshift (Parallel)
* CSV (S3) -> Pandas (One shot or Batching)
* Athena -> Pandas (One shot or Batching)
* CloudWatch Logs Insights -> Pandas (NEW :star:)
* Encrypt Pandas Dataframes on S3 with KMS keys (NEW :star:)
* CloudWatch Logs Insights -> Pandas
* Encrypt Pandas Dataframes on S3 with KMS keys

### PySpark
* PySpark -> Redshift (Parallel) (NEW :star:)
* PySpark -> Redshift (Parallel)
* Register Glue table from Dataframe stored on S3 (NEW :star:)

### General
* List S3 objects (Parallel)
@@ -35,7 +36,8 @@
* Delete NOT listed S3 objects (Parallel)
* Copy listed S3 objects (Parallel)
* Get the size of S3 objects (Parallel)
* Get CloudWatch Logs Insights query results (NEW :star:)
* Get CloudWatch Logs Insights query results
* Load partitions on Athena/Glue table (repair table) (NEW :star:)

## Installation

@@ -166,6 +168,23 @@ session.spark.to_redshift(
)
```

#### Register Glue table from Dataframe stored on S3

```py3
dataframe.write \
.mode("overwrite") \
.format("parquet") \
.partitionBy(["year", "month"]) \
.save(compression="gzip", path="s3://...")
session = awswrangler.Session(spark_session=spark)
session.spark.create_glue_table(dataframe=dataframe,
file_format="parquet",
partition_by=["year", "month"],
path="s3://...",
compression="gzip",
database="my_database")
```

### General

#### Deleting a bunch of S3 objects (parallel :rocket:)
@@ -185,6 +204,13 @@ results = session.cloudwatchlogs.query(
)
```

#### Load partitions on Athena/Glue table (repair table)

```py3
session = awswrangler.Session()
session.athena.repair_table(database="db_name", table="tbl_name")
```

## Diving Deep

### Pandas to Redshift Flow
@@ -217,13 +243,13 @@ results = session.cloudwatchlogs.query(

* Fork the AWS Data Wrangler repository and clone that into your development environment

* Go to the project's directory create a Python's virtual environment for the project (**python -m venv venv && source source venv/bin/activate**)
* Go to the project's directory and create a Python virtual environment for the project (**python -m venv venv && source venv/bin/activate**)

* Run **./install-dev.sh**

* Go to the *testing* directory

* Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World!)
* Configure the parameters.json file with your AWS environment info (make sure that your Redshift cluster will not be open to the world! Configure your security group to grant access only to your IP.)

* Deploy the Cloudformation stack **./deploy-cloudformation.sh**

22 changes: 22 additions & 0 deletions awswrangler/athena.py
@@ -118,3 +118,25 @@ def wait_query(self, query_execution_id):
raise QueryCancelled(
response["QueryExecution"]["Status"].get("StateChangeReason"))
return response

def repair_table(self, database, table, s3_output=None):
"""
Hive's metastore consistency check
"MSCK REPAIR TABLE table;"
Recovers partitions and data associated with partitions.
Use this statement when you add partitions to the catalog.
Adding all partitions may take some time.
If this operation times out, the table will be left in an
incomplete state, with only some partitions added to the catalog.
:param database: Glue database name
:param table: Glue table name
:param s3_output: AWS S3 path
:return: Query execution ID
"""
query = f"MSCK REPAIR TABLE {table};"
query_id = self.run_query(query=query,
database=database,
s3_output=s3_output)
self.wait_query(query_execution_id=query_id)
return query_id
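
A minimal usage sketch of the new method (the database, table, and output bucket below are placeholders, not values from this PR):

```py3
import awswrangler

session = awswrangler.Session()

# Recover partitions that exist on S3 but are not yet registered in the Glue catalog.
# The call blocks until the underlying "MSCK REPAIR TABLE" query finishes and
# returns the Athena query execution ID.
query_execution_id = session.athena.repair_table(
    database="my_database",
    table="my_table",
    s3_output="s3://my-bucket/athena-results/",  # optional Athena output location
)
print(query_execution_id)
```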
4 changes: 2 additions & 2 deletions awswrangler/glue.py
@@ -141,7 +141,7 @@ def metadata_to_glue(self,
partition_cols=partition_cols,
preserve_index=preserve_index,
cast_columns=cast_columns)
table = table if table else Glue._parse_table_name(path)
table = table if table else Glue.parse_table_name(path)
table = table.lower().replace(".", "_")
if mode == "overwrite":
self.delete_table_if_exists(database=database, table=table)
@@ -301,7 +301,7 @@ def _build_schema(dataframe,
return schema_built, partition_cols_schema_built

@staticmethod
def _parse_table_name(path):
def parse_table_name(path):
if path[-1] == "/":
path = path[:-1]
return path.rpartition("/")[2]
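
For reference, the helper made public here (previously `_parse_table_name`) derives the table name from the last segment of the S3 path; a quick sketch of the expected behaviour, assuming the class can be imported as shown (bucket and prefix are illustrative):

```py3
from awswrangler.glue import Glue  # import path is an assumption

# A trailing slash is stripped, then the last path segment is returned.
Glue.parse_table_name("s3://my-bucket/datasets/sales/")  # -> "sales"
Glue.parse_table_name("s3://my-bucket/datasets/sales")   # -> "sales"
```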
8 changes: 4 additions & 4 deletions awswrangler/pandas.py
@@ -576,12 +576,15 @@ def to_s3(self,
"""
if compression is not None:
compression = compression.lower()
file_format = file_format.lower()
if file_format not in ["parquet", "csv"]:
raise UnsupportedFileFormat(file_format)
if file_format == "csv":
if compression not in Pandas.VALID_CSV_COMPRESSIONS:
raise InvalidCompression(
f"{compression} isn't a valid CSV compression. Try: {Pandas.VALID_CSV_COMPRESSIONS}"
)
if file_format == "parquet":
elif file_format == "parquet":
if compression not in Pandas.VALID_PARQUET_COMPRESSIONS:
raise InvalidCompression(
f"{compression} isn't a valid PARQUET compression. Try: {Pandas.VALID_PARQUET_COMPRESSIONS}"
@@ -641,9 +644,6 @@ def data_to_s3(self,
logger.debug(f"procs_io_bound: {procs_io_bound}")
if path[-1] == "/":
path = path[:-1]
file_format = file_format.lower()
if file_format not in ["parquet", "csv"]:
raise UnsupportedFileFormat(file_format)
objects_paths = []
if procs_cpu_bound > 1:
bounders = _get_bounders(dataframe=dataframe,
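
With the validation moved from `data_to_s3` into `to_s3`, unsupported formats and compressions are now rejected before any data is written. A hedged sketch of the effect (the dataframe and path are placeholders, "snappy" is assumed not to be in `Pandas.VALID_CSV_COMPRESSIONS`, and `InvalidCompression` is assumed to live in `awswrangler.exceptions` alongside `UnsupportedFileFormat`):

```py3
import pandas as pd
import awswrangler
from awswrangler.exceptions import InvalidCompression, UnsupportedFileFormat

session = awswrangler.Session()
df = pd.DataFrame({"value": [1, 2, 3]})

try:
    # Rejected up front: "snappy" is not a valid CSV compression.
    session.pandas.to_s3(dataframe=df,
                         path="s3://my-bucket/my_dataset/",
                         file_format="csv",
                         compression="snappy")
except InvalidCompression as error:
    print(error)

try:
    # Rejected up front: only "parquet" and "csv" are supported.
    session.pandas.to_s3(dataframe=df,
                         path="s3://my-bucket/my_dataset/",
                         file_format="orc")
except UnsupportedFileFormat as error:
    print(error)
```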
69 changes: 68 additions & 1 deletion awswrangler/spark.py
@@ -6,7 +6,7 @@
from pyspark.sql.functions import floor, rand
from pyspark.sql.types import TimestampType

from awswrangler.exceptions import MissingBatchDetected
from awswrangler.exceptions import MissingBatchDetected, UnsupportedFileFormat

logger = logging.getLogger(__name__)

@@ -142,3 +142,70 @@ def write(pandas_dataframe):
)
dataframe.unpersist()
self._session.s3.delete_objects(path=path)

def create_glue_table(self,
database,
path,
dataframe,
file_format,
compression,
table=None,
serde=None,
sep=",",
partition_by=None,
load_partitions=True,
replace_if_exists=True):
"""
Create a Glue metadata table pointing to a dataset stored on AWS S3.
:param dataframe: PySpark Dataframe
:param file_format: File format (E.g. "parquet", "csv")
:param partition_by: Columns used for partitioning
:param path: AWS S3 path
:param compression: Compression (e.g. gzip, snappy, lzo, etc)
:param sep: Separator token for CSV formats (e.g. ",", ";", "|")
:param serde: Serializer/Deserializer (e.g. "OpenCSVSerDe", "LazySimpleSerDe")
:param database: Glue database name
:param table: Glue table name. If not passed, extracted from the path
:param load_partitions: Load partitions after the table creation
:param replace_if_exists: Drop the table and recreate it if it already exists
:return: None
"""
file_format = file_format.lower()
if file_format not in ["parquet", "csv"]:
raise UnsupportedFileFormat(file_format)
table = table if table else self._session.glue.parse_table_name(path)
table = table.lower().replace(".", "_")
logger.debug(f"table: {table}")
full_schema = dataframe.dtypes
if partition_by is None:
partition_by = []
schema = [x for x in full_schema if x[0] not in partition_by]
partitions_schema_tmp = {
x[0]: x[1]
for x in full_schema if x[0] in partition_by
}
partitions_schema = [(x, partitions_schema_tmp[x])
for x in partition_by]
logger.debug(f"schema: {schema}")
logger.debug(f"partitions_schema: {partitions_schema}")
if replace_if_exists:
self._session.glue.delete_table_if_exists(database=database,
table=table)
extra_args = {}
if file_format == "csv":
extra_args["sep"] = sep
if serde is None:
serde = "OpenCSVSerDe"
extra_args["serde"] = serde
self._session.glue.create_table(
database=database,
table=table,
schema=schema,
partition_cols_schema=partitions_schema,
path=path,
file_format=file_format,
compression=compression,
extra_args=extra_args)
if load_partitions:
self._session.athena.repair_table(database=database, table=table)
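
The README example above covers the Parquet case; for CSV the `sep` and `serde` arguments also come into play. A hedged sketch, reusing the `spark` session and `dataframe` from the earlier example (database, path, and partition column are placeholders):

```py3
session = awswrangler.Session(spark_session=spark)
session.spark.create_glue_table(
    dataframe=dataframe,
    database="my_database",
    path="s3://my-bucket/my_csv_dataset/",
    file_format="csv",
    compression=None,
    sep="|",                # forwarded to the Glue table definition via extra_args
    serde="OpenCSVSerDe",   # also the default picked when file_format == "csv"
    partition_by=["year"],
    load_partitions=True,   # triggers session.athena.repair_table() afterwards
)
```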
4 changes: 2 additions & 2 deletions docs/source/contributing.rst
@@ -24,13 +24,13 @@ Step-by-step

* Fork the AWS Data Wrangler repository and clone that into your development environment

* Go to the project's directory create a Python's virtual environment for the project (**python -m venv venv && source source venv/bin/activate**)
* Go to the project's directory and create a Python virtual environment for the project (**python -m venv venv && source venv/bin/activate**)

* Run **./install-dev.sh**

* Go to the *testing* directory

* Configure the parameters.json file with your AWS environment infos (Make sure that your Redshift will not be open for the World!)
* Configure the parameters.json file with your AWS environment info (make sure that your Redshift cluster will not be open to the world! Configure your security group to grant access only to your IP.)

* Deploy the Cloudformation stack **./deploy-cloudformation.sh**

26 changes: 26 additions & 0 deletions docs/source/examples.rst
@@ -139,6 +139,24 @@ Loading Pyspark Dataframe to Redshift
mode="append",
)
Register Glue table from Dataframe stored on S3
```````````````````````````````````````````````

.. code-block:: python
dataframe.write \
.mode("overwrite") \
.format("parquet") \
.partitionBy(["year", "month"]) \
.save(compression="gzip", path="s3://...")
session = awswrangler.Session(spark_session=spark)
session.spark.create_glue_table(dataframe=dataframe,
file_format="parquet",
partition_by=["year", "month"],
path="s3://...",
compression="gzip",
database="my_database")
General
-------

@@ -160,3 +178,11 @@ Get CloudWatch Logs Insights query results
log_group_names=[LOG_GROUP_NAME],
query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
Load partitions on Athena/Glue table (repair table)
```````````````````````````````````````````````````

.. code-block:: python
session = awswrangler.Session()
session.athena.repair_table(database="db_name", table="tbl_name")
10 changes: 6 additions & 4 deletions docs/source/index.rst
@@ -20,12 +20,13 @@ Pandas
* Pandas -> Redshift (Parallel)
* CSV (S3) -> Pandas (One shot or Batching)
* Athena -> Pandas (One shot or Batching)
* CloudWatch Logs Insights -> Pandas (NEW)
* Encrypt Pandas Dataframes on S3 with KMS keys (NEW)
* CloudWatch Logs Insights -> Pandas
* Encrypt Pandas Dataframes on S3 with KMS keys

PySpark
```````
* PySpark -> Redshift (Parallel) (NEW)
* PySpark -> Redshift (Parallel)
* Register Glue table from Dataframe stored on S3 (NEW)

General
```````
@@ -35,7 +36,8 @@
* Delete NOT listed S3 objects (Parallel)
* Copy listed S3 objects (Parallel)
* Get the size of S3 objects (Parallel)
* Get CloudWatch Logs Insights query results (NEW)
* Get CloudWatch Logs Insights query results
* Load partitions on Athena/Glue table (repair table) (NEW)


Table Of Contents
1 change: 1 addition & 0 deletions testing/template.yaml
@@ -131,6 +131,7 @@ Resources:
Properties:
CatalogId: !Ref AWS::AccountId
DatabaseInput:
Name: awswrangler_test
Description: AWS Data Wrangler Test Arena - Glue Database

LogGroup:
Expand Down