From 938c71f64799444e44fb21d986fc05de15c3bbe6 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Thu, 3 Oct 2024 10:54:24 +0000 Subject: [PATCH 01/24] Prepared mod runner --- export_mods_main.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 export_mods_main.py diff --git a/export_mods_main.py b/export_mods_main.py new file mode 100644 index 000000000..7c46f0f17 --- /dev/null +++ b/export_mods_main.py @@ -0,0 +1,29 @@ +"""Script that creates all directories""" +import os + +# Change to the project repository location +my_wd = os.getcwd() +my_repo = "research-and-development" +if not my_wd.endswith(my_repo): + os.chdir(my_repo) + +from src.utils.helpers import tree_to_list +import src.utils.s3_mods as mods +from src.utils.singleton_boto import SingletonBoto + +config = { + "s3": { + "ssl_file": "/etc/pki/tls/certs/ca-bundle.crt", + "s3_bucket": "onscdp-dev-data01-5320d6ca" + } +} + +boto3_client = SingletonBoto.get_client(config) + + +def run_file_size(): + root = "/bat/res_dev/project_data" + + +if __name__ == "__main__": + run_make_dirs() From 5ab04d5b85e2d4c01d481313e3bd7ff5b09dd4bd Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Thu, 3 Oct 2024 11:29:59 +0000 Subject: [PATCH 02/24] Created rd_delete_file --- export_mods_main.py | 15 +++++++++++---- src/utils/s3_mods.py | 36 +++++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 7c46f0f17..a330796cc 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -8,7 +8,6 @@ os.chdir(my_repo) from src.utils.helpers import tree_to_list -import src.utils.s3_mods as mods from src.utils.singleton_boto import SingletonBoto config = { @@ -19,11 +18,19 @@ } boto3_client = SingletonBoto.get_client(config) +import src.utils.s3_mods as mods -def run_file_size(): - root = "/bat/res_dev/project_data" if __name__ == "__main__": - run_make_dirs() + + my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v17.csv" + my_size = mods.rd_file_size(my_path) + + print(f"File size is {my_size}") + + status = mods.rd_delete_file(my_path) + if status: + print(f"File {my_path} successfully deleted") + diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 39a32c558..982987c83 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -31,7 +31,11 @@ from io import StringIO # Local libraries -from rdsa_utils.cdp.helpers.s3_utils import file_exists, create_folder_on_s3 +from rdsa_utils.cdp.helpers.s3_utils import ( + file_exists, + create_folder_on_s3, + delete_file, +) from src.utils.singleton_boto import SingletonBoto # from src.utils.singleton_config import SingletonConfig @@ -167,3 +171,33 @@ def rd_write_feather(filepath, df): def rd_read_feather(filepath): """Placeholder Function to read feather file from HDFS""" return None + +def rd_file_size(filepath: str) -> int: + """Function to check the size of a file on s3 bucket. + + Args: + filepath (string) -- The filepath in s3 bucket + + Returns: + Int - an integer value indicating the size + of the file in bytes + """ + + _response = s3_client.head_object(Bucket=s3_bucket, Key=filepath) + file_size = _response['ContentLength'] + + return file_size + +def rd_delete_file(filepath: str) -> bool: + """ + Delete a file from s3 bucket. + Args: + filepath (string): The filepath in s3 bucket to be deleted + Returns: + status (bool): True for successfully completed deletion. Else False. + """ + status = delete_file(s3_client, s3_bucket, filepath) + return status + + + From e42ef179e05f338bdde3453a6633a08dcd8d0e85 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Thu, 3 Oct 2024 11:57:04 +0000 Subject: [PATCH 03/24] Created rd_md5sum) --- export_mods_main.py | 21 +++++++++++++++------ src/utils/s3_mods.py | 21 +++++++++++++++++++++ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index a330796cc..ffefa2870 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -25,12 +25,21 @@ if __name__ == "__main__": - my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v17.csv" - my_size = mods.rd_file_size(my_path) + my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v20.csv" + +# # Checking that file exists +# my_size = mods.rd_file_size(my_path) +# print(f"File size is {my_size}") - print(f"File size is {my_size}") +# # Deleting a file +# status = mods.rd_delete_file(my_path) +# if status: +# print(f"File {my_path} successfully deleted") - status = mods.rd_delete_file(my_path) - if status: - print(f"File {my_path} successfully deleted") + # Calculating md5sum + my_sum = mods.rd_md5sum(my_path) + expected_output = "ea94424aceecf11c8a70d289e51c34ea" + print(my_sum) + if expected_output == my_sum + \ No newline at end of file diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 982987c83..415609c02 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -199,5 +199,26 @@ def rd_delete_file(filepath: str) -> bool: status = delete_file(s3_client, s3_bucket, filepath) return status +def rd_md5sum(filepath: str) -> int: + """ + Get md5sum of a specific file on s3. + Args: + filepath (string): The filepath in s3 bucket. + Returns: + md5result (int): The control sum md5. + """ + + try: + md5result = s3_client.head_object( + Bucket=s3_bucket, + Key=filepath + )['ETag'][1:-1] + except ValueError: + md5result = None + pass + return md5result + + + From 7bfd56d8d6e302e4bec5e32a3b62942318db8f24 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Thu, 3 Oct 2024 14:13:29 +0000 Subject: [PATCH 04/24] Added rd_isdir --- export_mods_main.py | 18 ++++++++++++------ src/utils/s3_mods.py | 32 +++++++++++++++++++++++++++++--- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index ffefa2870..1a47fb540 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -36,10 +36,16 @@ # if status: # print(f"File {my_path} successfully deleted") - # Calculating md5sum - my_sum = mods.rd_md5sum(my_path) - expected_output = "ea94424aceecf11c8a70d289e51c34ea" - print(my_sum) - if expected_output == my_sum + # # Calculating md5sum + # my_sum = mods.rd_md5sum(my_path) + # expected_output = "ea94424aceecf11c8a70d289e51c34ea" + # print(type(my_sum)) + # if expected_output == my_sum: + # print("Same md5sum") - \ No newline at end of file +# # Calculating rd_isdir +# mydir = "bat" +# response = mods.rd_isdir(mydir) + +# print("Got response") +# print(response) diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 415609c02..c6640ffc0 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -35,6 +35,7 @@ file_exists, create_folder_on_s3, delete_file, + is_s3_directory, ) from src.utils.singleton_boto import SingletonBoto # from src.utils.singleton_config import SingletonConfig @@ -199,7 +200,7 @@ def rd_delete_file(filepath: str) -> bool: status = delete_file(s3_client, s3_bucket, filepath) return status -def rd_md5sum(filepath: str) -> int: +def rd_md5sum(filepath: str) -> str: """ Get md5sum of a specific file on s3. Args: @@ -213,11 +214,36 @@ def rd_md5sum(filepath: str) -> int: Bucket=s3_bucket, Key=filepath )['ETag'][1:-1] - except ValueError: + except s3_client.exceptions.ClientError as e: + s3_logger.error(f"Failed to compute the md5 checksum: {str(e)}") md5result = None - pass return md5result +def rd_isdir(dirpath: str): # -> bool: + """ + Test if directory exists in s3 bucket. + + Args: + dirpath (string): The "directory" path in s3 bucket. + Returns: + status (bool): True if the dirpath is a directory, false otherwise. + + """ + # The directory name must end with forward slash + if not dirpath.endswith('/'): + path = dirpath + '/' + + # Any slashes at the beginning should be removed + while dirpath.startswith('/'): + dirpath = dirpath[1:] + + # Use the function from rdsa_utils + + response = is_s3_directory(client=s3_client, bucket_name=s3_bucket, object_name=dirpath) + + return response + + From 13eea1c9235b6e7edd6da3397bf6e55fb519597b Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Thu, 3 Oct 2024 14:59:32 +0000 Subject: [PATCH 05/24] Added rd_isfile --- export_mods_main.py | 4 ++++ src/utils/s3_mods.py | 25 ++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/export_mods_main.py b/export_mods_main.py index 1a47fb540..103fbe6a3 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -49,3 +49,7 @@ # print("Got response") # print(response) + + # Checking rd_isfile + response = mods.rd_isfile(my_path) + print(response) diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index c6640ffc0..50b919f28 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -238,10 +238,33 @@ def rd_isdir(dirpath: str): # -> bool: dirpath = dirpath[1:] # Use the function from rdsa_utils - response = is_s3_directory(client=s3_client, bucket_name=s3_bucket, object_name=dirpath) + return response +def rd_isfile(filepath: str): # -> bool: + """ + Test if given path is a file in s3 bucket. Check that it exists, not a directory and + the size is greater than 0. + + Args: + filepath (string): The "directory" path in s3 bucket. + Returns: + status (bool): True if the dirpath is a directory, false otherwise. + + """ + if filepath is None: + response = False + + if rd_file_exists(filepath): + isdir = rd_isdir(filepath) + size = rd_file_size(filepath) + response = (not isdir) and (size > 0) + else: + response = False return response + + + From b13c1f5329f45ef5508ab3d42cbd85dc0bb4e579 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Fri, 4 Oct 2024 12:00:02 +0000 Subject: [PATCH 06/24] Added and tested rd_stat_size --- export_mods_main.py | 16 ++++++++++++---- src/utils/s3_mods.py | 6 +++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 103fbe6a3..ee154ef41 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -26,7 +26,7 @@ if __name__ == "__main__": my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v20.csv" - + my_dir = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" # # Checking that file exists # my_size = mods.rd_file_size(my_path) # print(f"File size is {my_size}") @@ -50,6 +50,14 @@ # print("Got response") # print(response) - # Checking rd_isfile - response = mods.rd_isfile(my_path) - print(response) + # # Checking rd_isfile + # response = mods.rd_isfile(my_path) + # print(response) + +# # Checking that rd_stat_size works for files and directories +# file_size = mods.rd_stat_size(my_path) +# print(f"File {my_path} size is {file_size} bytes.") + +# dir_size = mods.rd_stat_size(my_dir) +# print(f"Directory {my_dir} size is {dir_size} bytes.") + diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 50b919f28..77114aa6d 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -263,7 +263,11 @@ def rd_isfile(filepath: str): # -> bool: response = False return response - +def rd_stat_size(path: str): + """ + Gets the file size of a file or directory in bytes. + """ + return rd_file_size(path) From 33ca7fcbdcb54e3b87c50f489f88606fd5ee3ebe Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Fri, 4 Oct 2024 12:32:30 +0000 Subject: [PATCH 07/24] Created and tested rd_read_header --- export_mods_main.py | 4 ++++ src/utils/s3_mods.py | 30 ++++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index ee154ef41..dae4af5a4 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -60,4 +60,8 @@ # dir_size = mods.rd_stat_size(my_dir) # print(f"Directory {my_dir} size is {dir_size} bytes.") + + # Testing rd_read_header  + response = mods.rd_read_header(my_path) + print(response) diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 77114aa6d..0dbfd6995 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -28,7 +28,8 @@ # Third party libraries import pandas as pd -from io import StringIO +from io import StringIO, TextIOWrapper + # Local libraries from rdsa_utils.cdp.helpers.s3_utils import ( @@ -263,12 +264,37 @@ def rd_isfile(filepath: str): # -> bool: response = False return response -def rd_stat_size(path: str): +def rd_stat_size(path: str) -> int: """ Gets the file size of a file or directory in bytes. + Alias of as rd_file_size. + Works for directories, but returns 0 bytes, which is typical for s3. """ return rd_file_size(path) +def rd_read_header(path: str) -> str: + """ + Reads the first line of a file on s3. Gets the entire file using boto3 get_objects, + converts its body into an input stream, reads the first line and remove the carriage + return character (backslash-n) from the end. + Args: + filepath (string): The "directory" path in s3 bucket. + Returns: + status (bool): True if the dirpath is a directory, false otherwise. + """ + # Create an input/output stream pointer, same as open + stream = TextIOWrapper(s3_client.get_object(Bucket=s3_bucket, Key=path)['Body']) + + # Read the first line from the stream + response = stream.readline() + + # Remove the last character (carriage return, or new line) + response = response[:-1] + + return response + + + From fce04abbc9ac4d1dcc69ef0f46379a212f4fcbc1 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Fri, 4 Oct 2024 12:48:52 +0000 Subject: [PATCH 08/24] Created and tested rd_write_string_to_file --- export_mods_main.py | 11 ++++++++--- src/utils/s3_mods.py | 18 ++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index dae4af5a4..58b1e540b 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -61,7 +61,12 @@ # dir_size = mods.rd_stat_size(my_dir) # print(f"Directory {my_dir} size is {dir_size} bytes.") - # Testing rd_read_header  - response = mods.rd_read_header(my_path) - print(response) + # # Testing rd_read_header  + # response = mods.rd_read_header(my_path) + # print(response) + # Testing rd_write_string_to_file + out_path = "/bat/res_dev/project_data/write_string_test.txt" + content = "George is great!" + mods.rd_write_string_to_file(content, out_path) + print("all done") diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 0dbfd6995..fc7273567 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -293,6 +293,24 @@ def rd_read_header(path: str) -> str: return response +def rd_write_string_to_file(content: bytes, filepath: str): + """ + Writes a string into the specified file path + """ + + # Put context to a new Input-Output buffer + str_buffer = StringIO(content) + + # "Rewind" the stream to the start of the buffer + str_buffer.seek(0) + + # Write the buffer into the s3 bucket + _ = s3_client.put_object( + Bucket=s3_bucket, Body=str_buffer.getvalue(), Key=filepath + ) + return None + + From 8d4a0f4d29214022e8ee869d1706c99861345f18 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Fri, 4 Oct 2024 13:05:42 +0000 Subject: [PATCH 09/24] Added and tested rd_copy_file --- export_mods_main.py | 17 ++++++++++++----- src/utils/s3_mods.py | 20 ++++++++++++++++---- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 58b1e540b..815f3bf35 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -65,8 +65,15 @@ # response = mods.rd_read_header(my_path) # print(response) - # Testing rd_write_string_to_file - out_path = "/bat/res_dev/project_data/write_string_test.txt" - content = "George is great!" - mods.rd_write_string_to_file(content, out_path) - print("all done") + # # Testing rd_write_string_to_file + # out_path = "/bat/res_dev/project_data/write_string_test.txt" + # content = "Some content" + # mods.rd_write_string_to_file(content, out_path) + # print("all done") + + # Testing rd_copy_file + src_path = "/bat/res_dev/project_data/write_string_test.txt" + dst_path = "/bat/res_dev/write_string_test_copy.txt" + success = mods.rd_copy_file(src_path, dst_path) + if success: + print("File copied successfully") diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index fc7273567..3582c7f20 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -15,10 +15,6 @@ To do: Read feather - possibly, not needed Write to feather - possibly, not needed - Copy file - Move file - Compute md5 sum - TBC """ # Standard libraries @@ -37,6 +33,7 @@ create_folder_on_s3, delete_file, is_s3_directory, + copy_file, ) from src.utils.singleton_boto import SingletonBoto # from src.utils.singleton_config import SingletonConfig @@ -310,6 +307,21 @@ def rd_write_string_to_file(content: bytes, filepath: str): ) return None +def rd_copy_file(src_path: str, dst_path: str) -> bool: + """ + Copy a file from one location to another. Uses rdsa_utils. + """ + success = copy_file( + client=s3_client, + source_bucket_name=s3_bucket, + source_object_name=src_path, + destination_bucket_name=s3_bucket, + destination_object_name=dst_path, + ) + return success + + + From f703cd6c49b013a82c497aa1ab0c2b768e59b157 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Fri, 4 Oct 2024 13:21:15 +0000 Subject: [PATCH 10/24] Added rd_move_file --- export_mods_main.py | 17 ++++++++++++----- src/utils/s3_mods.py | 16 ++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 815f3bf35..6f7ce0c2d 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -71,9 +71,16 @@ # mods.rd_write_string_to_file(content, out_path) # print("all done") - # Testing rd_copy_file - src_path = "/bat/res_dev/project_data/write_string_test.txt" - dst_path = "/bat/res_dev/write_string_test_copy.txt" - success = mods.rd_copy_file(src_path, dst_path) + # # Testing rd_copy_file + # src_path = "/bat/res_dev/project_data/write_string_test.txt" + # dst_path = "/bat/res_dev/write_string_test_copy.txt" + # success = mods.rd_copy_file(src_path, dst_path) + # if success: + # print("File copied successfully") + + # Testing rd_move_file + src_path = "/bat/res_dev/write_string_test_copy.txt" + dst_path = "/bat/res_dev/project_data/write_string_moved.txt" + success = mods.rd_move_file(src_path, dst_path) if success: - print("File copied successfully") + print("File moved successfully") diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 3582c7f20..fc5310da7 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -34,6 +34,7 @@ delete_file, is_s3_directory, copy_file, + move_file, ) from src.utils.singleton_boto import SingletonBoto # from src.utils.singleton_config import SingletonConfig @@ -320,6 +321,21 @@ def rd_copy_file(src_path: str, dst_path: str) -> bool: ) return success +def rd_move_file(src_path: str, dst_path: str) -> bool: + """ + Move a file from one location to another. Uses rdsa_utils. + """ + success = move_file( + client=s3_client, + source_bucket_name=s3_bucket, + source_object_name=src_path, + destination_bucket_name=s3_bucket, + destination_object_name=dst_path + ) + return success + + + From 3862631a4810b5314bb1bb946c675b8d9689df75 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Fri, 4 Oct 2024 14:10:07 +0000 Subject: [PATCH 11/24] Added rd_search_file --- export_mods_main.py | 19 ++++++++---- src/utils/s3_mods.py | 74 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 6f7ce0c2d..7e8fd69cb 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -78,9 +78,16 @@ # if success: # print("File copied successfully") - # Testing rd_move_file - src_path = "/bat/res_dev/write_string_test_copy.txt" - dst_path = "/bat/res_dev/project_data/write_string_moved.txt" - success = mods.rd_move_file(src_path, dst_path) - if success: - print("File moved successfully") + # # Testing rd_move_file + # src_path = "/bat/res_dev/write_string_test_copy.txt" + # dst_path = "/bat/res_dev/project_data/write_string_moved.txt" + # success = mods.rd_move_file(src_path, dst_path) + # if success: + # print("File moved successfully") + + # Testing rd_search_file + dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" + ending = "24-10-02_v20.csv" + + found_file = mods.rd_search_file(dir_path, ending) + print(f"Found file: {found_file}") diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index fc5310da7..eaa04c8be 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -275,8 +275,10 @@ def rd_read_header(path: str) -> str: Reads the first line of a file on s3. Gets the entire file using boto3 get_objects, converts its body into an input stream, reads the first line and remove the carriage return character (backslash-n) from the end. - Args: + + Args: filepath (string): The "directory" path in s3 bucket. + Returns: status (bool): True if the dirpath is a directory, false otherwise. """ @@ -334,6 +336,76 @@ def rd_move_file(src_path: str, dst_path: str) -> bool: ) return success +# Function to replicate os.walk behavior +def s3walk(locations: list, prefix: str) -> tuple: + """ + Mimics the functionality of os.walk in s3 bucket using long filenames with slashes. + Recursively goes through the long filenames and splits it into "locations" - + subdirectories, and "files" - short file names. + + Args: + locations (list): a list of s3 locations that can be "directories" + prefix (str): Name of "subdirectory" of root where further locations will be found. + + Returns: + A tuple of (root, (subdir, files)). + """ + # recursively add location to roots starting from prefix + def processLocation( root, prefixLocal, location): + # add new root location if not available + if not prefixLocal in root: + root[prefixLocal]=(set(),set()) + # check how many folders are available after prefix + remainder = location[len(prefixLocal):] + structure = remainder.split('/') + #if we are not yet in the folder of the file we need to continue with a larger prefix + if len(structure)>1: + # add folder dir + root[prefixLocal][0].add(structure[0]) + #make sure file is added allong the way + processLocation(root, prefixLocal+'/'+structure[0],location ) + else: + # add to file + root[prefixLocal][1].add(structure[0]) + + root={} + for location in locations: + processLocation(root,prefix,location) + + return root.items() + + + +def rd_search_file(dir_path: str, ending: str) -> str: + """Find a file in a directory with a specific ending. + + Args: + dir_path (str): s3 "directory" where to search for files + ending (str): File name ending to search for. + Returns: + Full file name that ends with the given string. + + """ + target_file = None + + # get list of objects with prefix + response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=dir_path) + + # retrieve key values + locations = [object['Key'] for object in response['Contents']] + + for _, (__, files) in s3walk(locations, dir_path): + for file in files: + + # Check for ending + if file.endswith(ending): + target_file = str(file) + return target_file + + + + + From b937ba558bc733365b2eb91e5d1d446919e1c9a6 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Mon, 7 Oct 2024 08:21:49 +0000 Subject: [PATCH 12/24] Updated copy and move --- export_main.py | 7 +++++++ export_mods_main.py | 20 ++++++++++---------- src/dev_config.yaml | 2 +- src/outputs/export_files.py | 16 ++++++++++------ src/pipeline.py | 2 -- src/user_config.yaml | 4 ++-- src/utils/s3_mods.py | 37 +++++++++++++++++++++++++++++++++++-- 7 files changed, 65 insertions(+), 23 deletions(-) diff --git a/export_main.py b/export_main.py index 37614729f..678988a14 100644 --- a/export_main.py +++ b/export_main.py @@ -2,6 +2,13 @@ import os from importlib import reload + +# Change to the project repository location +my_wd = os.getcwd() +my_repo = "research-and-development" +if not my_wd.endswith(my_repo): + os.chdir(my_repo) + from src.outputs import export_files reload(export_files) diff --git a/export_mods_main.py b/export_mods_main.py index 7e8fd69cb..d922b3939 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -65,11 +65,11 @@ # response = mods.rd_read_header(my_path) # print(response) - # # Testing rd_write_string_to_file - # out_path = "/bat/res_dev/project_data/write_string_test.txt" - # content = "Some content" - # mods.rd_write_string_to_file(content, out_path) - # print("all done") + # Testing rd_write_string_to_file + out_path = "/bat/res_dev/project_data/write_string_test.txt" + content = "Some content" + mods.rd_write_string_to_file(content.encode(encoding, "utf-8"), out_path) + print("all done") # # Testing rd_copy_file # src_path = "/bat/res_dev/project_data/write_string_test.txt" @@ -85,9 +85,9 @@ # if success: # print("File moved successfully") - # Testing rd_search_file - dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" - ending = "24-10-02_v20.csv" +# # Testing rd_search_file +# dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" +# ending = "24-10-02_v20.csv" - found_file = mods.rd_search_file(dir_path, ending) - print(f"Found file: {found_file}") +# found_file = mods.rd_search_file(dir_path, ending) +# print(f"Found file: {found_file}") diff --git a/src/dev_config.yaml b/src/dev_config.yaml index f168b1dc9..124bb8c69 100644 --- a/src/dev_config.yaml +++ b/src/dev_config.yaml @@ -8,7 +8,7 @@ global: table_config: "SingleLine" # Environment settings dev_test : False - platform: network #whether to load from hdfs, network (Windows) or s3 (CDP) + platform: s3 #whether to load from hdfs, network (Windows) or s3 (CDP) load_from_feather: False runlog_writer: write_csv: True # Write the runlog to a CSV file diff --git a/src/outputs/export_files.py b/src/outputs/export_files.py index 304b4740c..c3b09a223 100644 --- a/src/outputs/export_files.py +++ b/src/outputs/export_files.py @@ -131,7 +131,7 @@ def check_files_exist(file_list: List, config: dict, isfile: callable): for file in file_list: file_path = Path(file) # Changes to path if str OutgoingLogger.debug(f"Using {platform} isfile function") - if not isfile(file_path): + if not isfile(str(file_path)): OutgoingLogger.error( f"File {file} does not exist. Check existence and spelling" ) @@ -152,7 +152,9 @@ def transfer_files(source, destination, method, logger, copy_files, move_files): """ transfer_func = {"copy": copy_files, "move": move_files}[method] past_tense = {"copy": "copied", "move": "moved"}[method] - transfer_func(source, destination) + from ipdb import set_trace + set_trace() + transfer_func(str(source), destination) logger.info(f"Files {source} successfully {past_tense} to {destination}.") @@ -227,10 +229,12 @@ def run_export(user_config_path: str, dev_config_path: str): platform = config["global"]["platform"] if platform == "s3": - from src.utils import s3_mods as mods + # create singletion boto3 client object & pass in bucket string + from src.utils.singleton_boto import SingletonBoto - # Creating boto3 client and adding it to the config dict - config["client"] = mods.create_client(config) + boto3_client = SingletonBoto.get_client(config) # noqa + from src.utils import s3_mods as mods + elif platform == "network": # If the platform is "network" or "hdfs", there is no need for a client. # Adding a client = None for consistency. @@ -301,7 +305,7 @@ def run_export(user_config_path: str, dev_config_path: str): # Copy or Move files to outgoing folder file_transfer_method = config["export_choices"]["copy_or_move_files"] - + for file_path in file_select_dict.values(): file_path = os.path.join(file_path) transfer_files( diff --git a/src/pipeline.py b/src/pipeline.py index f275892d7..5ea26e2be 100644 --- a/src/pipeline.py +++ b/src/pipeline.py @@ -53,8 +53,6 @@ def run_pipeline(user_config_path, dev_config_path): boto3_client = SingletonBoto.get_client(config) # noqa from src.utils import s3_mods as mods - # Creating boto3 client and adding it to the config dict - # config["client"] = boto3_client elif platform == "network": # If the platform is "network" or "hdfs", there is no need for a client. # Adding a client = None for consistency. diff --git a/src/user_config.yaml b/src/user_config.yaml index 6ba000887..2f73f394d 100644 --- a/src/user_config.yaml +++ b/src/user_config.yaml @@ -151,8 +151,8 @@ export_choices: export_fte_total_qa: None export_status_filtered: None export_frozen_group: None - export_staged_BERD_full_responses: None + export_staged_BERD_full_responses: "2023_staged_BERD_full_responses_24-10-02_v20.csv" export_staged_NI_full_responses: None export_full_responses_imputed: None export_full_estimation_qa: None # "2022_full_estimation_qa_24-07-15_v555.csv" - export_invalid_unrecognised_postcodes: "2022_invalid_unrecognised_postcodes_24-07-04_v503.csv" + export_invalid_unrecognised_postcodes: None # "2022_invalid_unrecognised_postcodes_24-07-04_v503.csv" diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index eaa04c8be..a469f59a6 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -299,7 +299,7 @@ def rd_write_string_to_file(content: bytes, filepath: str): """ # Put context to a new Input-Output buffer - str_buffer = StringIO(content) + str_buffer = StringIO(content.decode("utf-8")) # "Rewind" the stream to the start of the buffer str_buffer.seek(0) @@ -310,7 +310,33 @@ def rd_write_string_to_file(content: bytes, filepath: str): ) return None +def _path_long2short(path: "str") -> str: + """ + Extracts a short file name from the full path. + If there is at least one forward slash, finds the lates slash to the right + and rerurns all characrers afrer it. + + If there are no slashes, returns the path as is. + """ + if "/" in path: + last_slash = path.rfind("/") + return path[last_slash + 1:] + else: + return path + +def _remove_end_slashes(path: "str") -> str: + """ + Removes any amount of consequitive forward slashes from a path. + """ + while path.endswith("/"): + path = path[:-1] + + return path + def rd_copy_file(src_path: str, dst_path: str) -> bool: + + dst_path = _remove_end_slashes(dst_path) + dst_path += "/" + _path_long2short(src_path) """ Copy a file from one location to another. Uses rdsa_utils. """ @@ -326,7 +352,10 @@ def rd_copy_file(src_path: str, dst_path: str) -> bool: def rd_move_file(src_path: str, dst_path: str) -> bool: """ Move a file from one location to another. Uses rdsa_utils. + """ + dst_path = _remove_end_slashes(dst_path) + dst_path += "/" + _path_long2short(src_path) success = move_file( client=s3_client, source_bucket_name=s3_bucket, @@ -388,9 +417,13 @@ def rd_search_file(dir_path: str, ending: str) -> str: """ target_file = None + # Remove preceding forward slashes if needed + while dir_path.startswith("/"): + dir_path = dir_path[1:] + # get list of objects with prefix response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=dir_path) - + # retrieve key values locations = [object['Key'] for object in response['Contents']] From d8b1feff07758e1eb9c546a8438e4c39d5463284 Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Mon, 7 Oct 2024 09:45:00 +0100 Subject: [PATCH 13/24] Improved style --- export_mods_main.py | 35 +++++----- src/outputs/export_files.py | 6 +- src/utils/s3_mods.py | 127 ++++++++++++++++++------------------ 3 files changed, 82 insertions(+), 86 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index d922b3939..42eb9fc37 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -7,7 +7,6 @@ if not my_wd.endswith(my_repo): os.chdir(my_repo) -from src.utils.helpers import tree_to_list from src.utils.singleton_boto import SingletonBoto config = { @@ -21,73 +20,71 @@ import src.utils.s3_mods as mods - - if __name__ == "__main__": - + my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v20.csv" my_dir = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" # # Checking that file exists # my_size = mods.rd_file_size(my_path) # print(f"File size is {my_size}") - + # # Deleting a file # status = mods.rd_delete_file(my_path) # if status: # print(f"File {my_path} successfully deleted") - + # # Calculating md5sum # my_sum = mods.rd_md5sum(my_path) # expected_output = "ea94424aceecf11c8a70d289e51c34ea" # print(type(my_sum)) # if expected_output == my_sum: # print("Same md5sum") - -# # Calculating rd_isdir + +# # Calculating rd_isdir # mydir = "bat" # response = mods.rd_isdir(mydir) - + # print("Got response") # print(response) - + # # Checking rd_isfile # response = mods.rd_isfile(my_path) # print(response) - + # # Checking that rd_stat_size works for files and directories # file_size = mods.rd_stat_size(my_path) # print(f"File {my_path} size is {file_size} bytes.") - + # dir_size = mods.rd_stat_size(my_dir) # print(f"Directory {my_dir} size is {dir_size} bytes.") # # Testing rd_read_header  # response = mods.rd_read_header(my_path) # print(response) - + # Testing rd_write_string_to_file out_path = "/bat/res_dev/project_data/write_string_test.txt" content = "Some content" mods.rd_write_string_to_file(content.encode(encoding, "utf-8"), out_path) print("all done") - + # # Testing rd_copy_file # src_path = "/bat/res_dev/project_data/write_string_test.txt" - # dst_path = "/bat/res_dev/write_string_test_copy.txt" + # dst_path = "/bat/res_dev/" # success = mods.rd_copy_file(src_path, dst_path) # if success: # print("File copied successfully") - + # # Testing rd_move_file # src_path = "/bat/res_dev/write_string_test_copy.txt" - # dst_path = "/bat/res_dev/project_data/write_string_moved.txt" + # dst_path = "/bat/res_dev/project_data/" # success = mods.rd_move_file(src_path, dst_path) # if success: # print("File moved successfully") - + # # Testing rd_search_file # dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" # ending = "24-10-02_v20.csv" - + # found_file = mods.rd_search_file(dir_path, ending) # print(f"Found file: {found_file}") diff --git a/src/outputs/export_files.py b/src/outputs/export_files.py index c3b09a223..1b19b3e16 100644 --- a/src/outputs/export_files.py +++ b/src/outputs/export_files.py @@ -152,8 +152,6 @@ def transfer_files(source, destination, method, logger, copy_files, move_files): """ transfer_func = {"copy": copy_files, "move": move_files}[method] past_tense = {"copy": "copied", "move": "moved"}[method] - from ipdb import set_trace - set_trace() transfer_func(str(source), destination) logger.info(f"Files {source} successfully {past_tense} to {destination}.") @@ -234,7 +232,7 @@ def run_export(user_config_path: str, dev_config_path: str): boto3_client = SingletonBoto.get_client(config) # noqa from src.utils import s3_mods as mods - + elif platform == "network": # If the platform is "network" or "hdfs", there is no need for a client. # Adding a client = None for consistency. @@ -305,7 +303,7 @@ def run_export(user_config_path: str, dev_config_path: str): # Copy or Move files to outgoing folder file_transfer_method = config["export_choices"]["copy_or_move_files"] - + for file_path in file_select_dict.values(): file_path = os.path.join(file_path) transfer_files( diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index a469f59a6..fdc37a416 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -44,6 +44,7 @@ s3_client = SingletonBoto.get_client() s3_bucket = SingletonBoto.get_bucket() + # Read a CSV file into a Pandas dataframe def rd_read_csv(filepath: str, **kwargs) -> pd.DataFrame: """Reads a csv from s3 bucket into a Pandas Dataframe using boto3. @@ -172,6 +173,7 @@ def rd_read_feather(filepath): """Placeholder Function to read feather file from HDFS""" return None + def rd_file_size(filepath: str) -> int: """Function to check the size of a file on s3 bucket. @@ -188,6 +190,7 @@ def rd_file_size(filepath: str) -> int: return file_size + def rd_delete_file(filepath: str) -> bool: """ Delete a file from s3 bucket. @@ -199,6 +202,7 @@ def rd_delete_file(filepath: str) -> bool: status = delete_file(s3_client, s3_bucket, filepath) return status + def rd_md5sum(filepath: str) -> str: """ Get md5sum of a specific file on s3. @@ -218,10 +222,11 @@ def rd_md5sum(filepath: str) -> str: md5result = None return md5result -def rd_isdir(dirpath: str): # -> bool: + +def rd_isdir(dirpath: str) -> bool: """ Test if directory exists in s3 bucket. - + Args: dirpath (string): The "directory" path in s3 bucket. Returns: @@ -230,30 +235,35 @@ def rd_isdir(dirpath: str): # -> bool: """ # The directory name must end with forward slash if not dirpath.endswith('/'): - path = dirpath + '/' - + dirpath = dirpath + '/' + # Any slashes at the beginning should be removed while dirpath.startswith('/'): dirpath = dirpath[1:] # Use the function from rdsa_utils - response = is_s3_directory(client=s3_client, bucket_name=s3_bucket, object_name=dirpath) + response = is_s3_directory( + client=s3_client, + bucket_name=s3_bucket, + object_name=dirpath + ) return response -def rd_isfile(filepath: str): # -> bool: + +def rd_isfile(filepath: str) -> bool: """ - Test if given path is a file in s3 bucket. Check that it exists, not a directory and - the size is greater than 0. - + Test if given path is a file in s3 bucket. Check that it exists, not a + directory and the size is greater than 0. + Args: filepath (string): The "directory" path in s3 bucket. Returns: status (bool): True if the dirpath is a directory, false otherwise. - + """ if filepath is None: response = False - + if rd_file_exists(filepath): isdir = rd_isdir(filepath) size = rd_file_size(filepath) @@ -261,7 +271,8 @@ def rd_isfile(filepath: str): # -> bool: else: response = False return response - + + def rd_stat_size(path: str) -> int: """ Gets the file size of a file or directory in bytes. @@ -270,35 +281,37 @@ def rd_stat_size(path: str) -> int: """ return rd_file_size(path) + def rd_read_header(path: str) -> str: """ Reads the first line of a file on s3. Gets the entire file using boto3 get_objects, converts its body into an input stream, reads the first line and remove the carriage return character (backslash-n) from the end. - + Args: filepath (string): The "directory" path in s3 bucket. - + Returns: status (bool): True if the dirpath is a directory, false otherwise. """ # Create an input/output stream pointer, same as open stream = TextIOWrapper(s3_client.get_object(Bucket=s3_bucket, Key=path)['Body']) - + # Read the first line from the stream response = stream.readline() - + # Remove the last character (carriage return, or new line) response = response[:-1] - + return response - + + def rd_write_string_to_file(content: bytes, filepath: str): """ Writes a string into the specified file path """ - - # Put context to a new Input-Output buffer + + # Put context to a new Input-Output buffer str_buffer = StringIO(content.decode("utf-8")) # "Rewind" the stream to the start of the buffer @@ -310,12 +323,13 @@ def rd_write_string_to_file(content: bytes, filepath: str): ) return None + def _path_long2short(path: "str") -> str: """ - Extracts a short file name from the full path. + Extracts a short file name from the full path. If there is at least one forward slash, finds the lates slash to the right and rerurns all characrers afrer it. - + If there are no slashes, returns the path as is. """ if "/" in path: @@ -324,17 +338,19 @@ def _path_long2short(path: "str") -> str: else: return path + def _remove_end_slashes(path: "str") -> str: """ Removes any amount of consequitive forward slashes from a path. """ while path.endswith("/"): path = path[:-1] - + return path + def rd_copy_file(src_path: str, dst_path: str) -> bool: - + dst_path = _remove_end_slashes(dst_path) dst_path += "/" + _path_long2short(src_path) """ @@ -346,13 +362,14 @@ def rd_copy_file(src_path: str, dst_path: str) -> bool: source_object_name=src_path, destination_bucket_name=s3_bucket, destination_object_name=dst_path, - ) + ) return success + def rd_move_file(src_path: str, dst_path: str) -> bool: """ Move a file from one location to another. Uses rdsa_utils. - + """ dst_path = _remove_end_slashes(dst_path) dst_path += "/" + _path_long2short(src_path) @@ -365,68 +382,70 @@ def rd_move_file(src_path: str, dst_path: str) -> bool: ) return success -# Function to replicate os.walk behavior + def s3walk(locations: list, prefix: str) -> tuple: """ Mimics the functionality of os.walk in s3 bucket using long filenames with slashes. - Recursively goes through the long filenames and splits it into "locations" - + Recursively goes through the long filenames and splits it into "locations" - subdirectories, and "files" - short file names. Args: locations (list): a list of s3 locations that can be "directories" - prefix (str): Name of "subdirectory" of root where further locations will be found. - + prefix (str): Name of "subdirectory" of root where further locations + will be found. + Returns: A tuple of (root, (subdir, files)). """ # recursively add location to roots starting from prefix - def processLocation( root, prefixLocal, location): + def processLocation(root, prefixLocal, location): # add new root location if not available - if not prefixLocal in root: - root[prefixLocal]=(set(),set()) + if prefixLocal not in root: + root[prefixLocal] = (set(), set()) # check how many folders are available after prefix remainder = location[len(prefixLocal):] structure = remainder.split('/') - #if we are not yet in the folder of the file we need to continue with a larger prefix - if len(structure)>1: + + # If we are not yet in the folder of the file we need to continue with + # a larger prefix + if len(structure) > 1: # add folder dir root[prefixLocal][0].add(structure[0]) - #make sure file is added allong the way - processLocation(root, prefixLocal+'/'+structure[0],location ) + # make sure file is added allong the way + processLocation(root, prefixLocal + '/' + structure[0], location) else: # add to file root[prefixLocal][1].add(structure[0]) - root={} + root = {} for location in locations: - processLocation(root,prefix,location) + processLocation(root, prefix, location) return root.items() - def rd_search_file(dir_path: str, ending: str) -> str: """Find a file in a directory with a specific ending. - + Args: dir_path (str): s3 "directory" where to search for files ending (str): File name ending to search for. Returns: Full file name that ends with the given string. - + """ target_file = None - + # Remove preceding forward slashes if needed while dir_path.startswith("/"): dir_path = dir_path[1:] - + # get list of objects with prefix response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=dir_path) # retrieve key values locations = [object['Key'] for object in response['Contents']] - + for _, (__, files) in s3walk(locations, dir_path): for file in files: @@ -434,21 +453,3 @@ def rd_search_file(dir_path: str, ending: str) -> str: if file.endswith(ending): target_file = str(file) return target_file - - - - - - - - - - - - - - - - - - From d846802aeee282b323e641d82a00bdb5a0cbd7e6 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Mon, 7 Oct 2024 08:52:02 +0000 Subject: [PATCH 14/24] Fixed encoding in write string --- export_mods_main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 42eb9fc37..84e2d81de 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -64,8 +64,8 @@ # Testing rd_write_string_to_file out_path = "/bat/res_dev/project_data/write_string_test.txt" - content = "Some content" - mods.rd_write_string_to_file(content.encode(encoding, "utf-8"), out_path) + content = "New content" + mods.rd_write_string_to_file(content.encode(encoding="utf-8"), out_path) print("all done") # # Testing rd_copy_file From 1f0514e3a9efc601ffc31563495cb6d072ac5c54 Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Mon, 7 Oct 2024 09:56:11 +0100 Subject: [PATCH 15/24] Changed platform to network --- src/dev_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dev_config.yaml b/src/dev_config.yaml index 124bb8c69..f168b1dc9 100644 --- a/src/dev_config.yaml +++ b/src/dev_config.yaml @@ -8,7 +8,7 @@ global: table_config: "SingleLine" # Environment settings dev_test : False - platform: s3 #whether to load from hdfs, network (Windows) or s3 (CDP) + platform: network #whether to load from hdfs, network (Windows) or s3 (CDP) load_from_feather: False runlog_writer: write_csv: True # Write the runlog to a CSV file From dd8417e8f5409c3c1ffec2bb0f7d4255ddc63a3f Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Tue, 8 Oct 2024 14:34:07 +0100 Subject: [PATCH 16/24] Added read_excel --- src/utils/s3_mods.py | 58 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index fdc37a416..4a1122972 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -35,6 +35,8 @@ is_s3_directory, copy_file, move_file, + validate_bucket_name, + validate_s3_file_path, ) from src.utils.singleton_boto import SingletonBoto # from src.utils.singleton_config import SingletonConfig @@ -453,3 +455,59 @@ def rd_search_file(dir_path: str, ending: str) -> str: if file.endswith(ending): target_file = str(file) return target_file + + +def read_excel( + filepath: str, + client: boto3.client = s3_client, + bucket_name: str = s3_bucket, + **kwargs, +) -> pd.DataFrame: + """ + Read an Excel file from s3 bucket into a Pandas dataframe. + + Parameters + ---------- + filepath : str + The filepath to save the dataframe to. + client : boto3.client + The boto3 S3 client instance. + bucket_name : str + The name of the S3 bucket. + kwargs : dict + Optional dictionary of Pandas read_excel keyword arguments. + + Returns + ------- + pd.DataFrame + A dataframe with data, if reading was successful. + + Raises + ------ + InvalidBucketNameError + If the bucket name is invalid according to AWS rules. + Exception + If there is an error reading from s3 or cnverting to Excel. + + """ + + bucket_name = validate_bucket_name(bucket_name) + filepath = validate_s3_file_path(filepath, allow_s3_scheme=False) + + try: + # Get the Excel file from S3 + response = client.get_object(Bucket=bucket_name, Key=filepath) + s3_logger.info( + f"Loaded Excel file from S3 bucket {bucket_name}, filepath {filepath}", + ) + + # Read the Excel file into a Pandas DataFrame + df = pd.read_excel(response["Body"], **kwargs) + + except Exception as e: + error_message = ( + f"Error loading file from bucket {bucket_name}, filepath {filepath}: {e}" + ) + s3_logger.error(error_message) + raise Exception(error_message) from e + return df From 8348617b0b89d2d4cab3d68974c5ada28c9aaed7 Mon Sep 17 00:00:00 2001 From: George Zorinyants Date: Tue, 8 Oct 2024 14:17:28 +0000 Subject: [PATCH 17/24] Added read_csv and openpyxl requirement --- export_mods_main.py | 15 ++++++++++----- requirements.txt | 1 + src/utils/s3_mods.py | 11 ++++++----- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 84e2d81de..7dadd2775 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -62,11 +62,11 @@ # response = mods.rd_read_header(my_path) # print(response) - # Testing rd_write_string_to_file - out_path = "/bat/res_dev/project_data/write_string_test.txt" - content = "New content" - mods.rd_write_string_to_file(content.encode(encoding="utf-8"), out_path) - print("all done") + # # Testing rd_write_string_to_file + # out_path = "/bat/res_dev/project_data/write_string_test.txt" + # content = "New content" + # mods.rd_write_string_to_file(content.encode(encoding="utf-8"), out_path) + # print("all done") # # Testing rd_copy_file # src_path = "/bat/res_dev/project_data/write_string_test.txt" @@ -88,3 +88,8 @@ # found_file = mods.rd_search_file(dir_path, ending) # print(f"Found file: {found_file}") + + # Testing read_excel + my_path = "bat/res_dev/project_data/test_excel_gz.xlsx" + df = mods.read_excel(my_path) + print(df.head()) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index fa25fa6da..d73ac17f4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,4 @@ typing # boto3 # raz_client # rdsa-utils==2.0.2 +openpyxl diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 4a1122972..850f86a09 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -24,7 +24,7 @@ # Third party libraries import pandas as pd -from io import StringIO, TextIOWrapper +from io import StringIO, TextIOWrapper, BytesIO # Local libraries @@ -459,7 +459,7 @@ def rd_search_file(dir_path: str, ending: str) -> str: def read_excel( filepath: str, - client: boto3.client = s3_client, + client = s3_client, bucket_name: str = s3_bucket, **kwargs, ) -> pd.DataFrame: @@ -491,18 +491,19 @@ def read_excel( """ - bucket_name = validate_bucket_name(bucket_name) - filepath = validate_s3_file_path(filepath, allow_s3_scheme=False) +# bucket_name = validate_bucket_name(bucket_name) +# filepath = validate_s3_file_path(filepath, allow_s3_scheme=False) try: # Get the Excel file from S3 + response = client.get_object(Bucket=bucket_name, Key=filepath) s3_logger.info( f"Loaded Excel file from S3 bucket {bucket_name}, filepath {filepath}", ) # Read the Excel file into a Pandas DataFrame - df = pd.read_excel(response["Body"], **kwargs) + df = pd.read_excel(BytesIO(response['Body'].read()), **kwargs) except Exception as e: error_message = ( From 5ad9092c38406a69237d3c466c14971a7d654b30 Mon Sep 17 00:00:00 2001 From: Anne Griffith Date: Tue, 22 Oct 2024 10:37:05 +0000 Subject: [PATCH 18/24] update user and dev config for s3 testing --- src/dev_config.yaml | 2 +- src/user_config.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dev_config.yaml b/src/dev_config.yaml index f168b1dc9..0b7e4c973 100644 --- a/src/dev_config.yaml +++ b/src/dev_config.yaml @@ -8,7 +8,7 @@ global: table_config: "SingleLine" # Environment settings dev_test : False - platform: network #whether to load from hdfs, network (Windows) or s3 (CDP) + platform: s3 # network #whether to load from hdfs, network (Windows) or s3 (CDP) load_from_feather: False runlog_writer: write_csv: True # Write the runlog to a CSV file diff --git a/src/user_config.yaml b/src/user_config.yaml index 2f73f394d..9016febb8 100644 --- a/src/user_config.yaml +++ b/src/user_config.yaml @@ -151,7 +151,7 @@ export_choices: export_fte_total_qa: None export_status_filtered: None export_frozen_group: None - export_staged_BERD_full_responses: "2023_staged_BERD_full_responses_24-10-02_v20.csv" + export_staged_BERD_full_responses: "2023_staged_BERD_full_responses_24-10-14_v33.csv" export_staged_NI_full_responses: None export_full_responses_imputed: None export_full_estimation_qa: None # "2022_full_estimation_qa_24-07-15_v555.csv" From e45710c32c416609ae391e97f4dacd07807a8596 Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Tue, 22 Oct 2024 11:45:08 +0100 Subject: [PATCH 19/24] Changed platform to network so it can pass the unit test --- src/dev_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dev_config.yaml b/src/dev_config.yaml index 581bed023..a65f8ff2e 100644 --- a/src/dev_config.yaml +++ b/src/dev_config.yaml @@ -8,7 +8,7 @@ global: table_config: "SingleLine" # Environment settings dev_test : False - platform: s3 # network #whether to load from hdfs, network (Windows) or s3 (CDP) + platform: network # network #whether to load from hdfs, network (Windows) or s3 (CDP) load_from_feather: False runlog_writer: write_csv: True # Write the runlog to a CSV file From 2f553664a79e93c39ad4a3f9d325749af12f51d1 Mon Sep 17 00:00:00 2001 From: Anne Griffith Date: Tue, 22 Oct 2024 14:40:32 +0000 Subject: [PATCH 20/24] update the export mod tests --- export_mods_main.py | 139 +++++++++++++++++++++++--------------------- 1 file changed, 73 insertions(+), 66 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 7dadd2775..31b959d98 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -23,73 +23,80 @@ if __name__ == "__main__": my_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_24-10-02_v20.csv" +# to_delete_path = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/2023_staged_BERD_full_responses_test_to_delete.csv" my_dir = "/bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" # # Checking that file exists -# my_size = mods.rd_file_size(my_path) -# print(f"File size is {my_size}") - -# # Deleting a file -# status = mods.rd_delete_file(my_path) -# if status: -# print(f"File {my_path} successfully deleted") - - # # Calculating md5sum - # my_sum = mods.rd_md5sum(my_path) - # expected_output = "ea94424aceecf11c8a70d289e51c34ea" - # print(type(my_sum)) - # if expected_output == my_sum: - # print("Same md5sum") - -# # Calculating rd_isdir -# mydir = "bat" -# response = mods.rd_isdir(mydir) - -# print("Got response") -# print(response) - - # # Checking rd_isfile - # response = mods.rd_isfile(my_path) - # print(response) - -# # Checking that rd_stat_size works for files and directories -# file_size = mods.rd_stat_size(my_path) -# print(f"File {my_path} size is {file_size} bytes.") - -# dir_size = mods.rd_stat_size(my_dir) -# print(f"Directory {my_dir} size is {dir_size} bytes.") - - # # Testing rd_read_header  - # response = mods.rd_read_header(my_path) - # print(response) - - # # Testing rd_write_string_to_file - # out_path = "/bat/res_dev/project_data/write_string_test.txt" - # content = "New content" - # mods.rd_write_string_to_file(content.encode(encoding="utf-8"), out_path) - # print("all done") - - # # Testing rd_copy_file - # src_path = "/bat/res_dev/project_data/write_string_test.txt" - # dst_path = "/bat/res_dev/" - # success = mods.rd_copy_file(src_path, dst_path) - # if success: - # print("File copied successfully") - - # # Testing rd_move_file - # src_path = "/bat/res_dev/write_string_test_copy.txt" - # dst_path = "/bat/res_dev/project_data/" - # success = mods.rd_move_file(src_path, dst_path) - # if success: - # print("File moved successfully") - -# # Testing rd_search_file -# dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" -# ending = "24-10-02_v20.csv" - -# found_file = mods.rd_search_file(dir_path, ending) -# print(f"Found file: {found_file}") + my_size = mods.rd_file_size(my_path) + print(f"File size is {my_size}") + + # Calculating md5sum + my_sum = mods.rd_md5sum(my_path) + expected_output = "ea94424aceecf11c8a70d289e51c34ea" + print(type(my_sum)) + if expected_output == my_sum: + print("Same md5sum") + + # Calculating rd_isdir + mydir = "/bat" + response = mods.rd_isdir(mydir) + + print("Got response") + print(response) + + # Checking rd_isfile + response = mods.rd_isfile(my_path) + print(response) + + # Checking that rd_stat_size works for files and directories + file_size = mods.rd_stat_size(my_path) + print(f"File {my_path} size is {file_size} bytes.") + + dir_size = mods.rd_stat_size(my_dir) + print(f"Directory {my_dir} size is {dir_size} bytes.") + + # Testing rd_read_header  + response = mods.rd_read_header(my_path) + print(response) + + # Testing rd_write_string_to_file + out_path = "/bat/res_dev/project_data/new_write_string_test_2.txt" + content = "New content" + mods.rd_write_string_to_file(content.encode(encoding="utf-8"), out_path) + print("all done") + + # Testing rd_copy_file + src_path = "/bat/res_dev/project_data/new_write_string_test_2.txt" + dst_path = "/bat/res_dev/" + success = mods.rd_copy_file(src_path, dst_path) + if success: + print("File copied successfully") + else: + print("File not copied successfully") + + + # Testing rd_move_file + src_path = "/bat/res_dev/new_write_string_test_2.txt" + dst_path = "/bat/res_dev/project_data/" + success = mods.rd_move_file(src_path, dst_path) + if success: + print("File moved successfully") + else: + print("File not moved successfully") + + + # Testing rd_search_file + dir_path = "bat/res_dev/project_data/2023_surveys/BERD/01_staging/staging_qa/full_responses_qa/" + ending = "24-10-02_v20.csv" + + found_file = mods.rd_search_file(dir_path, ending) + print(f"Found file: {found_file}") + + # Deleting a file +# status = mods.rd_delete_file(my_path) +# if status: +# print(f"File {to_delete_path} successfully deleted") # Testing read_excel - my_path = "bat/res_dev/project_data/test_excel_gz.xlsx" - df = mods.read_excel(my_path) - print(df.head()) \ No newline at end of file +# my_path = "bat/res_dev/project_data/test_excel_gz.xlsx" +# df = mods.read_excel(my_path) +# print(df.head()) \ No newline at end of file From e374ef916f0722c5445da010a0f78b8e8c93fdae Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Tue, 22 Oct 2024 16:14:57 +0100 Subject: [PATCH 21/24] Removed red_excel function from s3_mods and openpyxl from requirements; enabled boto3, raz_client and rdsa-utils in the requirements --- requirements.txt | 7 +++--- src/utils/s3_mods.py | 57 -------------------------------------------- 2 files changed, 3 insertions(+), 61 deletions(-) diff --git a/requirements.txt b/requirements.txt index d73ac17f4..c50139ca9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,7 +17,6 @@ setuptools toml tomli==2.0.1 typing -# boto3 -# raz_client -# rdsa-utils==2.0.2 -openpyxl +boto3 +raz_client +rdsa-utils==2.0.2 diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index 850f86a09..e74d84025 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -455,60 +455,3 @@ def rd_search_file(dir_path: str, ending: str) -> str: if file.endswith(ending): target_file = str(file) return target_file - - -def read_excel( - filepath: str, - client = s3_client, - bucket_name: str = s3_bucket, - **kwargs, -) -> pd.DataFrame: - """ - Read an Excel file from s3 bucket into a Pandas dataframe. - - Parameters - ---------- - filepath : str - The filepath to save the dataframe to. - client : boto3.client - The boto3 S3 client instance. - bucket_name : str - The name of the S3 bucket. - kwargs : dict - Optional dictionary of Pandas read_excel keyword arguments. - - Returns - ------- - pd.DataFrame - A dataframe with data, if reading was successful. - - Raises - ------ - InvalidBucketNameError - If the bucket name is invalid according to AWS rules. - Exception - If there is an error reading from s3 or cnverting to Excel. - - """ - -# bucket_name = validate_bucket_name(bucket_name) -# filepath = validate_s3_file_path(filepath, allow_s3_scheme=False) - - try: - # Get the Excel file from S3 - - response = client.get_object(Bucket=bucket_name, Key=filepath) - s3_logger.info( - f"Loaded Excel file from S3 bucket {bucket_name}, filepath {filepath}", - ) - - # Read the Excel file into a Pandas DataFrame - df = pd.read_excel(BytesIO(response['Body'].read()), **kwargs) - - except Exception as e: - error_message = ( - f"Error loading file from bucket {bucket_name}, filepath {filepath}: {e}" - ) - s3_logger.error(error_message) - raise Exception(error_message) from e - return df From 8f2187dbbb055f9d35103bbee58446218167c7d7 Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Tue, 22 Oct 2024 16:29:25 +0100 Subject: [PATCH 22/24] Updated rd_copy_file doc string --- export_mods_main.py | 3 ++- src/utils/s3_mods.py | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/export_mods_main.py b/export_mods_main.py index 31b959d98..f788f6713 100644 --- a/export_mods_main.py +++ b/export_mods_main.py @@ -99,4 +99,5 @@ # Testing read_excel # my_path = "bat/res_dev/project_data/test_excel_gz.xlsx" # df = mods.read_excel(my_path) -# print(df.head()) \ No newline at end of file +# print(df.head()) + \ No newline at end of file diff --git a/src/utils/s3_mods.py b/src/utils/s3_mods.py index e74d84025..c36df5e4c 100644 --- a/src/utils/s3_mods.py +++ b/src/utils/s3_mods.py @@ -330,7 +330,7 @@ def _path_long2short(path: "str") -> str: """ Extracts a short file name from the full path. If there is at least one forward slash, finds the lates slash to the right - and rerurns all characrers afrer it. + and rerurns all characrers after it. If there are no slashes, returns the path as is. """ @@ -352,12 +352,40 @@ def _remove_end_slashes(path: "str") -> str: def rd_copy_file(src_path: str, dst_path: str) -> bool: - - dst_path = _remove_end_slashes(dst_path) - dst_path += "/" + _path_long2short(src_path) """ Copy a file from one location to another. Uses rdsa_utils. + If destination path ends with any number of forward slashes, they are + removed. This is needed for the library method copy_file to work correctly. + + Library method copy_file requires that the paths are file paths: + old_dir/old.file and new_dir/new.file. The rd_copy_file takes full file name + with the full file path as a source, and just a directory path as a + destination, like this: old_dir/old.file and new_dir/ or new_dir without the + slash at the end. old.file will become new_dir/old.file, i.e. the file is + copied with the same name, not renamed. + Supplementary function _path_long2short decouples old.file from the full + source path and "glues it" to the end of destination path. + + Args: + src_path (string): Full path of the source file, not including the + bucket name, but including the quasi-directories and slashes preceding + the file name. + + dst_path (string): Full path of the destination directory, not including + bucket name, but including the quasi-directories and slashes preceding + the file name. It must be a directory, not a file. I + + Returns: + status (bool): True if copying was successful, False otherwise. """ + + # If destination ends with any number of slashes, they are removed + dst_path = _remove_end_slashes(dst_path) + + # Disconnect the source file name from the full source path and adds it tp + # the end of destination directory, separated by one forward slash. + dst_path += "/" + _path_long2short(src_path) + success = copy_file( client=s3_client, source_bucket_name=s3_bucket, From 1aef038dbb03cfc193f0d259f170db721a8753a2 Mon Sep 17 00:00:00 2001 From: Zorinyants Date: Tue, 22 Oct 2024 16:31:31 +0100 Subject: [PATCH 23/24] Commented three requirements so the unit test can pass --- requirements.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index c50139ca9..fa25fa6da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,6 +17,6 @@ setuptools toml tomli==2.0.1 typing -boto3 -raz_client -rdsa-utils==2.0.2 +# boto3 +# raz_client +# rdsa-utils==2.0.2 From 545784c9bea9b5ce399207fde86819fc6b251f69 Mon Sep 17 00:00:00 2001 From: Anne Griffith Date: Tue, 22 Oct 2024 16:36:17 +0100 Subject: [PATCH 24/24] update version to 2.2.0 --- src/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/_version.py b/src/_version.py index 4eabd0b3f..8a124bf64 100644 --- a/src/_version.py +++ b/src/_version.py @@ -1 +1 @@ -__version__ = "2.1.2" +__version__ = "2.2.0"