Add Teradata Resource (#104)
* initial commit

* Fixed pyright and ruff reported errors

* Moved compute cluster test cases to a directory

* Updated test cases

* Addressed review comments

* Removed end_time

* Added few unit tests

* updated quality-check workflow

* Update quality-check-dagster-teradata.yml

* Update quality-check-dagster-teradata.yml

* Update quality-check-dagster-teradata.yml

* Addressed review comments

* Update Makefile

* Update quality-check-dagster-teradata.yml

---------

Co-authored-by: Talla, Mohan <[email protected]>
MT255026 and tallamohan authored Jan 31, 2025
1 parent 03bb73a commit f0980ae
Showing 26 changed files with 3,671 additions and 0 deletions.
37 changes: 37 additions & 0 deletions .github/workflows/quality-check-dagster-teradata.yml
@@ -0,0 +1,37 @@
name: quality-check-dagster-teradata
on:
  pull_request:
    types: [opened, synchronize, reopened]
    paths:
      - 'libraries/dagster-teradata/**'

jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v3

      - name: Install python
        working-directory: ./libraries/dagster-teradata
        run: uv python install 3.12

      - name: Sync dependencies
        working-directory: ./libraries/dagster-teradata
        run: uv sync --all-extras

      - name: Ruff
        working-directory: ./libraries/dagster-teradata
        run: uv run ruff check

      - name: Pyright
        working-directory: ./libraries/dagster-teradata
        run: uv run pyright

      - name: Pytest
        working-directory: ./libraries/dagster-teradata
        run: uv run pytest --ignore=dagster_teradata_tests/functional/
14 changes: 14 additions & 0 deletions .github/workflows/release-dagster-teradata.yml
@@ -0,0 +1,14 @@
name: build-and-release-dagster-teradata

on:
  push:
    tags:
      - 'dagster_teradata-*.*.*'

jobs:
  build-and-release-dagster-teradata:
    uses: ./.github/workflows/template-release.yml
    with:
      library_name: dagster-teradata
      working_directory: ./libraries/dagster-teradata
    secrets: inherit
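The tag filter above means a release fires only for tags of the exact form `dagster_teradata-X.Y.Z`. As a rough sketch, Python's `fnmatch` approximates this glob (an assumption for illustration; GitHub Actions filter patterns are not identical to `fnmatch` — for example, `*` in Actions does not match `/`):

```python
from fnmatch import fnmatch

# Glob taken from the workflow's tag filter.
PATTERN = "dagster_teradata-*.*.*"

candidates = [
    "dagster_teradata-0.1.0",  # matches: underscore prefix, three dotted parts
    "dagster-teradata-0.1.0",  # no match: hyphen instead of underscore
    "dagster_teradata-0.1",    # no match: only two dotted parts
]
for tag in candidates:
    print(tag, "->", fnmatch(tag, PATTERN))
```

A common gotcha this illustrates: the package tag uses an underscore (`dagster_teradata-…`) even though the library directory uses a hyphen (`dagster-teradata`).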
15 changes: 15 additions & 0 deletions libraries/dagster-teradata/Makefile
@@ -0,0 +1,15 @@
install:
	uv sync --all-extras

build:
	uv build

test:
	uv run pytest --ignore=dagster_teradata_tests/functional/

ruff:
	uv run ruff check --fix .
	uv run ruff format .

check:
	uv run pyright
134 changes: 134 additions & 0 deletions libraries/dagster-teradata/README.md
@@ -0,0 +1,134 @@
# dagster-teradata

A Dagster module that provides integration with [Teradata Vantage](https://www.teradata.com/).

## Installation
The `dagster_teradata` module is available on PyPI; install it with your preferred Python
environment manager.

```sh
source .venv/bin/activate
pip install dagster-teradata
```

## Example Usage

This module offers seamless integration with Teradata Vantage, facilitating efficient workflows for data processing,
management, and transformation. It supports a range of scenarios, such as executing queries, managing tables,
and integrating with cloud storage solutions like AWS S3 and Azure Data Lake Storage (ADLS). Additionally,
it enables compute cluster management for Teradata Vantage Cloud Lake.

```python
from dagster import job, op, EnvVar
from dagster_teradata import TeradataResource

td_resource = TeradataResource(
    host=EnvVar("TERADATA_HOST"),
    user=EnvVar("TERADATA_USER"),
    password=EnvVar("TERADATA_PASSWORD"),
    database=EnvVar("TERADATA_DATABASE"),
)

def test_execute_query(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_execute_query(context):
        result = context.resources.teradata.execute_queries(
            ["select order_id from orders_24", "select order_id from orders_25"], True
        )
        context.log.info(result)

    @job(resource_defs={"teradata": td_resource})
    def example_job():
        example_test_execute_query()

    example_job.execute_in_process(resources={"teradata": td_resource})
```
```python
from dagster import job, op, EnvVar
from dagster_teradata import TeradataResource

td_resource = TeradataResource(
    host=EnvVar("TERADATA_HOST"),
    user=EnvVar("TERADATA_USER"),
    password=EnvVar("TERADATA_PASSWORD"),
    database=EnvVar("TERADATA_DATABASE"),
)

def test_drop_table(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_drop_table(context):
        result = context.resources.teradata.drop_table(["process_tmp1", "process_tmp2"])
        context.log.info(result)

    @job(resource_defs={"teradata": td_resource})
    def example_job():
        example_test_drop_table()

    example_job.execute_in_process(resources={"teradata": td_resource})
```

Here is another example of compute cluster management in Teradata VantageCloud Lake:

```python
from dagster import job, op, EnvVar
from dagster_teradata import teradata_resource

def test_create_teradata_compute_cluster(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_create_teradata_compute_cluster(context):
        """Args for create_teradata_compute_cluster():
        compute_profile_name: Name of the compute profile to manage.
        compute_group_name: Name of the compute group to which the compute profile belongs.
        query_strategy: Query strategy to use. Refers to the approach or method used by the
            Teradata Optimizer to execute SQL queries efficiently within a Teradata compute
            cluster. Valid values are 'STANDARD' or 'ANALYTIC'; the database-level default
            is STANDARD.
        compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster
            profile refers to the mapping of compute resources to a specific node or set of
            nodes within the cluster.
        compute_attribute (str, optional): Additional attributes of the compute profile,
            e.g. MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE').
            Defaults to None.
        """
        context.resources.teradata.create_teradata_compute_cluster(
            "ShippingCG01",
            "Shipping",
            "STANDARD",
            "TD_COMPUTE_MEDIUM",
            "MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('FALSE')",
        )

    @job(resource_defs={"teradata": teradata_resource})
    def example_job():
        example_create_teradata_compute_cluster()

    example_job.execute_in_process(
        run_config={
            "resources": {
                "teradata": {
                    "config": {
                        "host": EnvVar("TERADATA_HOST"),
                        "user": EnvVar("TERADATA_USER"),
                        "password": EnvVar("TERADATA_PASSWORD"),
                        "database": EnvVar("TERADATA_DATABASE"),
                    }
                }
            }
        }
    )
```

## Development

The `Makefile` provides the tools required to test and lint your local installation.

```sh
make test
make ruff
make check
```
6 changes: 6 additions & 0 deletions libraries/dagster-teradata/dagster_teradata/__init__.py
@@ -0,0 +1,6 @@
from dagster_teradata.resources import (
    TeradataDagsterConnection as TeradataDagsterConnection,
    TeradataResource as TeradataResource,
    fetch_last_updated_timestamps as fetch_last_updated_timestamps,
    teradata_resource as teradata_resource,
)
23 changes: 23 additions & 0 deletions libraries/dagster-teradata/dagster_teradata/constants.py
@@ -0,0 +1,23 @@
"""Define constants for dagster-teradata."""

CC_OPR_SUCCESS_STATUS_MSG = "Compute Cluster %s %s operation completed successfully."
CC_OPR_FAILURE_STATUS_MSG = "Compute Cluster %s %s operation has failed."
CC_OPR_INITIALIZING_STATUS_MSG = (
"The environment is currently initializing. Please wait."
)
CC_OPR_EMPTY_PROFILE_ERROR_MSG = (
"Please provide a valid name for the compute cluster profile."
)
CC_GRP_PRP_NON_EXISTS_MSG = (
"The specified Compute cluster is not present or The user doesn't have permission to "
"access compute cluster."
)
CC_GRP_PRP_UN_AUTHORIZED_MSG = "The %s operation is not authorized for the user."
CC_GRP_LAKE_SUPPORT_ONLY_MSG = "Compute Groups is supported only on Vantage Cloud Lake."
CC_OPR_TIMEOUT_ERROR = "There is an issue with the %s operation. Kindly consult the administrator for assistance."
CC_GRP_PRP_EXISTS_MSG = "The specified Compute cluster is already exists."
CC_OPR_EMPTY_COPY_PROFILE_ERROR_MSG = (
"Please provide a valid name for the source and target compute profile."
)
CC_OPR_TIME_OUT = 1200
CC_POLL_INTERVAL = 60
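The `%s` placeholders in these status messages are printf-style slots. A minimal sketch of how such a message can be filled in (the profile and operation names below are hypothetical, and exactly how the resource formats them is an assumption — only the constant itself comes from the file):

```python
# Constant copied from constants.py above.
CC_OPR_SUCCESS_STATUS_MSG = "Compute Cluster %s %s operation completed successfully."

# Hypothetical profile name and operation, purely for illustration.
msg = CC_OPR_SUCCESS_STATUS_MSG % ("ShippingCG01", "CREATE_OPERATION")
print(msg)  # Compute Cluster ShippingCG01 CREATE_OPERATION operation completed successfully.
```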