Skip to content

Commit

Permalink
Added checks for UC-incompatible task clusters in Apache Airflow DAGs
Browse files Browse the repository at this point in the history
  • Loading branch information
nfx committed Mar 15, 2024
1 parent 2da0ef1 commit 8eb176d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/databricks/labs/pylint/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,35 @@ def _check_tasks(self, value: astroid.NodeNG):
for task in self._infer_value(inferred):
if "new_cluster" not in task:
continue
raise ValueError("new_cluster is missing data_security_mode")
if "data_security_mode" not in task["new_cluster"]:
self.add_message("missing-data-security-mode", node=value, args=(task["task_key"],))

def _check_job_clusters(self, value: astroid.NodeNG):
for inferred in value.infer():
for job_cluster in self._infer_value(inferred):
if "new_cluster" not in job_cluster:
continue
# add message that this job cluster is missing data_security_mode
self.add_message("missing-data-security-mode", node=value, args=(job_cluster["job_cluster_key"],))
if "data_security_mode" not in job_cluster["new_cluster"]:
self.add_message("missing-data-security-mode", node=value, args=(job_cluster["job_cluster_key"],))

def _infer_value(self, value: astroid.NodeNG):
if isinstance(value, (str, int, bool, list, dict, type(None))):
return value
if isinstance(value, astroid.Dict):
return self._infer_dict(value)
if isinstance(value, astroid.List):
return self._infer_list(value)
if isinstance(value, astroid.Const):
return value.value
if isinstance(value, astroid.Tuple):
return tuple(self._infer_value(elem) for elem in value.elts)
if isinstance(value, astroid.DictUnpack):
return {self._infer_value(key): self._infer_value(value) for key, value in value.items}
raise ValueError(f"Unsupported type {type(value)}")

def _infer_dict(self, in_dict: astroid.Dict):
out_dict = {}
for in_key, in_value in in_dict.items:
out_key = self._infer_value(in_key.value)
out_key = self._infer_value(in_key)
out_value = self._infer_value(in_value)
out_dict[out_key] = out_value
return out_dict
Expand Down
28 changes: 28 additions & 0 deletions tests/test_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,31 @@ def test_missing_data_security_mode_in_job_clusters(lint_with):
"[missing-data-security-mode] banana cluster missing 'data_security_mode' "
"required for Unity Catalog compatibility"
) in messages


def test_missing_data_security_mode_in_task_clusters(lint_with):
messages = (
lint_with(AirflowChecker)
<< """from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
tasks = [
{
"task_key": "banana",
"notebook_task": {
"notebook_path": "/Shared/test",
},
'new_cluster': {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
]
DatabricksCreateJobsOperator( #@
task_id="jobs_create_named",
tasks=tasks
)"""
)
assert (
"[missing-data-security-mode] banana cluster missing 'data_security_mode' "
"required for Unity Catalog compatibility"
) in messages

0 comments on commit 8eb176d

Please sign in to comment.