Add security group parameters for EMR
igorborgest committed Oct 20, 2019
1 parent b32e37e commit 98a5e0d
Showing 5 changed files with 38 additions and 13 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -205,7 +205,7 @@ session.spark.create_glue_table(dataframe=dataframe,
 
 ```py3
 session = awswrangler.Session(spark_session=spark)
-dfs = session.spark.flatten(df=df_nested)
+dfs = session.spark.flatten(dataframe=df_nested)
 for name, df_flat in dfs.items():
     print(name)
     df_flat.show()
24 changes: 23 additions & 1 deletion awswrangler/emr.py
@@ -40,6 +40,18 @@ def _build_cluster_args(**pars):
     if pars["key_pair_name"] is not None:
         args["Instances"]["Ec2KeyName"] = pars["key_pair_name"]
 
+    # Security groups
+    if pars["security_group_master"] is not None:
+        args["Instances"]["EmrManagedMasterSecurityGroup"] = pars["security_group_master"]
+    if pars["security_groups_master_additional"] is not None:
+        args["Instances"]["AdditionalMasterSecurityGroups"] = pars["security_groups_master_additional"]
+    if pars["security_group_slave"] is not None:
+        args["Instances"]["EmrManagedSlaveSecurityGroup"] = pars["security_group_slave"]
+    if pars["security_groups_slave_additional"] is not None:
+        args["Instances"]["AdditionalSlaveSecurityGroups"] = pars["security_groups_slave_additional"]
+    if pars["security_group_service_access"] is not None:
+        args["Instances"]["ServiceAccessSecurityGroup"] = pars["security_group_service_access"]
+
     # Configurations
     if pars["python3"] or pars["spark_glue_catalog"] or pars["hive_glue_catalog"] or pars["presto_glue_catalog"]:
         args["Configurations"]: List = []
@@ -265,7 +277,12 @@ def create_cluster(self,
                        debugging: bool = True,
                        applications: Optional[List[str]] = None,
                        visible_to_all_users: bool = True,
-                       key_pair_name: Optional[str] = None):
+                       key_pair_name: Optional[str] = None,
+                       security_group_master: Optional[str] = None,
+                       security_groups_master_additional: Optional[List[str]] = None,
+                       security_group_slave: Optional[str] = None,
+                       security_groups_slave_additional: Optional[List[str]] = None,
+                       security_group_service_access: Optional[str] = None):
         """
         Create an EMR cluster with instance fleets configuration
         https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html
@@ -305,6 +322,11 @@ def create_cluster(self,
         :param applications: List of applications (e.g. ["Hadoop", "Spark", "Ganglia", "Hive"])
         :param visible_to_all_users: True or False
         :param key_pair_name: Key pair name (string)
+        :param security_group_master: The identifier of the Amazon EC2 security group for the master node.
+        :param security_groups_master_additional: A list of additional Amazon EC2 security group IDs for the master node.
+        :param security_group_slave: The identifier of the Amazon EC2 security group for the core and task nodes.
+        :param security_groups_slave_additional: A list of additional Amazon EC2 security group IDs for the core and task nodes.
+        :param security_group_service_access: The identifier of the Amazon EC2 security group for the Amazon EMR service to access clusters in VPC private subnets.
         :return: Cluster ID (string)
         """
         args = EMR._build_cluster_args(**locals())
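For illustration (not part of this commit), a minimal sketch of calling create_cluster with the new parameters. It assumes the EMR module is reachable as session.emr; the sg-* IDs are placeholders, and the pre-existing required arguments (cluster name, subnet, instance configuration, roles) are deliberately elided rather than guessed:

```py3
# Sketch only: all security group IDs below are placeholders, and the
# required arguments that predate this commit are intentionally elided.
import awswrangler

session = awswrangler.Session()
cluster_id = session.emr.create_cluster(
    # ... pre-existing required arguments (cluster name, subnet, instance
    # configuration, roles) go here, unchanged by this commit ...
    key_pair_name="my-key-pair",
    security_group_master="sg-00000000000000000",                # EmrManagedMasterSecurityGroup
    security_groups_master_additional=["sg-11111111111111111"],  # AdditionalMasterSecurityGroups
    security_group_slave="sg-22222222222222222",                 # EmrManagedSlaveSecurityGroup
    security_groups_slave_additional=["sg-33333333333333333"],   # AdditionalSlaveSecurityGroups
    security_group_service_access="sg-44444444444444444")       # ServiceAccessSecurityGroup
print(cluster_id)  # per the docstring above: returns the Cluster ID (string)
```

All five parameters default to None, so any subset can be passed; _build_cluster_args only adds the matching Instances key when a value is supplied.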
15 changes: 9 additions & 6 deletions awswrangler/spark.py
@@ -294,28 +294,31 @@ def _build_name(name: str, expr: str) -> str:
         return f"{name}_{suffix}".replace(".", "_")
 
     @staticmethod
-    def flatten(df: sql.DataFrame, explode_outer: bool = True, explode_pos: bool = True,
+    def flatten(dataframe: sql.DataFrame, explode_outer: bool = True, explode_pos: bool = True,
                 name: str = "root") -> Dict[str, sql.DataFrame]:
         """
         Convert a complex nested DataFrame into one (or many) flat DataFrames.
         If a column is a struct, it is flattened directly.
         If a column is an array or map, then child DataFrames are created in different granularities.
-        :param df: Spark DataFrame
+        :param dataframe: Spark DataFrame
         :param explode_outer: Should we preserve the null values on arrays?
         :param explode_pos: Create columns with the position (index) of each exploded array element
         :param name: The name of the root DataFrame
         :return: A dictionary with the names as keys and the DataFrames as values
         """
-        cols_exprs: List[Tuple[str, str, str]] = Spark._flatten_struct_dataframe(df=df,
+        cols_exprs: List[Tuple[str, str, str]] = Spark._flatten_struct_dataframe(df=dataframe,
                                                                                  explode_outer=explode_outer,
                                                                                  explode_pos=explode_pos)
         exprs_arr: List[str] = [x[2] for x in cols_exprs if Spark._is_array_or_map(x[1])]
         exprs: List[str] = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1])]
-        dfs: Dict[str, sql.DataFrame] = {name: df.selectExpr(exprs)}
+        dfs: Dict[str, sql.DataFrame] = {name: dataframe.selectExpr(exprs)}
         exprs = [x[2] for x in cols_exprs if not Spark._is_array_or_map(x[1]) and not x[0].endswith("_pos")]
         for expr in exprs_arr:
-            df_arr = df.selectExpr(exprs + [expr])
+            df_arr = dataframe.selectExpr(exprs + [expr])
             name_new: str = Spark._build_name(name=name, expr=expr)
-            dfs_new = Spark.flatten(df=df_arr, explode_outer=explode_outer, explode_pos=explode_pos, name=name_new)
+            dfs_new = Spark.flatten(dataframe=df_arr,
+                                    explode_outer=explode_outer,
+                                    explode_pos=explode_pos,
+                                    name=name_new)
             dfs = {**dfs, **dfs_new}
         return dfs
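For reference, a short sketch of the renamed API in action (the keyword is now dataframe instead of df). It assumes a live SparkSession; the toy nested frame is illustrative:

```py3
# Sketch: flatten() now takes `dataframe=` (this commit renames `df=`).
from pyspark.sql import Row, SparkSession

import awswrangler

spark = SparkSession.builder.getOrCreate()
df_nested = spark.createDataFrame([
    Row(id=1, point=Row(x=1.0, y=2.0), values=[10, 20]),
    Row(id=2, point=Row(x=3.0, y=4.0), values=[30]),
])

session = awswrangler.Session(spark_session=spark)
dfs = session.spark.flatten(dataframe=df_nested)  # Dict[str, DataFrame]

# Per the docstring above: the struct column is flattened into the "root"
# frame, while the array column produces a child frame at its own granularity.
for name, df_flat in dfs.items():
    print(name)
    df_flat.show()
```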
2 changes: 1 addition & 1 deletion docs/source/examples.rst
Expand Up @@ -163,7 +163,7 @@ Flatten nested PySpark DataFrame
.. code-block:: python
session = awswrangler.Session(spark_session=spark)
dfs = session.spark.flatten(df=df_nested)
dfs = session.spark.flatten(dataframe=df_nested)
for name, df_flat in dfs:
print(name)
df_flat.show()
Expand Down
8 changes: 4 additions & 4 deletions testing/test_awswrangler/test_spark.py
@@ -190,7 +190,7 @@ def test_flatten_simple_struct(session):
     ])
     df = session.spark_session.createDataFrame(data=pdf, schema=schema)
     df.printSchema()
-    dfs = session.spark.flatten(df=df)
+    dfs = session.spark.flatten(dataframe=df)
     assert len(dfs) == 1
     dfs["root"].printSchema()
     dtypes = str(dfs["root"].dtypes)
@@ -261,7 +261,7 @@ def test_flatten_complex_struct(session):
     ])
     df = session.spark_session.createDataFrame(data=pdf, schema=schema)
     df.printSchema()
-    dfs = session.spark.flatten(df=df)
+    dfs = session.spark.flatten(dataframe=df)
     assert len(dfs) == 1
     dfs["root"].printSchema()
     dtypes = str(dfs["root"].dtypes)
@@ -294,7 +294,7 @@ def test_flatten_simple_map(session):
     ])
     df = session.spark_session.createDataFrame(data=pdf, schema=schema)
     df.printSchema()
-    dfs = session.spark.flatten(df=df)
+    dfs = session.spark.flatten(dataframe=df)
     assert len(dfs) == 2
 
     # root
@@ -329,7 +329,7 @@ def test_flatten_simple_array(session):
     ])
     df = session.spark_session.createDataFrame(data=pdf, schema=schema)
     df.printSchema()
-    dfs = session.spark.flatten(df=df)
+    dfs = session.spark.flatten(dataframe=df)
     assert len(dfs) == 2
 
     # root
