From 481f612d5ea0f2fcb7818e1e25901587909d81f5 Mon Sep 17 00:00:00 2001 From: Muennighoff Date: Thu, 5 Jan 2023 19:44:43 +0400 Subject: [PATCH 1/5] Integrate dkulib & allow empty input --- .../recipe.py | 35 +- .../recipe.py | 31 +- .../recipe.py | 35 +- .../recipe.py | 34 +- python-lib/amazon_comprehend_api_client.py | 12 +- python-lib/api_parallelizer.py | 197 ----------- python-lib/dku_io_utils.py | 38 --- python-lib/dkulib/__init__.py | 0 python-lib/dkulib/dku_config/README.md | 94 ++++++ python-lib/dkulib/dku_config/__init__.py | 12 + python-lib/dkulib/dku_config/custom_check.py | 261 +++++++++++++++ python-lib/dkulib/dku_config/dku_config.py | 99 ++++++ .../dkulib/dku_config/dku_file_manager.py | 51 +++ python-lib/dkulib/dku_config/dss_parameter.py | 126 +++++++ python-lib/dkulib/dku_config/requirements.txt | 1 + python-lib/dkulib/dku_io_utils/README.md | 44 +++ python-lib/dkulib/dku_io_utils/__init__.py | 11 + .../dkulib/dku_io_utils/chunked_processing.py | 112 +++++++ .../dku_io_utils/column_descriptions.py | 61 ++++ python-lib/dkulib/io_utils/README.md | 24 ++ python-lib/dkulib/io_utils/__init__.py | 16 + python-lib/dkulib/io_utils/plugin_io_utils.py | 144 ++++++++ python-lib/dkulib/parallelizer/__init__.py | 10 + .../dkulib/parallelizer/parallelizer.py | 315 ++++++++++++++++++ .../dkulib/parallelizer/requirements.txt | 2 + tests/python/requirements.txt | 2 - tests/python/unit/.gitkeep | 0 tests/python/unit/test_api_parallelizer.py | 97 ------ 28 files changed, 1461 insertions(+), 403 deletions(-) delete mode 100644 python-lib/api_parallelizer.py delete mode 100644 python-lib/dku_io_utils.py create mode 100644 python-lib/dkulib/__init__.py create mode 100644 python-lib/dkulib/dku_config/README.md create mode 100644 python-lib/dkulib/dku_config/__init__.py create mode 100644 python-lib/dkulib/dku_config/custom_check.py create mode 100644 python-lib/dkulib/dku_config/dku_config.py create mode 100644 python-lib/dkulib/dku_config/dku_file_manager.py create mode 100644 python-lib/dkulib/dku_config/dss_parameter.py create mode 100644 python-lib/dkulib/dku_config/requirements.txt create mode 100644 python-lib/dkulib/dku_io_utils/README.md create mode 100644 python-lib/dkulib/dku_io_utils/__init__.py create mode 100644 python-lib/dkulib/dku_io_utils/chunked_processing.py create mode 100644 python-lib/dkulib/dku_io_utils/column_descriptions.py create mode 100644 python-lib/dkulib/io_utils/README.md create mode 100644 python-lib/dkulib/io_utils/__init__.py create mode 100644 python-lib/dkulib/io_utils/plugin_io_utils.py create mode 100644 python-lib/dkulib/parallelizer/__init__.py create mode 100644 python-lib/dkulib/parallelizer/parallelizer.py create mode 100644 python-lib/dkulib/parallelizer/requirements.txt delete mode 100644 tests/python/requirements.txt create mode 100644 tests/python/unit/.gitkeep delete mode 100644 tests/python/unit/test_api_parallelizer.py diff --git a/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py b/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py index 1425bce..51ee651 100644 --- a/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py @@ -2,18 +2,17 @@ import json from typing import List, Dict, AnyStr, Union -from retry import retry from ratelimit import limits, RateLimitException +from retry import retry import dataiku from dataiku.customrecipe import get_recipe_config, get_input_names_for_role, get_output_names_for_role -from 
plugin_io_utils import ErrorHandlingEnum, validate_column_input -from dku_io_utils import set_column_description from amazon_comprehend_api_client import API_EXCEPTIONS, batch_api_response_parser, get_client -from api_parallelizer import api_parallelizer from amazon_comprehend_api_formatting import KeyPhraseExtractionAPIFormatter - +from dkulib.dku_io_utils import set_column_descriptions +from dkulib.parallelizer import DataFrameParallelizer +from plugin_io_utils import ErrorHandlingEnum, validate_column_input # ============================================================================== # SETUP @@ -41,9 +40,9 @@ validate_column_input(text_column, input_columns_names) batch_kwargs = { - "api_support_batch": True, + "batch_support": True, "batch_size": batch_size, - "batch_api_response_parser": batch_api_response_parser, + "batch_response_parser": batch_api_response_parser, } if text_language == "language_column": batch_kwargs = {"api_support_batch": False} @@ -87,18 +86,20 @@ def call_api_key_phrase_extraction( responses = client.batch_detect_key_phrases(TextList=text_list, LanguageCode=text_language) return responses +df_parallelizer = DataFrameParallelizer( + function=call_api_key_phrase_extraction, + error_handling=error_handling, + exceptions_to_catch=API_EXCEPTIONS, + parallel_workers=parallel_workers, + output_column_prefix=column_prefix, + **batch_kwargs +) -df = api_parallelizer( - input_df=input_df, - api_call_function=call_api_key_phrase_extraction, - api_exceptions=API_EXCEPTIONS, - column_prefix=column_prefix, +df = df_parallelizer.run( + input_df, text_column=text_column, text_language=text_language, language_column=language_column, - parallel_workers=parallel_workers, - error_handling=error_handling, - **batch_kwargs ) api_formatter = KeyPhraseExtractionAPIFormatter( @@ -107,8 +108,8 @@ def call_api_key_phrase_extraction( output_df = api_formatter.format_df(df) output_dataset.write_with_schema(output_df) -set_column_description( +set_column_descriptions( input_dataset=input_dataset, output_dataset=output_dataset, - column_description_dict=api_formatter.column_description_dict, + column_descriptions=api_formatter.column_description_dict, ) diff --git a/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py b/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py index f709423..d4e0e43 100644 --- a/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py @@ -1,16 +1,16 @@ # -*- coding: utf-8 -*- from typing import List, Dict, AnyStr -from retry import retry from ratelimit import limits, RateLimitException +from retry import retry import dataiku from dataiku.customrecipe import get_recipe_config, get_input_names_for_role, get_output_names_for_role from plugin_io_utils import ErrorHandlingEnum, validate_column_input -from dku_io_utils import set_column_description +from dkulib.dku_io_utils import set_column_descriptions +from dkulib.parallelizer import DataFrameParallelizer from amazon_comprehend_api_client import API_EXCEPTIONS, batch_api_response_parser, get_client -from api_parallelizer import api_parallelizer from amazon_comprehend_api_formatting import LanguageDetectionAPIFormatter @@ -39,9 +39,9 @@ client = get_client(api_configuration_preset) column_prefix = "lang_detect_api" batch_kwargs = { - "api_support_batch": True, + "batch_support": True, "batch_size": batch_size, - "batch_api_response_parser": batch_api_response_parser, + "batch_response_parser": 
batch_api_response_parser, } @@ -58,25 +58,28 @@ def call_api_language_detection(batch: List[Dict], text_column: AnyStr) -> List[ return responses -df = api_parallelizer( - input_df=input_df, - api_call_function=call_api_language_detection, - api_exceptions=API_EXCEPTIONS, - column_prefix=column_prefix, - text_column=text_column, - parallel_workers=parallel_workers, +df_parallelizer = DataFrameParallelizer( + function=call_api_language_detection, error_handling=error_handling, + exceptions_to_catch=API_EXCEPTIONS, + parallel_workers=parallel_workers, + output_column_prefix=column_prefix, **batch_kwargs ) +df = df_parallelizer.run( + input_df, + text_column=text_column, +) + api_formatter = LanguageDetectionAPIFormatter( input_df=input_df, column_prefix=column_prefix, error_handling=error_handling, ) output_df = api_formatter.format_df(df) output_dataset.write_with_schema(output_df) -set_column_description( +set_column_descriptions( input_dataset=input_dataset, output_dataset=output_dataset, - column_description_dict=api_formatter.column_description_dict, + column_descriptions=api_formatter.column_description_dict, ) diff --git a/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py b/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py index 2ec9c9e..9672111 100644 --- a/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py @@ -2,17 +2,17 @@ import json from typing import List, Dict, AnyStr, Union -from retry import retry from ratelimit import limits, RateLimitException +from retry import retry import dataiku from dataiku.customrecipe import get_recipe_config, get_input_names_for_role, get_output_names_for_role -from plugin_io_utils import ErrorHandlingEnum, validate_column_input -from dku_io_utils import set_column_description from amazon_comprehend_api_client import API_EXCEPTIONS, batch_api_response_parser, get_client -from api_parallelizer import api_parallelizer from amazon_comprehend_api_formatting import EntityTypeEnum, NamedEntityRecognitionAPIFormatter +from dkulib.dku_io_utils import set_column_descriptions +from dkulib.parallelizer import DataFrameParallelizer +from plugin_io_utils import ErrorHandlingEnum, validate_column_input # ============================================================================== @@ -44,9 +44,9 @@ validate_column_input(text_column, input_columns_names) batch_kwargs = { - "api_support_batch": True, + "batch_support": True, "batch_size": batch_size, - "batch_api_response_parser": batch_api_response_parser, + "batch_response_parser": batch_api_response_parser, } if text_language == "language_column": batch_kwargs = {"api_support_batch": False} @@ -91,17 +91,20 @@ def call_api_named_entity_recognition( return responses -df = api_parallelizer( - input_df=input_df, - api_call_function=call_api_named_entity_recognition, - api_exceptions=API_EXCEPTIONS, - column_prefix=column_prefix, +df_parallelizer = DataFrameParallelizer( + function=call_api_named_entity_recognition, + error_handling=error_handling, + exceptions_to_catch=API_EXCEPTIONS, + parallel_workers=parallel_workers, + output_column_prefix=column_prefix, + **batch_kwargs +) + +df = df_parallelizer.run( + input_df, text_column=text_column, text_language=text_language, language_column=language_column, - parallel_workers=parallel_workers, - error_handling=error_handling, - **batch_kwargs ) api_formatter = NamedEntityRecognitionAPIFormatter( @@ -114,8 +117,8 @@ def 
call_api_named_entity_recognition( output_df = api_formatter.format_df(df) output_dataset.write_with_schema(output_df) -set_column_description( +set_column_descriptions( input_dataset=input_dataset, output_dataset=output_dataset, - column_description_dict=api_formatter.column_description_dict, + column_descriptions=api_formatter.column_description_dict, ) diff --git a/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py b/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py index 5dcee05..1195b0d 100644 --- a/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py @@ -2,18 +2,17 @@ import json from typing import List, Dict, AnyStr, Union -from retry import retry from ratelimit import limits, RateLimitException +from retry import retry import dataiku from dataiku.customrecipe import get_recipe_config, get_input_names_for_role, get_output_names_for_role from plugin_io_utils import ErrorHandlingEnum, validate_column_input -from dku_io_utils import set_column_description from amazon_comprehend_api_client import API_EXCEPTIONS, batch_api_response_parser, get_client -from api_parallelizer import api_parallelizer from amazon_comprehend_api_formatting import SentimentAnalysisAPIFormatter - +from dkulib.dku_io_utils import set_column_descriptions +from dkulib.parallelizer import DataFrameParallelizer # ============================================================================== # SETUP @@ -40,9 +39,9 @@ validate_column_input(text_column, input_columns_names) batch_kwargs = { - "api_support_batch": True, + "batch_support": True, "batch_size": batch_size, - "batch_api_response_parser": batch_api_response_parser, + "batch_response_parser": batch_api_response_parser, } if text_language == "language_column": batch_kwargs = {"api_support_batch": False} @@ -87,17 +86,20 @@ def call_api_sentiment_analysis( return responses -df = api_parallelizer( - input_df=input_df, - api_call_function=call_api_sentiment_analysis, - api_exceptions=API_EXCEPTIONS, - column_prefix=column_prefix, +df_parallelizer = DataFrameParallelizer( + function=call_api_sentiment_analysis, + error_handling=error_handling, + exceptions_to_catch=API_EXCEPTIONS, + parallel_workers=parallel_workers, + output_column_prefix=column_prefix, + **batch_kwargs +) + +df = df_parallelizer.run( + input_df, text_column=text_column, text_language=text_language, language_column=language_column, - parallel_workers=parallel_workers, - error_handling=error_handling, - **batch_kwargs ) api_formatter = SentimentAnalysisAPIFormatter( @@ -106,8 +108,8 @@ def call_api_sentiment_analysis( output_df = api_formatter.format_df(df) output_dataset.write_with_schema(output_df) -set_column_description( +set_column_descriptions( input_dataset=input_dataset, output_dataset=output_dataset, - column_description_dict=api_formatter.column_description_dict, + column_descriptions=api_formatter.column_description_dict, ) diff --git a/python-lib/amazon_comprehend_api_client.py b/python-lib/amazon_comprehend_api_client.py index c065804..25ea49f 100644 --- a/python-lib/amazon_comprehend_api_client.py +++ b/python-lib/amazon_comprehend_api_client.py @@ -32,7 +32,7 @@ def get_client(api_configuration_preset): return client -def batch_api_response_parser(batch: List[Dict], response: Union[Dict, List], api_column_names: NamedTuple) -> Dict: +def batch_api_response_parser(batch: List[Dict], response: Union[Dict, List], output_column_names: NamedTuple) -> Dict: """ Function to 
parse API results in the batch case. Needed for api_parallelizer.api_call_batch as each batch API needs specific response parsing. @@ -40,18 +40,18 @@ def batch_api_response_parser(batch: List[Dict], response: Union[Dict, List], ap results = response.get("ResultList", []) errors = response.get("ErrorList", []) for i in range(len(batch)): - for k in api_column_names: + for k in output_column_names: batch[i][k] = "" result = [r for r in results if str(r.get("Index", "")) == str(i)] error = [r for r in errors if str(r.get("Index", "")) == str(i)] if len(result) != 0: # result must be json serializable - batch[i][api_column_names.response] = json.dumps(result[0]) + batch[i][output_column_names.response] = json.dumps(result[0]) if len(error) != 0: inner_error = error[0] logging.warning(str(inner_error)) # custom for Azure edge case which is highly nested - batch[i][api_column_names.error_message] = inner_error.get("ErrorMessage", "") - batch[i][api_column_names.error_type] = inner_error.get("ErrorCode", "") - batch[i][api_column_names.error_raw] = str(inner_error) + batch[i][output_column_names.error_message] = inner_error.get("ErrorMessage", "") + batch[i][output_column_names.error_type] = inner_error.get("ErrorCode", "") + batch[i][output_column_names.error_raw] = str(inner_error) return batch diff --git a/python-lib/api_parallelizer.py b/python-lib/api_parallelizer.py deleted file mode 100644 index eab0014..0000000 --- a/python-lib/api_parallelizer.py +++ /dev/null @@ -1,197 +0,0 @@ -# -*- coding: utf-8 -*- -"""Module with functions to parallelize API calls with error handling""" - -import logging -import inspect -import math -from typing import Callable, AnyStr, List, Tuple, NamedTuple, Dict, Union -from concurrent.futures import ThreadPoolExecutor, as_completed - -import pandas as pd -from more_itertools import chunked, flatten -from tqdm.auto import tqdm as tqdm_auto - -from plugin_io_utils import ErrorHandlingEnum, build_unique_column_names - - -# ============================================================================== -# CONSTANT DEFINITION -# ============================================================================== - -DEFAULT_PARALLEL_WORKERS = 4 -DEFAULT_BATCH_SIZE = 10 -DEFAULT_API_SUPPORT_BATCH = False -DEFAULT_VERBOSE = False - - -# ============================================================================== -# CLASS AND FUNCTION DEFINITION -# ============================================================================== - - -def api_call_single_row( - api_call_function: Callable, - api_column_names: NamedTuple, - row: Dict, - api_exceptions: Union[Exception, Tuple[Exception]], - error_handling: ErrorHandlingEnum = ErrorHandlingEnum.LOG, - verbose: bool = DEFAULT_VERBOSE, - **api_call_function_kwargs -) -> Dict: - """ - Wraps a single-row API calling function to: - - ensure it has a 'row' parameter which is a dict - (for batches of rows, use the api_call_batch function below) - - return the row with a new 'response' key containing the function result - - handles errors from the function with two methods: - * (default) do not fail on API-related exceptions, just log it - and return the row with new error keys - * fail if there is an error and raise it - """ - if error_handling == ErrorHandlingEnum.FAIL: - response = api_call_function(row=row, **api_call_function_kwargs) - row[api_column_names.response] = response - else: - for k in api_column_names: - row[k] = "" - try: - response = api_call_function(row=row, **api_call_function_kwargs) - row[api_column_names.response] 
= response - except api_exceptions as e: - logging.warning(str(e)) - error_type = str(type(e).__qualname__) - module = inspect.getmodule(e) - if module is not None: - error_type = str(module.__name__) + "." + error_type - row[api_column_names.error_message] = str(e) - row[api_column_names.error_type] = error_type - row[api_column_names.error_raw] = str(e.args) - return row - - -def api_call_batch( - api_call_function: Callable, - api_column_names: NamedTuple, - batch: List[Dict], - batch_api_response_parser: Callable, - api_exceptions: Union[Exception, Tuple[Exception]], - error_handling: ErrorHandlingEnum = ErrorHandlingEnum.LOG, - verbose: bool = DEFAULT_VERBOSE, - **api_call_function_kwargs -) -> List[Dict]: - """ - Wraps a batch API calling function to: - - ensure it has a 'batch' parameter which is a list of dict - - return the batch with a new 'response' key in each dict - containing the function result - - handles errors from the function with two methods: - * (default) do not fail on API-related exceptions, just log it - and return the batch with new error keys in each dict (using batch_api_parser) - * fail if there is an error and raise it - """ - if error_handling == ErrorHandlingEnum.FAIL: - response = api_call_function(batch=batch, **api_call_function_kwargs) - batch = batch_api_response_parser(batch=batch, response=response, api_column_names=api_column_names) - errors = [row[api_column_names.error_message] for row in batch if row[api_column_names.error_message] != ""] - if len(errors) != 0: - raise Exception("API returned errors: " + str(errors)) - else: - try: - response = api_call_function(batch=batch, **api_call_function_kwargs) - batch = batch_api_response_parser(batch=batch, response=response, api_column_names=api_column_names) - except api_exceptions as e: - logging.warning(str(e)) - error_type = str(type(e).__qualname__) - module = inspect.getmodule(e) - if module is not None: - error_type = str(module.__name__) + "." + error_type - for row in batch: - row[api_column_names.response] = "" - row[api_column_names.error_message] = str(e) - row[api_column_names.error_type] = error_type - row[api_column_names.error_raw] = str(e.args) - return batch - - -def convert_api_results_to_df( - input_df: pd.DataFrame, - api_results: List[Dict], - api_column_names: NamedTuple, - error_handling: ErrorHandlingEnum = ErrorHandlingEnum.LOG, - verbose: bool = DEFAULT_VERBOSE, -) -> pd.DataFrame: - """ - Helper function to the "api_parallelizer" main function. - Combine API results (list of dict) with input dataframe, - and convert it to a dataframe. 
- """ - if error_handling == ErrorHandlingEnum.FAIL: - columns_to_exclude = [v for k, v in api_column_names._asdict().items() if "error" in k] - else: - columns_to_exclude = [] - if not verbose: - columns_to_exclude = [api_column_names.error_raw] - output_schema = {**{v: str for v in api_column_names}, **dict(input_df.dtypes)} - output_schema = {k: v for k, v in output_schema.items() if k not in columns_to_exclude} - record_list = [{col: result.get(col) for col in output_schema.keys()} for result in api_results] - api_column_list = [c for c in api_column_names if c not in columns_to_exclude] - output_column_list = list(input_df.columns) + api_column_list - output_df = pd.DataFrame.from_records(record_list).astype(output_schema).reindex(columns=output_column_list) - assert len(output_df.index) == len(input_df.index) - return output_df - - -def api_parallelizer( - input_df: pd.DataFrame, - api_call_function: Callable, - api_exceptions: Union[Exception, Tuple[Exception]], - column_prefix: AnyStr, - parallel_workers: int = DEFAULT_PARALLEL_WORKERS, - api_support_batch: bool = DEFAULT_API_SUPPORT_BATCH, - batch_size: int = DEFAULT_BATCH_SIZE, - error_handling: ErrorHandlingEnum = ErrorHandlingEnum.LOG, - verbose: bool = DEFAULT_VERBOSE, - **api_call_function_kwargs -) -> pd.DataFrame: - """ - Apply an API call function in parallel to a pandas.DataFrame. - The DataFrame is passed to the function as row dictionaries. - Parallelism works by: - - (default) sending multiple concurrent threads - - if the API supports it, sending batches of row - """ - df_iterator = (i[1].to_dict() for i in input_df.iterrows()) - len_iterator = len(input_df.index) - log_msg = "Calling remote API endpoint with {} rows...".format(len_iterator) - if api_support_batch: - log_msg += ", chunked by {}".format(batch_size) - df_iterator = chunked(df_iterator, batch_size) - len_iterator = math.ceil(len_iterator / batch_size) - logging.info(log_msg) - api_column_names = build_unique_column_names(input_df.columns, column_prefix) - pool_kwargs = api_call_function_kwargs.copy() - more_kwargs = [ - "api_call_function", - "error_handling", - "api_exceptions", - "api_column_names", - ] - for k in more_kwargs: - pool_kwargs[k] = locals()[k] - for k in ["fn", "row", "batch"]: # Reserved pool keyword arguments - pool_kwargs.pop(k, None) - api_results = [] - with ThreadPoolExecutor(max_workers=parallel_workers) as pool: - if api_support_batch: - futures = [pool.submit(api_call_batch, batch=batch, **pool_kwargs) for batch in df_iterator] - else: - futures = [pool.submit(api_call_single_row, row=row, **pool_kwargs) for row in df_iterator] - for f in tqdm_auto(as_completed(futures), total=len_iterator): - api_results.append(f.result()) - if api_support_batch: - api_results = flatten(api_results) - output_df = convert_api_results_to_df(input_df, api_results, api_column_names, error_handling, verbose) - num_api_error = sum(output_df[api_column_names.response] == "") - num_api_success = len(input_df.index) - num_api_error - logging.info("Remote API call results: {} rows succeeded, {} rows failed.".format(num_api_success, num_api_error)) - return output_df diff --git a/python-lib/dku_io_utils.py b/python-lib/dku_io_utils.py deleted file mode 100644 index 6b99e42..0000000 --- a/python-lib/dku_io_utils.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -"""Module with read/write utility functions based on the Dataiku API""" - -from typing import Dict - -import dataiku - - -# 
==============================================================================
-# CLASS AND FUNCTION DEFINITION
-# ==============================================================================
-
-
-def set_column_description(
-    output_dataset: dataiku.Dataset, column_description_dict: Dict, input_dataset: dataiku.Dataset = None,
-) -> None:
-    """
-    Set column descriptions of the output dataset based on a dictionary of column descriptions
-    and retains the column descriptions from the input dataset (optional) if the column name matches.
-    """
-    if input_dataset is None:
-        input_dataset_schema = []
-    else:
-        input_dataset_schema = input_dataset.read_schema()
-    output_dataset_schema = output_dataset.read_schema()
-    input_columns_names = [col["name"] for col in input_dataset_schema]
-    for output_col_info in output_dataset_schema:
-        output_col_name = output_col_info.get("name", "")
-        output_col_info["comment"] = column_description_dict.get(output_col_name)
-        if output_col_name in input_columns_names:
-            matched_comment = [
-                input_col_info.get("comment", "")
-                for input_col_info in input_dataset_schema
-                if input_col_info.get("name") == output_col_name
-            ]
-            if len(matched_comment) != 0:
-                output_col_info["comment"] = matched_comment[0]
-    output_dataset.write_schema(output_dataset_schema)
diff --git a/python-lib/dkulib/__init__.py b/python-lib/dkulib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/python-lib/dkulib/dku_config/README.md b/python-lib/dkulib/dku_config/README.md
new file mode 100644
index 0000000..aa145f0
--- /dev/null
+++ b/python-lib/dkulib/dku_config/README.md
@@ -0,0 +1,94 @@
+# DKU Config
+
+## Description
+
+This lib can be used to validate parameters in a custom form. We know that front-end validation is never enough
+and does not work in some cases in DSS (webapps, for example). Thus, you can create a DkuConfig object that
+behaves as a dict, and add parameters with checks.
+
+The DkuConfig object also supports local variables. If you want to use a Dku App, the only way to get user settings
+is to use local variables. A good practice is to prefix these variables with the name of your plugin, like this:
+`MY_AWESOME_PLUGIN__param1`. That way, there won't be any conflicts if another plugin uses local variables.
+
+## Examples
+
+Let's say you have built a custom form containing the following fields:
+
+- first_name
+- gender
+- age
+- phone_number
+
+You want to validate in the backend that the parameters have been filled in properly and, if not, display an
+understandable message. You can then build a DkuConfig object.
+ +```python +from dkulib.core.dku_config.dku_config import DkuConfig +import dataiku +from dataiku.customrecipe import get_recipe_config + +config = get_recipe_config() +dku_config = DkuConfig( + local_vars=dataiku.Project().get_variables()['local'], + local_prefix="MY_AWESOME_PLUGIN__" +) + +dku_config.add_param( + name="first_name", + value=config.get("first_name"), + required=True +) + +dku_config.add_param( + name="gender", + value=config.get("gender"), + checks=[{ + "type": "in", + "op": ['M', 'F'] + }], + required=True +) + +dku_config.add_param( + name="age", + value=config.get("age"), + checks=[{ + "type": "between", + "op": (18, 100), + "err_msg": "You must be over 18 to use the plugin (You specified {value})" + }], + required=True +) + +dku_config.add_param( + name="phone_number", + value=config.get("phone_number"), + checks=[{ + "type": "match", + "op": '^(?:(?:\+|00)33[\s.-]{0,3}(?:\(0\)[\s.-]{0,3})?|0)[1-9](?:(?:[\s.-]?\d{2}){4}|\d{2}(?:[\s.-]?\d{3}){2})$' + }], + required=True +) + +# ... + +assert dku_config.age < 100 +assert dku_config["age"] < 100 + +``` + +## Projects using the library + +Don't hesitate to check these plugins using the library for more examples : + +- [dss-plugin-deeplearning-image](https://github.com/dataiku/dss-plugin-deeplearning-image) +- [dss-plugin-ml-assisted-labeling](https://github.com/dataiku/dss-plugin-ml-assisted-labeling) + +## Version + +- Version: 0.1.6 +- State: Supported + +## Credit + +Library created and maintained by Henri Chabert. diff --git a/python-lib/dkulib/dku_config/__init__.py b/python-lib/dkulib/dku_config/__init__.py new file mode 100644 index 0000000..3afe297 --- /dev/null +++ b/python-lib/dkulib/dku_config/__init__.py @@ -0,0 +1,12 @@ +######################################################## +# ------------- dku_config: 0.1.6 ---------------- + +# For more information, see https://github.com/dataiku/dss-plugin-dkulib/tree/main/core/dku_config +# Library version: 0.1.6 +# Last update: 2021-07-12 +# Author: Dataiku (Henri Chabert) +######################################################### + +from .custom_check import CustomCheck +from .dku_config import DkuConfig +from .dss_parameter import DSSParameter diff --git a/python-lib/dkulib/dku_config/custom_check.py b/python-lib/dkulib/dku_config/custom_check.py new file mode 100644 index 0000000..63b63d3 --- /dev/null +++ b/python-lib/dkulib/dku_config/custom_check.py @@ -0,0 +1,261 @@ +import logging +from typing import Any +import re + +logger = logging.getLogger(__name__) + +DEFAULT_ERROR_MESSAGES = { + 'exists': 'This field is required.', + 'in': 'Should be in {op}. (Currently {value})', + 'not_in': 'Should not be in {op}. (Currently {value})', + 'eq': 'Should be equal to {op} (Currently {value}).', + 'sup': 'Should be greater than {op} (Currently {value}).', + 'sup_eq': 'Should be greater than or equal to {op} (Currently {value}).', + 'inf': 'Should be less than {op} (Currently {value}).', + 'inf_eq': 'Should be less than or equal to {op} (Currently {value}).', + 'between': 'Should be between {op[0]} and {op[1]} inclusive (Currently {value}).', + 'between_strict': 'Should be between {op[0]} and {op[1]} exclusive (Currently {value}).', + 'is_type': 'Should be of type (Currently {value_type}).', + 'is_castable': 'Should be castable to type {op}> (Currently {value} with type {value_type}.', + 'custom': "There has been an unknown error.", + 'match': "Should match the following pattern: {op}.", + 'is_subset': 'Should be a subset of {op}. 
(Currently {value})', +} + + +class CustomCheckError(Exception): + """Exception raised when condition of CustomCheck are not met. + """ + pass + + +class CustomCheck: + """Class related to a check. Use run() to verify whether the check fails or pass + + Attributes: + type (str): Type of the CustomCheck. Must have a related method having "_" before type name + op (Any, optional): Operator to compare the value to. Unnecessary for som checks + err_msg (str, optional): Custom message to display if check fails. Default is a generic message + """ + def __init__(self, type, + op: Any = None, + err_msg: str = ''): + """Initialization method for the CustomCheck class + + Args: + type (str): Type of the CustomCheck. Must have a related method having "_" before type name + op (Any, optional): Operator to compare the value to. Unnecessary for som checks + err_msg (str, optional): Custom message to display if check fails. Default is a generic message + """ + self.type = type + func_name = '_{}'.format(self.type) + if not hasattr(self, func_name): + raise CustomCheckError('Check of type {} does not exist.'.format(self.type)) + self.op = op + self.err_msg = err_msg or self.get_default_err_msg() + + def run(self, value: Any = None): + """Runs the check on a value + + Args: + value(Any, optional): The value to run the check on. Default is None + """ + func_name = '_{}'.format(self.type) + result = getattr(self, func_name)(value) + self.handle_return(result, value) + + def handle_return(self, result: bool, value: Any): + """Checks whether the check has failed or pass + + Args: + result(bool): True if check has passed else False + value(Any): The value on which the test has been ran + + Raises: + CustomCheckError if check fails + """ + try: + assert result + except AssertionError: + raise CustomCheckError(self.format_err_msg(value)) + + def get_default_err_msg(self) -> str: + """Returns the default message related to check's type + + Returns: + str: Unformatted default error message + """ + return DEFAULT_ERROR_MESSAGES.get(self.type, 'custom') + + def format_err_msg(self, value: Any) -> str: + """Format the error message with the value that has failed the test + + Args: + value(Any): Failure value + + Returns: + str: Error messages formatted + """ + formatted_err_msg = self.err_msg.format(value=value, op=self.op, value_type=type(value)) + return f'{formatted_err_msg}' + + def _exists(self, value: Any) -> bool: + """Checks whether the value is None + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + EMPTY_ENTRIES = [[], "", None] + return value not in EMPTY_ENTRIES + + def _in(self, value: Any) -> bool: + """Checks whether the value is in the iterable given in "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value in self.op + + def _not_in(self, value: Any) -> bool: + """Checks whether the value is not in the iterable given in "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value not in self.op + + def _eq(self, value: Any) -> bool: + """Checks whether the value is equal to "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value == self.op + + def _sup(self, value: Any) -> bool: + """Checks whether the value is superior to "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value > 
float(self.op) + + def _inf(self, value: Any) -> bool: + """Checks whether the value is inferior to "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value < float(self.op) + + def _sup_eq(self, value: Any) -> bool: + """Checks whether the value is superior or equal to "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value >= float(self.op) + + def _inf_eq(self, value: Any) -> bool: + """Checks whether the value is inferior or equal to "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return value <= float(self.op) + + def _between(self, value: Any) -> bool: + """Checks whether the value is between the first and second member of "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return float(self.op[0]) <= value <= float(self.op[1]) + + def _between_strict(self, value: Any) -> bool: + """Checks whether the value is strictly between the first and second member of "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return float(self.op[0]) < value < float(self.op[1]) + + def _is_type(self, value: Any) -> bool: + """Checks whether the value has the type given in "op" attribute + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + return isinstance(value, self.op) + + def _is_castable(self, value: Any) -> bool: + """Checks whether the value can be cast to the op type + + Args: + value(Any): Value to test + + Returns: + bool: Whether the check has succeed + """ + try: + _ = self.op(value) + return True + except (TypeError, ValueError): + return False + + def _custom(self, *args) -> bool: + """Checks whether "op" attribute is true or false + + Returns: + bool: Whether the check has succeed + """ + return self.op + + def _match(self, value: Any) -> bool: + """Checks whether "value" matches the regex provided in "op" attribute + + Returns: + bool: Whether the check has succeed + """ + return not not re.match(self.op, value) + + def _is_subset(self, value: Any) -> bool: + """Checks whether "value" is a subset of "op" + + Returns: + bool: Whether the check has succeed + """ + return set(value).issubset(set(self.op)) diff --git a/python-lib/dkulib/dku_config/dku_config.py b/python-lib/dkulib/dku_config/dku_config.py new file mode 100644 index 0000000..29c3504 --- /dev/null +++ b/python-lib/dkulib/dku_config/dku_config.py @@ -0,0 +1,99 @@ +from .dss_parameter import DSSParameter +from collections.abc import MutableMapping +from typing import Any, AnyStr + + +class DkuConfig(MutableMapping): + """Mapping structure containing DSSParameter objects. It behaves as a dict with the following differences: + - You can access elements with a dot structure (Example: dku_config.param1 or dku_config["param1"]) + - You can set an element with a dot structure (Example: dku_config.param1 = 123) + - All objects stored are converted in DSSParameter + - Accessing an element returns the value of the object DSSParameter + + Attributes: + config(dict): Dict storing the DSSParameters + """ + def __init__(self, local_vars: dict = None, local_prefix: AnyStr = '', **kwargs): + """Initialization method for the DkuConfig class + + Args: + local_vars(dict, optional): Dict containing vars fetched from project local variables. 
Default is {} + local_prefix(str, optional): If project vars prefixed, write the prefix here, it will be added when + searching for the var + **kwargs: DSSParameters. Each key will be set as the parameter name and the values must be of type + dict. These dicts must contain at least an attribute "value". For other attributes, see + DSSParameter help. + """ + object.__setattr__(self, 'config', {}) + object.__setattr__(self, 'local_vars', local_vars or {}) + object.__setattr__(self, 'local_prefix', local_prefix) + if kwargs: + for k, v in kwargs.items(): + if 'value' not in v: + raise ValueError('Each init kwargs must have a "value" field.') + val = v.pop('value') + self.add_param(name=k, value=val, **v) + + def add_param(self, name: AnyStr, value: Any = None, **kwargs): + """Add a new DSSParameter to the config + + Args: + name(str): The name of the parameter + value(Any, optional): The value of the parameter. If empty, the parameter must be in local vars + **kwargs: Other arguments. See DSSParameter help. + """ + if self.local_vars: + value = value or self._get_local_var(name) + self.config[name] = DSSParameter(name=name, value=value, **kwargs) + + def get_param(self, name: AnyStr) -> DSSParameter: + """Returns the DSSParameter of given name + + Args: + name(str): Name of object to return + + Returns: + DSSParameter: Parameter of given name + """ + return self.config.get(name) + + def _get_local_var(self, var_name: AnyStr) -> Any: + """Returns the value of the local variable related to var_name. + + Args: + var_name(str): The variable to fetch from local_vars. It will be prefixed by the attribute "local_prefix" + + Returns: + Any: The value matching the given name + """ + return self.local_vars.get('{}{}'.format(self.local_prefix, var_name), None) + + def __delitem__(self, item): + del self.config[item] + + def __getattr__(self, name): + return self[name] + + def __setattr__(self, key, value): + self[key] = value + + def __getitem__(self, item): + if item in self.config: + return self.config.get(item).value + else: + raise KeyError(item) + + def __setitem__(self, key, value): + self.add_param(name=key, value=value) + + def __iter__(self): + return iter(self.config) + + def __len__(self): + return len(self.config) + + def __repr__(self): + return self.config.__repr__() + + def __str__(self): + return self.config.__str__() diff --git a/python-lib/dkulib/dku_config/dku_file_manager.py b/python-lib/dkulib/dku_config/dku_file_manager.py new file mode 100644 index 0000000..a15e936 --- /dev/null +++ b/python-lib/dkulib/dku_config/dku_file_manager.py @@ -0,0 +1,51 @@ +from dataiku.customrecipe import get_input_names_for_role, get_output_names_for_role +import dataiku +from .dku_config import DkuConfig + + +class DkuFileManager(DkuConfig): + """ + Use this class to create an object that contains the different input and output datasets/folders of a custom recipe + + Usage example: + + .. 
code-block:: python + + file_manager = DkuFileManager() + # add a required and an optional input dataset + file_manager.add_input_dataset("input_dataset") + file_manager.add_input_dataset("optional_input_dataset", required=False) + # add a required output dataset and an optional output folder + file_manager.add_output_dataset("output_dataset") + file_manager.add_output_folder("optional_output_folder", required=False) + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + + def add_file(self, side, type_, role, **kwargs): + file = DkuFileManager._retrieve_file_from_dss(side, type_, role) + self.add_param(name=role, value=file, **kwargs) + + def add_input_folder(self, role, required=True): + self.add_file("input", "folder", role, required=required) + + def add_output_folder(self, role, required=True): + self.add_file("output", "folder", role, required=required) + + def add_input_dataset(self, role, required=True): + self.add_file("input", "dataset", role, required=required) + + def add_output_dataset(self, role, required=True): + self.add_file("output", "dataset", role, required=required) + + @staticmethod + def _retrieve_file_from_dss(side, type_, role): + dku_func = get_input_names_for_role if side == "input" else get_output_names_for_role + dku_type = dataiku.Folder if type_ == "folder" else dataiku.Dataset + roles = dku_func(role) + return dku_type(roles[0]) if roles else None + + @staticmethod + def write_to_folder(folder, file_path, content): + with folder.get_writer(file_path) as w: + w.write(content.encode()) diff --git a/python-lib/dkulib/dku_config/dss_parameter.py b/python-lib/dkulib/dku_config/dss_parameter.py new file mode 100644 index 0000000..52a3dae --- /dev/null +++ b/python-lib/dkulib/dku_config/dss_parameter.py @@ -0,0 +1,126 @@ +from .custom_check import CustomCheck, CustomCheckError +from typing import Any, List + +import logging +logger = logging.getLogger(__name__) + + +class DSSParameterError(Exception): + """Exception raised when at least one CustomCheck fails. + """ + pass + + +class DSSParameter: + """Object related to one parameter. It is mainly used for checks to run in backend for custom forms. 
+ + Attributes: + name(str): Name of the parameter + value(Any): Value of the parameter + label(str, optional): The name displayed to the end user + checks(list[dict], optional): Checks to run on provided value + required(bool, optional): Whether the value can be None + cast_to(type, optional): The type to cast the variable in + cast_to(Any, optional): The default value of the variable (If value is None) + """ + + def __init__(self, name: str, value: Any, label: str = None, checks: List[dict] = None, required: bool = False, cast_to: type = None, default: Any = None): + """Initialization method for the DSSParameter class + + Args: + name(str): Name of the parameter + value(Any): Value of the parameter + label(str, optional): The name displayed to the end user + checks(list[dict], optional): Checks to run on provided value + required(bool, optional): Whether the value can be None + cast_to(type, optional): The type to cast the variable in + default(Any, optional): The default value of the variable (If value is None) + """ + if checks is None: + checks = [] + self.name = name + self.value = value if value is not None else default + self.label = label or name + self.required = required + self.cast_to = cast_to + self.checks = [CustomCheck(**check) for check in checks] + + value_exists = self.run_checks([CustomCheck(type='exists')], raise_error=self.required) + if value_exists: + if self.cast_to: + self.cast_value() + self.run_checks(self.checks) + + def cast_value(self): + """Cast the value if there is as cast_to attribute else return the value as it is + """ + if self.cast_to: + self.run_checks([CustomCheck(type='is_castable', op=self.cast_to)]) + self.value = self.cast_to(self.value) + + def run_checks(self, checks, raise_error=True): + """Runs all checks provided for this parameter + + Args: + checks(list[Check]): Checks to run + raise_error(bool, optional): Whether to rise an error if a check fails + + Returns: + bool: Whether all checks have passed + + Raises: + DSSParameterError: Raises if at least on check fails and raise_error is True + """ + for check in checks: + try: + check.run(self.value) + except CustomCheckError as err: + if raise_error: + self.handle_failure(err) + return False + self.handle_success() + return True + + def handle_failure(self, error: CustomCheckError): + """Is called when at least one test fails. It will raise an Exception with understandable text + + Args: + error(CustomCheckError): Errors met when running checks + + Raises: + DSSParameterError: Raises if at least on check fails + """ + raise DSSParameterError(self.format_failure_message(error)) + + def format_failure_message(self, error: CustomCheckError) -> str: + """Format failure text + + Args: + error (CustomCheckError): Error met when running check + + Returns: + str: Formatted error message + """ + return """ + Validation error with parameter \"{name}\": + {error} + """.format( + name=self.label, + error=error + ) + + def handle_success(self): + """Called if all checks are successful. 
Prints a success message + """ + self.print_success_message() + + def print_success_message(self): + """Formats the success message + """ + logger.debug('All checks passed successfully for {}.'.format(self.name)) + + def __repr__(self): + return "DSSParameter(name={}, value={})".format(self.name, self.value) + + def __str__(self): + return "DSSParameter(name={}, value={})".format(self.name, self.value) diff --git a/python-lib/dkulib/dku_config/requirements.txt b/python-lib/dkulib/dku_config/requirements.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/python-lib/dkulib/dku_config/requirements.txt @@ -0,0 +1 @@ + diff --git a/python-lib/dkulib/dku_io_utils/README.md b/python-lib/dkulib/dku_io_utils/README.md new file mode 100644 index 0000000..6c35d50 --- /dev/null +++ b/python-lib/dkulib/dku_io_utils/README.md @@ -0,0 +1,44 @@ +# DKU IO Utils + +## Description + +The lib contains utility functions to read/write from and to Dataiku objects. + +## Examples + +Here is an example of usage: + +``` +import dataiku + +from dkulib.core.dku_io_utils.chunked_processing import process_dataset_chunks +from dkulib.core.dku_io_utils.column_descriptions import set_column_descriptions + +process_dataset_chunks( + input_dataset=dataiku.Dataset("input"), + output_dataset=dataiku.Dataset("output"), + func=lambda df, param: df, + param=42 +) + +set_column_descriptions( + input_dataset=dataiku.Dataset("input"), + output_dataset=dataiku.Dataset("output"), + column_descriptions={"your_column": "Your description"}, +) +``` + +## Projects using the library + +Don't hesitate to check these plugins using the library for more examples: +- [dss-plugin-nlp-preparation](https://github.com/dataiku/dss-plugin-nlp-preparation/blob/main/custom-recipes/nlp-preparation-cleaning/recipe.py) +- [dss-plugin-similarity-search](https://github.com/dataiku/dss-plugin-similarity-search/blob/main/custom-recipes/similarity-search-query/recipe.py) + +## Version + +- Version: 0.1.0 +- State: Supported + +## Credit + +Library created and maintained by Alex Combessie. 
diff --git a/python-lib/dkulib/dku_io_utils/__init__.py b/python-lib/dkulib/dku_io_utils/__init__.py new file mode 100644 index 0000000..65acf40 --- /dev/null +++ b/python-lib/dkulib/dku_io_utils/__init__.py @@ -0,0 +1,11 @@ +######################################################## +# ------------- dku_io_utils: 0.1.0 ---------------- + +# For more information, see https://github.com/dataiku/dss-plugin-dkulib/tree/main/core/dku_io_utils +# Library version: 0.1.0 +# Last update: 2021-07 +# Author: Dataiku (Alex Combessie) +######################################################### + +from .chunked_processing import count_records, process_dataset_chunks # noqa +from .column_descriptions import set_column_descriptions # noqa diff --git a/python-lib/dkulib/dku_io_utils/chunked_processing.py b/python-lib/dkulib/dku_io_utils/chunked_processing.py new file mode 100644 index 0000000..5f21818 --- /dev/null +++ b/python-lib/dkulib/dku_io_utils/chunked_processing.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +"""Module with read/write utility for chunked processing based on the Dataiku API""" + +import logging +import math +from time import perf_counter +from typing import Callable + +from tqdm.auto import tqdm as tqdm_auto +import dataiku + + +def count_records(dataset: dataiku.Dataset) -> int: + """Counts the number of records of a dataset using the Dataiku dataset metrics API + + Args: + dataset: dataiku.Dataset instance + + Returns: + Number of records + + """ + metric_id = "records:COUNT_RECORDS" + partitions = dataset.read_partitions + client = dataiku.api_client() + project = client.get_project(dataset.project_key) + record_count = 0 + logging.info(f"Counting records of dataset: {dataset.name}...") + if partitions is None or len(partitions) == 0: + project.get_dataset(dataset.short_name).compute_metrics(metric_ids=[metric_id]) + metric = dataset.get_last_metric_values() + record_count = dataiku.ComputedMetrics.get_value_from_data( + metric.get_global_data(metric_id=metric_id) + ) + logging.info( + f"Dataset {dataset.name} contains {record_count:d} records and is not partitioned" + ) + else: + for partition in partitions: + project.get_dataset(dataset.short_name).compute_metrics( + partition=partition, metric_ids=[metric_id] + ) + metric = dataset.get_last_metric_values() + record_count += dataiku.ComputedMetrics.get_value_from_data( + metric.get_partition_data(partition=partition, metric_id=metric_id) + ) + logging.info( + f"Dataset {dataset.name} contains {record_count:d} records in partition(s) {partitions}" + ) + return record_count + + +def process_dataset_chunks( + input_dataset: dataiku.Dataset, + output_dataset: dataiku.Dataset, + func: Callable, + chunksize: float = 1000, + **kwargs, +) -> None: + """Reads a dataset by chunks, process each dataframe chunk with a function and write back to another dataset. + + Pass keyword arguments to the function, adds a tqdm progress bar and generic logging. + Directly write chunks to the output_dataset, so that only one chunk needs to be processed in-memory at a time. 
+ + Args: + input_dataset: Input dataiku.Dataset instance + output_dataset: Output dataiku.Dataset instance + func: The function to apply to the `input_dataset` by chunks of pandas.DataFrame + This function must take a pandas.DataFrame as first input argument, + and output another pandas.DataFrame + chunksize: Number of rows of each chunk of pandas.DataFrame fed to `func` + **kwargs: Optional keyword arguments fed to `func` + + Raises: + ValueError: If the input dataset is empty or if pandas cannot read it without type inference + + """ + input_count_records = count_records(input_dataset) + if input_count_records == 0: + raise ValueError("Input dataset has no records") + logging.info( + f"Processing dataset {input_dataset.name} of {input_count_records} rows " + + f"by chunks of {chunksize}..." + ) + start = perf_counter() + # First, initialize output schema if empty. Required to show the real error if `iter_dataframes` fails. + if not output_dataset.read_schema(raise_if_empty=False): + df = input_dataset.get_dataframe(limit=5, infer_with_pandas=False) + output_df = func(df=df, **kwargs) + output_dataset.write_schema_from_dataframe(output_df) + with output_dataset.get_writer() as writer: + df_iterator = input_dataset.iter_dataframes( + chunksize=chunksize, infer_with_pandas=False + ) + len_iterator = math.ceil(input_count_records / chunksize) + for i, df in tqdm_auto( + enumerate(df_iterator), + total=len_iterator, + unit="chunk", + miniters=1, + mininterval=1.0, + ): + output_df = func(df=df, **kwargs) + if i == 0: + output_dataset.write_schema_from_dataframe( + output_df, dropAndCreate=bool(not output_dataset.writePartition) + ) + writer.write_dataframe(output_df) + logging.info( + f"Processing dataset {input_dataset.name} of {input_count_records} rows: " + + f"Done in {perf_counter() - start:.2f} seconds." + ) diff --git a/python-lib/dkulib/dku_io_utils/column_descriptions.py b/python-lib/dkulib/dku_io_utils/column_descriptions.py new file mode 100644 index 0000000..fccd85d --- /dev/null +++ b/python-lib/dkulib/dku_io_utils/column_descriptions.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +"""Module with read/write utility to set dataset column descriptions based on the Dataiku API""" + +from typing import Dict, Optional + +import dataiku + + +def get_description_for_column( + dataset_schema: dataiku.core.dataset.Schema, column_name: str +) -> str: + """Gets the column description from a dataiku dataset schema for a given column name + + The dataiku dataset schema is a list of dictionaries as following: + [{"name": "column1", "type": "string", "comment": "blabla"}, {...}]. + The optional "comment" key corresponds to the column description. 
+ + Args: + dataset_schema: dataiku.Dataset schema instance obtained from the `read_schema` method + column_name: Name of the column whose description you want to retrieve + + Returns: + The column description if it exists + + """ + for column in dataset_schema: + if column["name"] == column_name and "comment" in column: + return column["comment"] + + +def set_column_descriptions( + output_dataset: dataiku.Dataset, + column_descriptions: Dict[str, str], + input_dataset: Optional[dataiku.Dataset] = None, +) -> None: + """Sets column descriptions of the output dataset based on a dictionary of column descriptions + + Can also retain the column descriptions from the input dataset if specified + + Args: + output_dataset: Output dataiku.Dataset instance + column_descriptions: Dictionary holding column descriptions (value) by column name (key) + input_dataset: Optional input dataiku.Dataset instance + in case you want to retain input column descriptions + + """ + output_schema = output_dataset.read_schema() + # First, set all output column descriptions to those of the input dataset if specified + if input_dataset is not None: + input_schema = input_dataset.read_schema() + for output_column in output_schema: + input_column_description = get_description_for_column( + input_schema, output_column["name"] + ) + if input_column_description is not None: + output_column["comment"] = input_column_description + # Then, update the output column descriptions according to the column_descriptions argument + for output_column in output_schema: + if output_column["name"] in column_descriptions: + output_column["comment"] = column_descriptions[output_column["name"]] + output_dataset.write_schema(output_schema) diff --git a/python-lib/dkulib/io_utils/README.md b/python-lib/dkulib/io_utils/README.md new file mode 100644 index 0000000..91033ca --- /dev/null +++ b/python-lib/dkulib/io_utils/README.md @@ -0,0 +1,24 @@ +# IO Utils + +## Description + +The lib provides you tools for input / output + +## Examples + +Here is an example of usage + +## Projects using the library + +Don't hesitate to check these plugins using the library for more examples : + +- [dss-plugin-example](https://github.com/dataiku/dss-plugin-example) + +## Version + +- Version: 0.1.0 +- State: Supported + +## Credit + +Library created and maintained by Alex Combessie. 
\ No newline at end of file diff --git a/python-lib/dkulib/io_utils/__init__.py b/python-lib/dkulib/io_utils/__init__.py new file mode 100644 index 0000000..f1d720d --- /dev/null +++ b/python-lib/dkulib/io_utils/__init__.py @@ -0,0 +1,16 @@ +######################################################## +# ------------- io_utils: 0.1.0 ---------------- + +# For more information, see https://github.com/dataiku/dss-plugin-dkulib/tree/main/core/io_utils +# Library version: 0.1.0 +# Last update: 2021-01-11 +# Author: Dataiku (Alex Combessie) +######################################################### + +from .plugin_io_utils import clean_empty_list # noqa +from .plugin_io_utils import clean_text_df # noqa +from .plugin_io_utils import generate_unique # noqa +from .plugin_io_utils import move_columns_after # noqa +from .plugin_io_utils import time_logging # noqa +from .plugin_io_utils import truncate_text_list # noqa +from .plugin_io_utils import unique_list # noqa diff --git a/python-lib/dkulib/io_utils/plugin_io_utils.py b/python-lib/dkulib/io_utils/plugin_io_utils.py new file mode 100644 index 0000000..a5a2855 --- /dev/null +++ b/python-lib/dkulib/io_utils/plugin_io_utils.py @@ -0,0 +1,144 @@ +# -*- coding: utf-8 -*- +"""Module with read/write utility functions which are *not* based on the Dataiku API""" + +import re +import logging +import functools +from typing import List, AnyStr, Union, Callable +from time import perf_counter + +import pandas as pd +import numpy as np + + +def clean_empty_list(sequence: List) -> Union[List, AnyStr]: + """If the input sequence is a valid non-empty list, return list, else an empty string + + Args: + sequence: Original list + + Returns: + Original list or empty string + + """ + output = "" + if isinstance(sequence, list): + if len(sequence) != 0: + output = sequence + return output + + +def unique_list(sequence: List) -> List: + """Make a list unique, ordering values by order of appearance in the original list + + Args: + sequence: Original list + + Returns: + List with unique elements ordered by appearance in the original list + + """ + seen = set() + return [x for x in sequence if not (x in seen or seen.add(x))] + + +def truncate_text_list(text_list: List[AnyStr], num_characters: int = 140) -> List[AnyStr]: + """Truncate a list of strings to a given number of characters + + Args: + text_list: List of strings + num_characters: Number of characters to truncate each string to + + Returns: + List with truncated strings + + """ + output_text_list = [] + for text in text_list: + if len(text) > num_characters: + output_text_list.append(text[:num_characters] + " (...)") + else: + output_text_list.append(text) + return output_text_list + + +def clean_text_df(df: pd.DataFrame, dropna_columns: List[AnyStr] = None) -> pd.DataFrame: + """Clean a pandas.DataFrame with text columns to remove empty strings and NaNs values in the dataframe + + Args: + df: Input pandas.DataFrame which should contain only text + dropna_columns: Optional list of column names where empty strings and NaN should be checked + Default is None, which means that all columns will be checked + + Returns: + pandas.DataFrame with rows dropped in case of empty strings or NaN values + + """ + for col in df.columns: + df[col] = df[col].str.strip().replace("", np.NaN) + df = df.dropna(subset=dropna_columns) + return df + + +def generate_unique(name: AnyStr, existing_names: List[AnyStr], prefix: AnyStr = None) -> AnyStr: + """Generate a unique name among existing ones by suffixing a number and adding a prefix + + 
+    Args:
+        name: Input name
+        existing_names: List of existing names
+        prefix: Optional prefix to add to the output name
+
+    Returns:
+        Unique name with a number suffix in case of conflict, and an optional prefix
+
+    """
+    name = re.sub(r"[^\x00-\x7F]", "_", name).replace(
+        " ", "_"
+    )  # replace non-ASCII and whitespace characters with an underscore
+    if prefix:
+        new_name = f"{prefix}_{name}"
+    else:
+        new_name = name
+    for j in range(1, 1001):
+        if new_name not in existing_names:
+            return new_name
+        new_name = f"{new_name}_{j}"
+    raise RuntimeError(f"Failed to generate a unique name for '{name}'")
+
+
+def move_columns_after(df: pd.DataFrame, columns_to_move: List[AnyStr], after_column: AnyStr) -> pd.DataFrame:
+    """Reorder columns by moving a list of columns after another column
+
+    Args:
+        df: Input pandas.DataFrame
+        columns_to_move: List of column names to move
+        after_column: Name of the column to move the columns after
+
+    Returns:
+        pandas.DataFrame with reordered columns
+
+    """
+    after_column_position = df.columns.get_loc(after_column) + 1
+    reordered_columns = (
+        df.columns[:after_column_position].tolist() + columns_to_move + df.columns[after_column_position:].tolist()
+    )
+    df = df.reindex(columns=reordered_columns)  # reindex returns a new DataFrame, so reassign before returning
+    return df
+
+
+def time_logging(log_message: AnyStr):
+    """Decorator to log timing with a custom message"""
+
+    def inner_function(function: Callable):
+        @functools.wraps(function)
+        def wrapper(*args, **kwargs):
+            start = perf_counter()
+            logging.info(log_message + "...")
+            value = function(*args, **kwargs)
+            end = perf_counter()
+            logging.info(log_message + f": done in {end - start:.2f} seconds")
+            return value
+
+        return wrapper
+
+    return inner_function
diff --git a/python-lib/dkulib/parallelizer/__init__.py b/python-lib/dkulib/parallelizer/__init__.py
new file mode 100644
index 0000000..10d871c
--- /dev/null
+++ b/python-lib/dkulib/parallelizer/__init__.py
@@ -0,0 +1,10 @@
+########################################################
+# ------------- parallelizer: 0.1.0 ----------------
+
+# For more information, see https://github.com/dataiku/dss-plugin-dkulib/tree/main/core/parallelizer
+# Library version: 0.1.0
+# Last update: 2021-07-16
+# Author: Dataiku (Alex Combessie, Niklas Muennighoff)
+#########################################################
+
+from .parallelizer import DataFrameParallelizer
diff --git a/python-lib/dkulib/parallelizer/parallelizer.py b/python-lib/dkulib/parallelizer/parallelizer.py
new file mode 100644
index 0000000..950e17e
--- /dev/null
+++ b/python-lib/dkulib/parallelizer/parallelizer.py
@@ -0,0 +1,315 @@
+# -*- coding: utf-8 -*-
+"""Applies a function to a pandas DataFrame with parallelization, error logging and progress tracking"""
+
+import logging
+import inspect
+import math
+
+from collections import namedtuple
+from collections import OrderedDict
+from concurrent.futures import as_completed
+from concurrent.futures import ThreadPoolExecutor
+from copy import deepcopy
+from enum import Enum
+from time import perf_counter
+from typing import Any
+from typing import AnyStr
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Tuple
+from typing import Union
+
+from more_itertools import chunked
+from more_itertools import flatten
+import pandas as pd
+from tqdm.auto import tqdm as tqdm_auto
+
+from ..io_utils.plugin_io_utils import generate_unique
+
+
+class ErrorHandling(Enum):
+    """Enum class to identify how to handle API errors"""
+
+    LOG = "Log"
+    FAIL = "Fail"
"Fail" + + +class BatchError(ValueError): + """Custom exception raised if the Batch function fails""" + + +def _parse_batch_response_default( + batch: List[Dict], response: List[Any], output_column_names: NamedTuple +) -> List[Dict]: + """Adds responses to each row dictionary in the batch, assuming the batch response is a list of responses + in the same order as the batch, while keeping the existing row dictionary entries in the batch. + + Args: + batch: Single input row from the dataframe as a dict in a list of length 1 + response: List of one or more responses returned by the API, typically a JSON string + output_column_names: Column names to be added to the row, + as defined in _get_unique_output_column_names + + Returns: + batch: Same as input batch with additional columns + corresponding to the default output columns + """ + return [ + { + output_column_names.response: response, + output_column_names.error_message: "", + output_column_names.error_type: "", + output_column_names.error_raw: "", + **row, + } + for response, row in zip(response, batch) + ] + + +class DataFrameParallelizer: + """Applies a function to a pandas DataFrame with parallelization, error logging and progress tracking. + + This class is particularly well-suited for synchronous functions calling an API, either row-by-row or by batch. + + Attributes: + function: Any function taking a dict as input (row-by-row mode) or a list of dict (batch mode), + and returning a response with additional information, typically a JSON string. In batch mode, + the function is expected to return a list of responses for each row if 'DEFAULT_RESPONSE_PARSER' is used. + error_handling: If ErrorHandling.LOG (default), log the error from the function as a warning, + and add additional columns to the dataframe with the error message and error type. + If ErrorHandling.FAIL, the function will fail is there is any error. + We recommend letting the end user choose as there are contexts which justify one option or the other. + exceptions_to_catch: Tuple of Exception classes to catch. + Mandatory if ErrorHandling.LOG (default). + parallel_workers: Number of concurrent threads to parallelize the function. Default is 4. + We recommend letting the end user tune this parameter to get better performance. + batch_support: If True, send batches of row (list of dict) to the `function` + Else (default) send rows as dict to the function. + This parameter should be chosen according to the nature of the function to apply. + batch_size: Number of rows to include in each batch. Default is 10. + Taken into account if `batch_support` is True. + We recommend letting the end user tune this parameter if they need to increase performance. + batch_response_parser: Function used to parse the raw response from the function in batch mode, + and assign the actual responses and errors back to the original batch of row (list of dict). + This is often required for batch APIs which return nested objects with a mix of responses and errors. + This parameter is required if batch_support is True. + output_column_prefix: Column prefix to add to the output columns of the dataframe, + containing the `function` responses and errors. Default is "output". + This should be overriden by the developer: if the function to apply calls an API for text translation, + a good output_column_prefix would be "api_translation". + verbose: If True, log raw details on any error encountered along with the error message and error type. + Else (default) log only the error message and the error type. 
+            We recommend trying without verbose first. Usually, the error message is enough to diagnose the issue.
+    """
+
+    # Default number of worker threads to use in parallel - may be tuned by the end user
+    DEFAULT_PARALLEL_WORKERS = 4
+    # By default, we assume the function to apply is row-by-row - should be overridden in the batch case
+    DEFAULT_BATCH_SUPPORT = False
+    # Default number of rows in one batch - may be tuned by the end user
+    DEFAULT_BATCH_SIZE = 10
+    # Default response parsing function for batch_size=1 - Simply assigns the response to the response column
+    DEFAULT_RESPONSE_PARSER = _parse_batch_response_default
+    # Default prefix to add to output columns - should be overridden for personalized output
+    DEFAULT_OUTPUT_COLUMN_PREFIX = "output"
+    # Default dictionary of output column names (key) and their descriptions (value)
+    OUTPUT_COLUMN_NAME_DESCRIPTIONS = OrderedDict(
+        [
+            ("response", "Raw response in JSON format"),
+            ("error_message", "Error message"),
+            ("error_type", "Error type or code"),
+            ("error_raw", "Raw error"),
+        ]
+    )
+    # By default, set verbose to False assuming error message and type are enough information in the logs
+    DEFAULT_VERBOSE = False
+
+    def __init__(
+        self,
+        function: Callable[[Union[Dict, List[Dict]]], Union[Dict, List[Dict]]],
+        error_handling: ErrorHandling = ErrorHandling.LOG,
+        exceptions_to_catch: Tuple[Exception] = (),
+        parallel_workers: int = DEFAULT_PARALLEL_WORKERS,
+        batch_support: bool = DEFAULT_BATCH_SUPPORT,
+        batch_size: int = DEFAULT_BATCH_SIZE,
+        batch_response_parser: Callable[
+            [List[Dict], Any, NamedTuple], List[Dict]
+        ] = DEFAULT_RESPONSE_PARSER,
+        output_column_prefix: AnyStr = DEFAULT_OUTPUT_COLUMN_PREFIX,
+        verbose: bool = DEFAULT_VERBOSE,
+    ):
+        self.function = function
+        self.error_handling = error_handling
+        self.exceptions_to_catch = exceptions_to_catch
+        if error_handling == ErrorHandling.LOG and not exceptions_to_catch:
+            raise ValueError("Please set at least one exception in exceptions_to_catch")
+        self.parallel_workers = parallel_workers
+        self.batch_support = batch_support
+        if not batch_support:
+            batch_size = 1  # In row-by-row mode, rows are sent to the function one at a time
+        self.batch_size = batch_size
+        self.batch_response_parser = batch_response_parser
+        self.output_column_prefix = output_column_prefix
+        self.verbose = verbose
+        self._output_column_names = None  # Will be set at runtime by the run method
+
+    def _get_unique_output_column_names(
+        self, existing_names: List[AnyStr]
+    ) -> NamedTuple:
+        """Returns a named tuple with prefixed column names and their descriptions"""
+        OutputColumnNameTuple = namedtuple(
+            "OutputColumnNameTuple", self.OUTPUT_COLUMN_NAME_DESCRIPTIONS.keys()
+        )
+        return OutputColumnNameTuple(
+            *[
+                generate_unique(
+                    name=column_name,
+                    existing_names=existing_names,
+                    prefix=self.output_column_prefix,
+                )
+                for column_name in OutputColumnNameTuple._fields
+            ]
+        )
+
+    def _apply_function_with_error_logging(
+        self, batch: List[Dict] = None, **function_kwargs,
+    ) -> Union[Dict, List[Dict]]:  # sourcery skip: or-if-exp-identity
+        """Wraps a row-by-row or batch function with error logging
+        It applies `self.function` and:
+        - If batch, parse the function response to extract results and errors using `self.batch_response_parser`
+            Else, in the row-by-row case, the batch only contains one row.
+            We thus use the `_parse_batch_response_default` function, which simply assigns the function response
+            to a new key in the dictionary, without parsing errors from the response.
+            Parsing the function response to extract errors is only required for batch functions,
+            as most batch APIs return successful responses (and make users pay for the request)
+            even if all rows within the batch failed from a functional perspective.
+        - Handles errors from the function in two ways:
+            * (default) log the error message as a warning and return the row with error keys
+            * fail if there is an error (if `self.error_handling == ErrorHandling.FAIL`)
+        """
+        output = deepcopy(batch)
+        for output_column in self._output_column_names:
+            for output_row in output:
+                output_row[output_column] = ""
+        try:
+            if not self.batch_support:
+                # In the row-by-row case, there is only one element in the list as batch_size=1
+                response = [(self.function(row=batch[0], **function_kwargs))]
+            else:
+                response = self.function(batch=batch, **function_kwargs)
+            output = self.batch_response_parser(
+                batch=batch,
+                response=response,
+                output_column_names=self._output_column_names,
+            )
+            errors = [
+                row[self._output_column_names.error_message]
+                for row in output
+                if row[self._output_column_names.error_message]
+            ]
+            if errors:
+                raise BatchError(str(errors))
+        except self.exceptions_to_catch + (BatchError,) as error:
+            if self.error_handling == ErrorHandling.FAIL:
+                raise error
+            logging.warning(
+                f"Function {self.function.__name__} failed on: {batch} because of error: {error}"
+            )
+            error_type = str(type(error).__qualname__)
+            module = inspect.getmodule(error)
+            if module:
+                error_type = f"{module.__name__}.{error_type}"
+            for output_row in output:
+                output_row[self._output_column_names.error_message] = str(error)
+                output_row[self._output_column_names.error_type] = error_type
+                output_row[self._output_column_names.error_raw] = str(error.args)
+        return output
+
+    def _post_process_results(
+        self, df: pd.DataFrame, results: List[Dict]
+    ) -> pd.DataFrame:
+        """Combines results from the function with the input dataframe"""
+        results = flatten(results)
+        output_schema = {
+            **{column_name: str for column_name in self._output_column_names},
+            **dict(df.dtypes),
+        }
+        output_df = (
+            pd.DataFrame.from_records(results)
+            .reindex(columns=list(df.columns) + list(self._output_column_names))
+            .astype(output_schema)
+        )
+        if not self.verbose:
+            output_df.drop(
+                labels=self._output_column_names.error_raw, axis=1, inplace=True
+            )
+        if self.error_handling == ErrorHandling.FAIL:
+            error_columns = [
+                self._output_column_names.error_message,
+                self._output_column_names.error_type,
+                self._output_column_names.error_raw,
+            ]
+            output_df.drop(labels=error_columns, axis=1, inplace=True, errors="ignore")
+        num_error = sum(output_df[self._output_column_names.response] == "")
+        num_success = len(df.index) - num_error
+        logging.info(
+            f"Applying function {self.function.__name__} in parallel to {len(df.index)} row(s): "
+            + f"{num_success} row(s) succeeded, {num_error} failed."
+        )
+        return output_df
+
+    def run(self, df: pd.DataFrame, **function_kwargs,) -> pd.DataFrame:
+        """Applies a function to a pandas.DataFrame with parallelization, error logging and progress tracking.
+
+        The DataFrame is iterated on and fed to the function as dictionaries, row-by-row or by batches of rows.
+        This process is accelerated by the use of concurrent threads and is tracked with a progress bar.
+        Errors are caught if they match the `self.exceptions_to_catch` attribute and automatically logged.
+        Once the whole DataFrame has been iterated on, results and errors are added as additional columns.
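+
+        A minimal sketch of the expected call pattern (illustrative only; `call_api` and `input_df` are placeholder names):
+
+            >>> parallelizer = DataFrameParallelizer(function=call_api, exceptions_to_catch=(ValueError,))
+            >>> output_df = parallelizer.run(input_df)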
+ + Args: + df: Input dataframe on which the function will be applied + **function_kwargs: Arbitrary keyword arguments passed to the `function` + + Returns: + Input dataframe with additional columns: + - response from the `function` + - error message if any + - error type if any + + """ + # First, we create a generator expression to yield each row of the input dataframe. + # Each row will be represented as a dictionary like {"column_name_1": "foo", "column_name_2": 42} + df_row_generator = ( + index_series_pair[1].to_dict() for index_series_pair in df.iterrows() + ) + len_generator = math.ceil(len(df.index) / self.batch_size) + logging.info( + f"Applying function {self.function.__name__} in parallel to {len(df.index)} row(s)" + + f" using batch size of {self.batch_size}..." + ) + start = perf_counter() + self._output_column_names = self._get_unique_output_column_names( + existing_names=df.columns + ) + pool_kwargs = function_kwargs.copy() + for kwarg in ["function", "row", "batch"]: # Reserved pool keyword arguments + pool_kwargs.pop(kwarg, None) + (futures, results) = ([], []) + with ThreadPoolExecutor(max_workers=self.parallel_workers) as pool: + for batch in chunked(df_row_generator, self.batch_size): + futures.append( + pool.submit( + fn=self._apply_function_with_error_logging, + batch=batch, + **pool_kwargs, + ) + ) + for future in tqdm_auto( + as_completed(futures), total=len_generator, miniters=1, mininterval=1.0 + ): + results.append(future.result()) + output_df = self._post_process_results(df, results) + logging.info(f"Parallelization done in {(perf_counter() - start):.2f} seconds.") + return output_df diff --git a/python-lib/dkulib/parallelizer/requirements.txt b/python-lib/dkulib/parallelizer/requirements.txt new file mode 100644 index 0000000..e8dcd4c --- /dev/null +++ b/python-lib/dkulib/parallelizer/requirements.txt @@ -0,0 +1,2 @@ +more-itertools==8.8.0 +tqdm==4.61.0 diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt deleted file mode 100644 index b7bb4d2..0000000 --- a/tests/python/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -pandas==1.1.3 -pytest==6.1.1 \ No newline at end of file diff --git a/tests/python/unit/.gitkeep b/tests/python/unit/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/python/unit/test_api_parallelizer.py b/tests/python/unit/test_api_parallelizer.py deleted file mode 100644 index ee95ba2..0000000 --- a/tests/python/unit/test_api_parallelizer.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- -# This is a test file intended to be used with pytest -# pytest automatically runs all the function starting with "test_" -# see https://docs.pytest.org for more information - -import json -from typing import AnyStr, Dict -from enum import Enum - -import pandas as pd -from boto3.exceptions import Boto3Error - -from api_parallelizer import api_parallelizer # noqa - - -# ============================================================================== -# CONSTANT DEFINITION -# ============================================================================== - -API_EXCEPTIONS = (Boto3Error, ValueError) -COLUMN_PREFIX = "test_api" -INPUT_COLUMN = "test_case" - - -class APICaseEnum(Enum): - SUCCESS = { - "test_api_response": '{"result": "Great success"}', - "test_api_error_message": "", - "test_api_error_type": "", - } - INVALID_INPUT = { - "test_api_response": "", - "test_api_error_message": "invalid literal for int() with base 10: 'invalid_integer'", - "test_api_error_type": "ValueError", - } - API_FAILURE = { - 
"test_api_response": "", - "test_api_error_message": "", - "test_api_error_type": "boto3.exceptions.Boto3Error", - } - - -# ============================================================================== -# CLASS AND FUNCTION DEFINITION -# ============================================================================== - - -def call_mock_api(row: Dict, api_function_param: int = 42) -> AnyStr: - test_case = row.get(INPUT_COLUMN) - response = {} - if test_case == APICaseEnum.SUCCESS: - response = {"result": "Great success"} - elif test_case == APICaseEnum.INVALID_INPUT: - try: - response = {"result": int(api_function_param)} - except ValueError as e: - raise e - elif test_case == APICaseEnum.API_FAILURE: - raise Boto3Error - return json.dumps(response) - - -def test_api_success(): - input_df = pd.DataFrame({INPUT_COLUMN: [APICaseEnum.SUCCESS]}) - df = api_parallelizer( - input_df=input_df, api_call_function=call_mock_api, api_exceptions=API_EXCEPTIONS, column_prefix=COLUMN_PREFIX - ) - output_dictionary = df.iloc[0, :].to_dict() - expected_dictionary = APICaseEnum.SUCCESS.value - for k in expected_dictionary: - assert output_dictionary[k] == expected_dictionary[k] - - -def test_api_failure(): - input_df = pd.DataFrame({INPUT_COLUMN: [APICaseEnum.API_FAILURE]}) - df = api_parallelizer( - input_df=input_df, api_call_function=call_mock_api, api_exceptions=API_EXCEPTIONS, column_prefix=COLUMN_PREFIX - ) - output_dictionary = df.iloc[0, :].to_dict() - expected_dictionary = APICaseEnum.API_FAILURE.value - for k in expected_dictionary: - assert output_dictionary[k] == expected_dictionary[k] - - -def test_invalid_input(): - input_df = pd.DataFrame({INPUT_COLUMN: [APICaseEnum.INVALID_INPUT]}) - df = api_parallelizer( - input_df=input_df, - api_call_function=call_mock_api, - api_exceptions=API_EXCEPTIONS, - column_prefix=COLUMN_PREFIX, - api_function_param="invalid_integer", - ) - output_dictionary = df.iloc[0, :].to_dict() - expected_dictionary = APICaseEnum.INVALID_INPUT.value - for k in expected_dictionary: - assert output_dictionary[k] == expected_dictionary[k] From 82c197f33b1baea902fa7f710313b7f5b77018f9 Mon Sep 17 00:00:00 2001 From: Muennighoff Date: Thu, 5 Jan 2023 19:55:50 +0400 Subject: [PATCH 2/5] Remove superfluous code --- python-lib/dkulib/dku_config/README.md | 94 ------- python-lib/dkulib/dku_config/__init__.py | 12 - python-lib/dkulib/dku_config/custom_check.py | 261 ------------------ python-lib/dkulib/dku_config/dku_config.py | 99 ------- .../dkulib/dku_config/dku_file_manager.py | 51 ---- python-lib/dkulib/dku_config/dss_parameter.py | 126 --------- python-lib/dkulib/dku_config/requirements.txt | 1 - 7 files changed, 644 deletions(-) delete mode 100644 python-lib/dkulib/dku_config/README.md delete mode 100644 python-lib/dkulib/dku_config/__init__.py delete mode 100644 python-lib/dkulib/dku_config/custom_check.py delete mode 100644 python-lib/dkulib/dku_config/dku_config.py delete mode 100644 python-lib/dkulib/dku_config/dku_file_manager.py delete mode 100644 python-lib/dkulib/dku_config/dss_parameter.py delete mode 100644 python-lib/dkulib/dku_config/requirements.txt diff --git a/python-lib/dkulib/dku_config/README.md b/python-lib/dkulib/dku_config/README.md deleted file mode 100644 index aa145f0..0000000 --- a/python-lib/dkulib/dku_config/README.md +++ /dev/null @@ -1,94 +0,0 @@ -# DKU Config - -## Description - -This lib can be used to validate parameters in a custom form. We know that front validation is never enough -and does not work in some cases in DSS (Webapps for example). 
Thus, you can create an object DkuConfig that -behavies as a dict, and add parameters with checks. - -The DkuConfig object also supports local vars. If you want to use a Dku App, the only way to get user settings is -to use local vars. A good practice is to prefix these vars by the name of your plugin like that : -`MY_AWESOME_PLUGIN__param1`. That way, there won't be any conflicts if another plugin uses local vars. - -## Examples - -Let's say you have built a custom form containing the following fields : - -- first_name -- gender -- age -- phone_number - -You want to validate in backend that the parameters have been filled properly and if not, display an understandable -message. You can then build a DkuConfig object. - -```python -from dkulib.core.dku_config.dku_config import DkuConfig -import dataiku -from dataiku.customrecipe import get_recipe_config - -config = get_recipe_config() -dku_config = DkuConfig( - local_vars=dataiku.Project().get_variables()['local'], - local_prefix="MY_AWESOME_PLUGIN__" -) - -dku_config.add_param( - name="first_name", - value=config.get("first_name"), - required=True -) - -dku_config.add_param( - name="gender", - value=config.get("gender"), - checks=[{ - "type": "in", - "op": ['M', 'F'] - }], - required=True -) - -dku_config.add_param( - name="age", - value=config.get("age"), - checks=[{ - "type": "between", - "op": (18, 100), - "err_msg": "You must be over 18 to use the plugin (You specified {value})" - }], - required=True -) - -dku_config.add_param( - name="phone_number", - value=config.get("phone_number"), - checks=[{ - "type": "match", - "op": '^(?:(?:\+|00)33[\s.-]{0,3}(?:\(0\)[\s.-]{0,3})?|0)[1-9](?:(?:[\s.-]?\d{2}){4}|\d{2}(?:[\s.-]?\d{3}){2})$' - }], - required=True -) - -# ... - -assert dku_config.age < 100 -assert dku_config["age"] < 100 - -``` - -## Projects using the library - -Don't hesitate to check these plugins using the library for more examples : - -- [dss-plugin-deeplearning-image](https://github.com/dataiku/dss-plugin-deeplearning-image) -- [dss-plugin-ml-assisted-labeling](https://github.com/dataiku/dss-plugin-ml-assisted-labeling) - -## Version - -- Version: 0.1.6 -- State: Supported - -## Credit - -Library created and maintained by Henri Chabert. diff --git a/python-lib/dkulib/dku_config/__init__.py b/python-lib/dkulib/dku_config/__init__.py deleted file mode 100644 index 3afe297..0000000 --- a/python-lib/dkulib/dku_config/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -######################################################## -# ------------- dku_config: 0.1.6 ---------------- - -# For more information, see https://github.com/dataiku/dss-plugin-dkulib/tree/main/core/dku_config -# Library version: 0.1.6 -# Last update: 2021-07-12 -# Author: Dataiku (Henri Chabert) -######################################################### - -from .custom_check import CustomCheck -from .dku_config import DkuConfig -from .dss_parameter import DSSParameter diff --git a/python-lib/dkulib/dku_config/custom_check.py b/python-lib/dkulib/dku_config/custom_check.py deleted file mode 100644 index 63b63d3..0000000 --- a/python-lib/dkulib/dku_config/custom_check.py +++ /dev/null @@ -1,261 +0,0 @@ -import logging -from typing import Any -import re - -logger = logging.getLogger(__name__) - -DEFAULT_ERROR_MESSAGES = { - 'exists': 'This field is required.', - 'in': 'Should be in {op}. (Currently {value})', - 'not_in': 'Should not be in {op}. 
(Currently {value})', - 'eq': 'Should be equal to {op} (Currently {value}).', - 'sup': 'Should be greater than {op} (Currently {value}).', - 'sup_eq': 'Should be greater than or equal to {op} (Currently {value}).', - 'inf': 'Should be less than {op} (Currently {value}).', - 'inf_eq': 'Should be less than or equal to {op} (Currently {value}).', - 'between': 'Should be between {op[0]} and {op[1]} inclusive (Currently {value}).', - 'between_strict': 'Should be between {op[0]} and {op[1]} exclusive (Currently {value}).', - 'is_type': 'Should be of type (Currently {value_type}).', - 'is_castable': 'Should be castable to type {op}> (Currently {value} with type {value_type}.', - 'custom': "There has been an unknown error.", - 'match': "Should match the following pattern: {op}.", - 'is_subset': 'Should be a subset of {op}. (Currently {value})', -} - - -class CustomCheckError(Exception): - """Exception raised when condition of CustomCheck are not met. - """ - pass - - -class CustomCheck: - """Class related to a check. Use run() to verify whether the check fails or pass - - Attributes: - type (str): Type of the CustomCheck. Must have a related method having "_" before type name - op (Any, optional): Operator to compare the value to. Unnecessary for som checks - err_msg (str, optional): Custom message to display if check fails. Default is a generic message - """ - def __init__(self, type, - op: Any = None, - err_msg: str = ''): - """Initialization method for the CustomCheck class - - Args: - type (str): Type of the CustomCheck. Must have a related method having "_" before type name - op (Any, optional): Operator to compare the value to. Unnecessary for som checks - err_msg (str, optional): Custom message to display if check fails. Default is a generic message - """ - self.type = type - func_name = '_{}'.format(self.type) - if not hasattr(self, func_name): - raise CustomCheckError('Check of type {} does not exist.'.format(self.type)) - self.op = op - self.err_msg = err_msg or self.get_default_err_msg() - - def run(self, value: Any = None): - """Runs the check on a value - - Args: - value(Any, optional): The value to run the check on. 
Default is None - """ - func_name = '_{}'.format(self.type) - result = getattr(self, func_name)(value) - self.handle_return(result, value) - - def handle_return(self, result: bool, value: Any): - """Checks whether the check has failed or pass - - Args: - result(bool): True if check has passed else False - value(Any): The value on which the test has been ran - - Raises: - CustomCheckError if check fails - """ - try: - assert result - except AssertionError: - raise CustomCheckError(self.format_err_msg(value)) - - def get_default_err_msg(self) -> str: - """Returns the default message related to check's type - - Returns: - str: Unformatted default error message - """ - return DEFAULT_ERROR_MESSAGES.get(self.type, 'custom') - - def format_err_msg(self, value: Any) -> str: - """Format the error message with the value that has failed the test - - Args: - value(Any): Failure value - - Returns: - str: Error messages formatted - """ - formatted_err_msg = self.err_msg.format(value=value, op=self.op, value_type=type(value)) - return f'{formatted_err_msg}' - - def _exists(self, value: Any) -> bool: - """Checks whether the value is None - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - EMPTY_ENTRIES = [[], "", None] - return value not in EMPTY_ENTRIES - - def _in(self, value: Any) -> bool: - """Checks whether the value is in the iterable given in "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value in self.op - - def _not_in(self, value: Any) -> bool: - """Checks whether the value is not in the iterable given in "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value not in self.op - - def _eq(self, value: Any) -> bool: - """Checks whether the value is equal to "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value == self.op - - def _sup(self, value: Any) -> bool: - """Checks whether the value is superior to "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value > float(self.op) - - def _inf(self, value: Any) -> bool: - """Checks whether the value is inferior to "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value < float(self.op) - - def _sup_eq(self, value: Any) -> bool: - """Checks whether the value is superior or equal to "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value >= float(self.op) - - def _inf_eq(self, value: Any) -> bool: - """Checks whether the value is inferior or equal to "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return value <= float(self.op) - - def _between(self, value: Any) -> bool: - """Checks whether the value is between the first and second member of "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return float(self.op[0]) <= value <= float(self.op[1]) - - def _between_strict(self, value: Any) -> bool: - """Checks whether the value is strictly between the first and second member of "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return float(self.op[0]) < value < float(self.op[1]) - - def _is_type(self, value: Any) -> 
bool: - """Checks whether the value has the type given in "op" attribute - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - return isinstance(value, self.op) - - def _is_castable(self, value: Any) -> bool: - """Checks whether the value can be cast to the op type - - Args: - value(Any): Value to test - - Returns: - bool: Whether the check has succeed - """ - try: - _ = self.op(value) - return True - except (TypeError, ValueError): - return False - - def _custom(self, *args) -> bool: - """Checks whether "op" attribute is true or false - - Returns: - bool: Whether the check has succeed - """ - return self.op - - def _match(self, value: Any) -> bool: - """Checks whether "value" matches the regex provided in "op" attribute - - Returns: - bool: Whether the check has succeed - """ - return not not re.match(self.op, value) - - def _is_subset(self, value: Any) -> bool: - """Checks whether "value" is a subset of "op" - - Returns: - bool: Whether the check has succeed - """ - return set(value).issubset(set(self.op)) diff --git a/python-lib/dkulib/dku_config/dku_config.py b/python-lib/dkulib/dku_config/dku_config.py deleted file mode 100644 index 29c3504..0000000 --- a/python-lib/dkulib/dku_config/dku_config.py +++ /dev/null @@ -1,99 +0,0 @@ -from .dss_parameter import DSSParameter -from collections.abc import MutableMapping -from typing import Any, AnyStr - - -class DkuConfig(MutableMapping): - """Mapping structure containing DSSParameter objects. It behaves as a dict with the following differences: - - You can access elements with a dot structure (Example: dku_config.param1 or dku_config["param1"]) - - You can set an element with a dot structure (Example: dku_config.param1 = 123) - - All objects stored are converted in DSSParameter - - Accessing an element returns the value of the object DSSParameter - - Attributes: - config(dict): Dict storing the DSSParameters - """ - def __init__(self, local_vars: dict = None, local_prefix: AnyStr = '', **kwargs): - """Initialization method for the DkuConfig class - - Args: - local_vars(dict, optional): Dict containing vars fetched from project local variables. Default is {} - local_prefix(str, optional): If project vars prefixed, write the prefix here, it will be added when - searching for the var - **kwargs: DSSParameters. Each key will be set as the parameter name and the values must be of type - dict. These dicts must contain at least an attribute "value". For other attributes, see - DSSParameter help. - """ - object.__setattr__(self, 'config', {}) - object.__setattr__(self, 'local_vars', local_vars or {}) - object.__setattr__(self, 'local_prefix', local_prefix) - if kwargs: - for k, v in kwargs.items(): - if 'value' not in v: - raise ValueError('Each init kwargs must have a "value" field.') - val = v.pop('value') - self.add_param(name=k, value=val, **v) - - def add_param(self, name: AnyStr, value: Any = None, **kwargs): - """Add a new DSSParameter to the config - - Args: - name(str): The name of the parameter - value(Any, optional): The value of the parameter. If empty, the parameter must be in local vars - **kwargs: Other arguments. See DSSParameter help. 
- """ - if self.local_vars: - value = value or self._get_local_var(name) - self.config[name] = DSSParameter(name=name, value=value, **kwargs) - - def get_param(self, name: AnyStr) -> DSSParameter: - """Returns the DSSParameter of given name - - Args: - name(str): Name of object to return - - Returns: - DSSParameter: Parameter of given name - """ - return self.config.get(name) - - def _get_local_var(self, var_name: AnyStr) -> Any: - """Returns the value of the local variable related to var_name. - - Args: - var_name(str): The variable to fetch from local_vars. It will be prefixed by the attribute "local_prefix" - - Returns: - Any: The value matching the given name - """ - return self.local_vars.get('{}{}'.format(self.local_prefix, var_name), None) - - def __delitem__(self, item): - del self.config[item] - - def __getattr__(self, name): - return self[name] - - def __setattr__(self, key, value): - self[key] = value - - def __getitem__(self, item): - if item in self.config: - return self.config.get(item).value - else: - raise KeyError(item) - - def __setitem__(self, key, value): - self.add_param(name=key, value=value) - - def __iter__(self): - return iter(self.config) - - def __len__(self): - return len(self.config) - - def __repr__(self): - return self.config.__repr__() - - def __str__(self): - return self.config.__str__() diff --git a/python-lib/dkulib/dku_config/dku_file_manager.py b/python-lib/dkulib/dku_config/dku_file_manager.py deleted file mode 100644 index a15e936..0000000 --- a/python-lib/dkulib/dku_config/dku_file_manager.py +++ /dev/null @@ -1,51 +0,0 @@ -from dataiku.customrecipe import get_input_names_for_role, get_output_names_for_role -import dataiku -from .dku_config import DkuConfig - - -class DkuFileManager(DkuConfig): - """ - Use this class to create an object that contains the different input and output datasets/folders of a custom recipe - - Usage example: - - .. 
code-block:: python - - file_manager = DkuFileManager() - # add a required and an optional input dataset - file_manager.add_input_dataset("input_dataset") - file_manager.add_input_dataset("optional_input_dataset", required=False) - # add a required output dataset and an optional output folder - file_manager.add_output_dataset("output_dataset") - file_manager.add_output_folder("optional_output_folder", required=False) - """ - def __init__(self, **kwargs): - super().__init__(**kwargs) - - def add_file(self, side, type_, role, **kwargs): - file = DkuFileManager._retrieve_file_from_dss(side, type_, role) - self.add_param(name=role, value=file, **kwargs) - - def add_input_folder(self, role, required=True): - self.add_file("input", "folder", role, required=required) - - def add_output_folder(self, role, required=True): - self.add_file("output", "folder", role, required=required) - - def add_input_dataset(self, role, required=True): - self.add_file("input", "dataset", role, required=required) - - def add_output_dataset(self, role, required=True): - self.add_file("output", "dataset", role, required=required) - - @staticmethod - def _retrieve_file_from_dss(side, type_, role): - dku_func = get_input_names_for_role if side == "input" else get_output_names_for_role - dku_type = dataiku.Folder if type_ == "folder" else dataiku.Dataset - roles = dku_func(role) - return dku_type(roles[0]) if roles else None - - @staticmethod - def write_to_folder(folder, file_path, content): - with folder.get_writer(file_path) as w: - w.write(content.encode()) diff --git a/python-lib/dkulib/dku_config/dss_parameter.py b/python-lib/dkulib/dku_config/dss_parameter.py deleted file mode 100644 index 52a3dae..0000000 --- a/python-lib/dkulib/dku_config/dss_parameter.py +++ /dev/null @@ -1,126 +0,0 @@ -from .custom_check import CustomCheck, CustomCheckError -from typing import Any, List - -import logging -logger = logging.getLogger(__name__) - - -class DSSParameterError(Exception): - """Exception raised when at least one CustomCheck fails. - """ - pass - - -class DSSParameter: - """Object related to one parameter. It is mainly used for checks to run in backend for custom forms. 
- - Attributes: - name(str): Name of the parameter - value(Any): Value of the parameter - label(str, optional): The name displayed to the end user - checks(list[dict], optional): Checks to run on provided value - required(bool, optional): Whether the value can be None - cast_to(type, optional): The type to cast the variable in - cast_to(Any, optional): The default value of the variable (If value is None) - """ - - def __init__(self, name: str, value: Any, label: str = None, checks: List[dict] = None, required: bool = False, cast_to: type = None, default: Any = None): - """Initialization method for the DSSParameter class - - Args: - name(str): Name of the parameter - value(Any): Value of the parameter - label(str, optional): The name displayed to the end user - checks(list[dict], optional): Checks to run on provided value - required(bool, optional): Whether the value can be None - cast_to(type, optional): The type to cast the variable in - default(Any, optional): The default value of the variable (If value is None) - """ - if checks is None: - checks = [] - self.name = name - self.value = value if value is not None else default - self.label = label or name - self.required = required - self.cast_to = cast_to - self.checks = [CustomCheck(**check) for check in checks] - - value_exists = self.run_checks([CustomCheck(type='exists')], raise_error=self.required) - if value_exists: - if self.cast_to: - self.cast_value() - self.run_checks(self.checks) - - def cast_value(self): - """Cast the value if there is as cast_to attribute else return the value as it is - """ - if self.cast_to: - self.run_checks([CustomCheck(type='is_castable', op=self.cast_to)]) - self.value = self.cast_to(self.value) - - def run_checks(self, checks, raise_error=True): - """Runs all checks provided for this parameter - - Args: - checks(list[Check]): Checks to run - raise_error(bool, optional): Whether to rise an error if a check fails - - Returns: - bool: Whether all checks have passed - - Raises: - DSSParameterError: Raises if at least on check fails and raise_error is True - """ - for check in checks: - try: - check.run(self.value) - except CustomCheckError as err: - if raise_error: - self.handle_failure(err) - return False - self.handle_success() - return True - - def handle_failure(self, error: CustomCheckError): - """Is called when at least one test fails. It will raise an Exception with understandable text - - Args: - error(CustomCheckError): Errors met when running checks - - Raises: - DSSParameterError: Raises if at least on check fails - """ - raise DSSParameterError(self.format_failure_message(error)) - - def format_failure_message(self, error: CustomCheckError) -> str: - """Format failure text - - Args: - error (CustomCheckError): Error met when running check - - Returns: - str: Formatted error message - """ - return """ - Validation error with parameter \"{name}\": - {error} - """.format( - name=self.label, - error=error - ) - - def handle_success(self): - """Called if all checks are successful. 
Prints a success message - """ - self.print_success_message() - - def print_success_message(self): - """Formats the success message - """ - logger.debug('All checks passed successfully for {}.'.format(self.name)) - - def __repr__(self): - return "DSSParameter(name={}, value={})".format(self.name, self.value) - - def __str__(self): - return "DSSParameter(name={}, value={})".format(self.name, self.value) diff --git a/python-lib/dkulib/dku_config/requirements.txt b/python-lib/dkulib/dku_config/requirements.txt deleted file mode 100644 index 8b13789..0000000 --- a/python-lib/dkulib/dku_config/requirements.txt +++ /dev/null @@ -1 +0,0 @@ - From 6fd9ae79aeee110349454cda59a196768e2ad34c Mon Sep 17 00:00:00 2001 From: Muennighoff Date: Thu, 5 Jan 2023 20:00:11 +0400 Subject: [PATCH 3/5] Improve formatting --- .../amazon-comprehend-nlp-keyphrase-extraction/recipe.py | 2 -- .../amazon-comprehend-nlp-language-detection/recipe.py | 9 +++------ .../recipe.py | 3 --- .../amazon-comprehend-nlp-sentiment-analysis/recipe.py | 4 +--- 4 files changed, 4 insertions(+), 14 deletions(-) diff --git a/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py b/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py index 51ee651..1cf7afb 100644 --- a/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-keyphrase-extraction/recipe.py @@ -52,12 +52,10 @@ client = get_client(api_configuration_preset) column_prefix = "keyphrase_api" - # ============================================================================== # RUN # ============================================================================== - @retry((RateLimitException, OSError), delay=api_quota_period, tries=5) @limits(calls=api_quota_rate_limit, period=api_quota_period) def call_api_key_phrase_extraction( diff --git a/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py b/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py index d4e0e43..12f06f3 100644 --- a/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-language-detection/recipe.py @@ -7,12 +7,11 @@ import dataiku from dataiku.customrecipe import get_recipe_config, get_input_names_for_role, get_output_names_for_role -from plugin_io_utils import ErrorHandlingEnum, validate_column_input -from dkulib.dku_io_utils import set_column_descriptions -from dkulib.parallelizer import DataFrameParallelizer from amazon_comprehend_api_client import API_EXCEPTIONS, batch_api_response_parser, get_client from amazon_comprehend_api_formatting import LanguageDetectionAPIFormatter - +from dkulib.dku_io_utils import set_column_descriptions +from dkulib.parallelizer import DataFrameParallelizer +from plugin_io_utils import ErrorHandlingEnum, validate_column_input # ============================================================================== # SETUP @@ -44,12 +43,10 @@ "batch_response_parser": batch_api_response_parser, } - # ============================================================================== # RUN # ============================================================================== - @retry((RateLimitException, OSError), delay=api_quota_period, tries=5) @limits(calls=api_quota_rate_limit, period=api_quota_period) def call_api_language_detection(batch: List[Dict], text_column: AnyStr) -> List[Dict]: diff --git a/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py 
b/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py index 9672111..0e4fba4 100644 --- a/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-named-entity-recognition/recipe.py @@ -14,7 +14,6 @@ from dkulib.parallelizer import DataFrameParallelizer from plugin_io_utils import ErrorHandlingEnum, validate_column_input - # ============================================================================== # SETUP # ============================================================================== @@ -56,12 +55,10 @@ client = get_client(api_configuration_preset) column_prefix = "entity_api" - # ============================================================================== # RUN # ============================================================================== - @retry((RateLimitException, OSError), delay=api_quota_period, tries=5) @limits(calls=api_quota_rate_limit, period=api_quota_period) def call_api_named_entity_recognition( diff --git a/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py b/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py index 1195b0d..37bd631 100644 --- a/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py +++ b/custom-recipes/amazon-comprehend-nlp-sentiment-analysis/recipe.py @@ -8,11 +8,11 @@ import dataiku from dataiku.customrecipe import get_recipe_config, get_input_names_for_role, get_output_names_for_role -from plugin_io_utils import ErrorHandlingEnum, validate_column_input from amazon_comprehend_api_client import API_EXCEPTIONS, batch_api_response_parser, get_client from amazon_comprehend_api_formatting import SentimentAnalysisAPIFormatter from dkulib.dku_io_utils import set_column_descriptions from dkulib.parallelizer import DataFrameParallelizer +from plugin_io_utils import ErrorHandlingEnum, validate_column_input # ============================================================================== # SETUP @@ -51,12 +51,10 @@ client = get_client(api_configuration_preset) column_prefix = "sentiment_api" - # ============================================================================== # RUN # ============================================================================== - @retry((RateLimitException, OSError), delay=api_quota_period, tries=5) @limits(calls=api_quota_rate_limit, period=api_quota_period) def call_api_sentiment_analysis( From 4b84be97639c6608957a1b203f8631d3151af3c0 Mon Sep 17 00:00:00 2001 From: Muennighoff Date: Thu, 5 Jan 2023 20:05:22 +0400 Subject: [PATCH 4/5] Add CHANGELOG --- CHANGELOG.md | 14 ++++++++++++++ plugin.json | 4 ++-- 2 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..9453878 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,14 @@ +# Changelog + + +## [Version 1.0.2](https://github.com/dataiku/dss-plugin-nlp-amazon-translation/releases/tag/v1.0.2) - Dkulib integration 2023-01 + +- ✨ Integrate dkulib and allow empty inputs + +## [Version 1.0.1](https://github.com/dataiku/dss-plugin-nlp-amazon-translation/releases/tag/v1.0.1) - Move configuration parameters - 2020-08 + +- ✨ Move configuration parameters + +## [Version 1.0.0](https://github.com/dataiku/dss-plugin-nlp-amazon-translation/releases/tag/v1.0.0) - Initial release - 2020-05 + +- ✨ Integration with the [Amazon Comprehend API](https://docs.aws.amazon.com/comprehend/index.html) diff --git a/plugin.json b/plugin.json index 73a314f..9205f97 100644 --- a/plugin.json +++ 
b/plugin.json @@ -1,6 +1,6 @@ { "id": "amazon-comprehend-nlp", - "version": "1.0.1", + "version": "1.0.2", "meta": { "label": "Amazon Comprehend NLP", "category": "Natural Language Processing", @@ -16,4 +16,4 @@ ], "supportLevel": "TIER2_SUPPORT" } -} \ No newline at end of file +} From c63e3ee9aa777e367d6a6d83d07528c658700887 Mon Sep 17 00:00:00 2001 From: Muennighoff Date: Thu, 5 Jan 2023 20:06:57 +0400 Subject: [PATCH 5/5] Increment packages --- code-env/python/spec/requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/code-env/python/spec/requirements.txt b/code-env/python/spec/requirements.txt index 6ce29e9..a13aa80 100644 --- a/code-env/python/spec/requirements.txt +++ b/code-env/python/spec/requirements.txt @@ -1,5 +1,5 @@ boto3==1.15.14 -tqdm==4.50.1 +tqdm==4.61.0 ratelimit==2.2.1 retry==0.9.2 -more-itertools==8.5.0 +more-itertools==8.8.0
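
For reference, here is a minimal, illustrative sketch of how the `DataFrameParallelizer` introduced by this patch can be driven in batch mode. All names other than the class itself (`call_batch_api`, `parse_batch_response`, `input_df`, the "demo_api" prefix) are placeholders, and the parser is a simplified stand-in for the plugin's `batch_api_response_parser`:

```python
import pandas as pd

from dkulib.parallelizer import DataFrameParallelizer


def call_batch_api(batch, **kwargs):
    # Placeholder batch function: return one raw response per input row
    return [{"result": "ok"} for _ in batch]


def parse_batch_response(batch, response, output_column_names):
    # Simplified parser: store each raw response and leave the error columns empty
    return [
        {
            **row,
            output_column_names.response: str(raw),
            output_column_names.error_message: "",
            output_column_names.error_type: "",
            output_column_names.error_raw: "",
        }
        for row, raw in zip(batch, response)
    ]


input_df = pd.DataFrame({"text": ["hello", "world"]})  # placeholder input dataframe
parallelizer = DataFrameParallelizer(
    function=call_batch_api,
    exceptions_to_catch=(ValueError,),
    batch_support=True,
    batch_size=10,
    batch_response_parser=parse_batch_response,
    output_column_prefix="demo_api",
)
output_df = parallelizer.run(input_df)  # adds demo_api_response / error columns
```

With `error_handling=ErrorHandling.LOG` (the default), any `ValueError` raised by the batch function would be logged as a warning and surfaced in the error columns rather than failing the whole run.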