From d35e9ac6cffaba82af761b61ea872cb8feb0b67f Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Thu, 26 Sep 2024 15:38:34 -0700
Subject: [PATCH 01/12] Refactor the init process by declaring parallelization
 variables after properties have been checked

---
 src/hashstore/filehashstore.py | 104 ++++++++++++++++++---------------
 1 file changed, 57 insertions(+), 47 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 8e2f095a..9992656e 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -80,53 +80,6 @@ class FileHashStore(HashStore):
     ]
 
     def __init__(self, properties=None):
-        # Variables to orchestrate parallelization
-        # Check to see whether a multiprocessing or threading sync lock should be used
-        self.use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"
-        if self.use_multiprocessing == "True":
-            # Create multiprocessing synchronization variables
-            # Synchronization values for object locked pids
-            self.object_pid_lock_mp = multiprocessing.Lock()
-            self.object_pid_condition_mp = multiprocessing.Condition(
-                self.object_pid_lock_mp
-            )
-            self.object_locked_pids_mp = multiprocessing.Manager().list()
-            # Synchronization values for object locked cids
-            self.object_cid_lock_mp = multiprocessing.Lock()
-            self.object_cid_condition_mp = multiprocessing.Condition(
-                self.object_cid_lock_mp
-            )
-            self.object_locked_cids_mp = multiprocessing.Manager().list()
-            # Synchronization values for metadata locked documents
-            self.metadata_lock_mp = multiprocessing.Lock()
-            self.metadata_condition_mp = multiprocessing.Condition(
-                self.metadata_lock_mp
-            )
-            self.metadata_locked_docs_mp = multiprocessing.Manager().list()
-            # Synchronization values for reference locked pids
-            self.reference_pid_lock_mp = multiprocessing.Lock()
-            self.reference_pid_condition_mp = multiprocessing.Condition(
-                self.reference_pid_lock_mp
-            )
-            self.reference_locked_pids_mp = multiprocessing.Manager().list()
-        else:
-            # Create threading synchronization variables
-            # Synchronization values for object locked pids
-            self.object_pid_lock_th = threading.Lock()
-            self.object_pid_condition_th = threading.Condition(self.object_pid_lock_th)
-            self.object_locked_pids_th = []
-            # Synchronization values for object locked cids
-            self.object_cid_lock_th = threading.Lock()
-            self.object_cid_condition_th = threading.Condition(self.object_cid_lock_th)
-            self.object_locked_cids_th = []
-            # Synchronization values for metadata locked documents
-            self.metadata_lock_th = threading.Lock()
-            self.metadata_condition_th = threading.Condition(self.metadata_lock_th)
-            self.metadata_locked_docs_th = []
-            # Synchronization values for reference locked pids
-            self.reference_pid_lock_th = threading.Lock()
-            self.reference_pid_condition_th = threading.Condition(self.metadata_lock_th)
-            self.reference_locked_pids_th = []
         # Now check properties
         if properties:
             # Validate properties against existing configuration if present
@@ -176,6 +129,63 @@ def __init__(self, properties=None):
         self._create_path(self.refs / "tmp")
         self._create_path(self.refs / "pids")
         self._create_path(self.refs / "cids")
+
+        # Variables to orchestrate parallelization
+        # Check to see whether a multiprocessing or threading sync lock should be used
+        self.use_multiprocessing = (
+            os.getenv("USE_MULTIPROCESSING", "False") == "True"
+        )
+        if self.use_multiprocessing:
+            # Create multiprocessing synchronization variables
+            # Synchronization values for object locked pids
+            self.object_pid_lock_mp = multiprocessing.Lock()
+            self.object_pid_condition_mp = multiprocessing.Condition(
+                self.object_pid_lock_mp
+            )
+            self.object_locked_pids_mp = multiprocessing.Manager().list()
+            # Synchronization values for object locked cids
+            self.object_cid_lock_mp = multiprocessing.Lock()
+            self.object_cid_condition_mp = multiprocessing.Condition(
+                self.object_cid_lock_mp
+            )
+            self.object_locked_cids_mp = multiprocessing.Manager().list()
+            # Synchronization values for metadata locked documents
+            self.metadata_lock_mp = multiprocessing.Lock()
+            self.metadata_condition_mp = multiprocessing.Condition(
+                self.metadata_lock_mp
+            )
+            self.metadata_locked_docs_mp = multiprocessing.Manager().list()
+            # Synchronization values for reference locked pids
+            self.reference_pid_lock_mp = multiprocessing.Lock()
+            self.reference_pid_condition_mp = multiprocessing.Condition(
+                self.reference_pid_lock_mp
+            )
+            self.reference_locked_pids_mp = multiprocessing.Manager().list()
+        else:
+            # Create threading synchronization variables
+            # Synchronization values for object locked pids
+            self.object_pid_lock_th = threading.Lock()
+            self.object_pid_condition_th = threading.Condition(
+                self.object_pid_lock_th
+            )
+            self.object_locked_pids_th = []
+            # Synchronization values for object locked cids
+            self.object_cid_lock_th = threading.Lock()
+            self.object_cid_condition_th = threading.Condition(
+                self.object_cid_lock_th
+            )
+            self.object_locked_cids_th = []
+            # Synchronization values for metadata locked documents
+            self.metadata_lock_th = threading.Lock()
+            self.metadata_condition_th = threading.Condition(self.metadata_lock_th)
+            self.metadata_locked_docs_th = []
+            # Synchronization values for reference locked pids
+            self.reference_pid_lock_th = threading.Lock()
+            self.reference_pid_condition_th = threading.Condition(
+                self.reference_pid_lock_th
+            )
+            self.reference_locked_pids_th = []
+
         logging.debug(
             "FileHashStore - Initialization success. Store root: %s", self.root
         )
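The block moved by the patch above keys all synchronization off a single USE_MULTIPROCESSING environment variable: comparing the variable's value against "True" already yields a bool, so later checks must test the bool directly rather than compare it against a string again. A minimal sketch of the same pattern outside of HashStore (variable names here are illustrative, not part of the codebase):

    import multiprocessing
    import os
    import threading

    # os.getenv(...) == "True" evaluates to a bool at init time.
    use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"

    if use_multiprocessing:
        # Manager().list() returns a proxy list that can be shared across processes.
        lock = multiprocessing.Lock()
        condition = multiprocessing.Condition(lock)
        locked_pids = multiprocessing.Manager().list()
    else:
        # A plain list suffices when only threads in one process contend for it.
        lock = threading.Lock()
        condition = threading.Condition(lock)
        locked_pids = []

Each lock/condition/list triple above corresponds to one of the four resources the store guards (object pids, object cids, metadata documents, reference pids).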
From dedd3a537c9b18192e20f6b0981b19d53526b49b Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Thu, 26 Sep 2024 15:42:16 -0700
Subject: [PATCH 02/12] Begin cleaning up logging statements by adding a
 'fhs_logger' attribute that holds a logger instance for the 'filehashstore'
 module name, and revise the init process

---
 src/hashstore/filehashstore.py | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 9992656e..f2295fe3 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -80,6 +80,7 @@ class FileHashStore(HashStore):
     ]
 
     def __init__(self, properties=None):
+        self.fhs_logger = logging.getLogger(__name__)
         # Now check properties
         if properties:
             # Validate properties against existing configuration if present
@@ -100,7 +101,7 @@ def __init__(self, properties=None):
             self._verify_hashstore_properties(properties, prop_store_path)
 
             # If no exceptions thrown, FileHashStore ready for initialization
-            logging.debug("FileHashStore - Initializing, properties verified.")
+            self.fhs_logger.debug("Initializing, properties verified.")
             self.root = Path(prop_store_path)
             self.depth = prop_store_depth
             self.width = prop_store_width
@@ -108,8 +109,8 @@ def __init__(self, properties=None):
         # Write 'hashstore.yaml' to store path
         if not os.path.isfile(self.hashstore_configuration_yaml):
             # pylint: disable=W1201
-            logging.debug(
-                "FileHashStore - HashStore does not exist & configuration file not found."
+            self.fhs_logger.debug(
+                "HashStore does not exist & configuration file not found."
                 + " Writing configuration file."
             )
             self._write_properties(properties)
@@ -186,16 +187,13 @@ def __init__(self, properties=None):
             )
             self.reference_locked_pids_th = []
 
-        logging.debug(
-            "FileHashStore - Initialization success. Store root: %s", self.root
-        )
+        self.fhs_logger.debug("Initialization success. Store root: %s", self.root)
         else:
             # Cannot instantiate or initialize FileHashStore without config
             exception_string = (
-                "FileHashStore - HashStore properties must be supplied."
-                + f" Properties: {properties}"
+                "HashStore properties must be supplied." + f" Properties: {properties}"
            )
-            logging.debug(exception_string)
+            self.fhs_logger.debug(exception_string)
             raise ValueError(exception_string)
 
     # Configuration and Related Methods
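Patch 02 replaces calls on the root logging module with a logger scoped to the module name, which is what logging.getLogger(__name__) returns. That is also why the "FileHashStore - " prefix can be dropped from each message: the logger's own name already carries that information. A short, self-contained illustration of the idea (the class and messages here are placeholders, not HashStore code):

    import logging

    class Example:
        def __init__(self):
            # One logger per module; handlers and levels are inherited from
            # whatever configuration the calling application sets up.
            self.logger = logging.getLogger(__name__)

        def run(self):
            self.logger.debug("Initializing, properties verified.")

    # Including %(name)s in the format string shows where each record came from,
    # e.g. "__main__ - DEBUG - Initializing, properties verified."
    logging.basicConfig(format="%(name)s - %(levelname)s - %(message)s", level=logging.DEBUG)
    Example().run()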
From 71827f578044f7045696a3291af3bf2376b07b98 Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Thu, 26 Sep 2024 16:07:08 -0700
Subject: [PATCH 03/12] Clean up code by renaming all 'exception_string'
 variables to 'err_msg'

---
 src/hashstore/filehashstore.py | 268 ++++++++++++++++-----------------
 1 file changed, 130 insertions(+), 138 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index f2295fe3..51299cdb 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -190,11 +190,11 @@ def __init__(self, properties=None):
         self.fhs_logger.debug("Initialization success. Store root: %s", self.root)
         else:
             # Cannot instantiate or initialize FileHashStore without config
-            exception_string = (
+            err_msg = (
                 "HashStore properties must be supplied." + f" Properties: {properties}"
             )
-            self.fhs_logger.debug(exception_string)
-            raise ValueError(exception_string)
+            self.fhs_logger.debug(err_msg)
+            raise ValueError(err_msg)
 
     # Configuration and Related Methods
 
@@ -211,12 +211,12 @@ def _load_properties(
         - store_metadata_namespace (str): Namespace for the HashStore's system metadata.
         """
         if not os.path.isfile(hashstore_yaml_path):
-            exception_string = (
+            err_msg = (
                 "FileHashStore - load_properties: hashstore.yaml not found"
                 + " in store root path."
             )
-            logging.critical(exception_string)
-            raise FileNotFoundError(exception_string)
+            logging.critical(err_msg)
+            raise FileNotFoundError(err_msg)
 
         # Open file
         with open(hashstore_yaml_path, "r", encoding="utf-8") as hs_yaml_file:
@@ -244,12 +244,12 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None:
         """
         # If hashstore.yaml already exists, must throw exception and proceed with caution
         if os.path.isfile(self.hashstore_configuration_yaml):
-            exception_string = (
+            err_msg = (
                 "FileHashStore - write_properties: configuration file 'hashstore.yaml'"
                 + " already exists."
             )
-            logging.error(exception_string)
-            raise FileExistsError(exception_string)
+            logging.error(err_msg)
+            raise FileExistsError(err_msg)
         # Validate properties
         checked_properties = self._validate_properties(properties)
@@ -265,14 +265,14 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None:
         if store_algorithm in accepted_store_algorithms:
             checked_store_algorithm = store_algorithm
         else:
-            exception_string = (
+            err_msg = (
                 f"FileHashStore - write_properties: algorithm supplied ({store_algorithm})"
                 f" cannot be used as default for HashStore. 
Must be one of: " + f"{', '.join(accepted_store_algorithms)}" f" which are DataONE controlled algorithm values" ) - logging.error(exception_string) - raise ValueError(exception_string) + logging.error(err_msg) + raise ValueError(err_msg) # If given store path doesn't exist yet, create it. if not os.path.exists(self.root): @@ -455,13 +455,13 @@ def _verify_hashstore_properties( if key == "store_depth" or key == "store_width": supplied_key = int(properties[key]) if hashstore_yaml_dict[key] != supplied_key: - exception_string = ( + err_msg = ( f"FileHashStore - Given properties ({key}: {properties[key]}) does not" + f" match. HashStore configuration ({key}: {hashstore_yaml_dict[key]})" + f" found at: {self.hashstore_configuration_yaml}" ) - logging.critical(exception_string) - raise ValueError(exception_string) + logging.critical(err_msg) + raise ValueError(err_msg) else: if os.path.exists(prop_store_path): # Check if HashStore exists and throw exception if found @@ -470,14 +470,14 @@ def _verify_hashstore_properties( os.path.isdir(os.path.join(prop_store_path, sub)) for sub in subfolders ): - exception_string = ( + err_msg = ( "FileHashStore - Unable to initialize HashStore. `hashstore.yaml` is not" + " present but conflicting HashStore directory exists. Please delete" + " '/objects', '/metadata' and/or '/refs' at the store path or supply" + " a new path." ) - logging.critical(exception_string) - raise RuntimeError(exception_string) + logging.critical(err_msg) + raise RuntimeError(err_msg) def _validate_properties( self, properties: Dict[str, Union[str, int]] @@ -493,33 +493,33 @@ def _validate_properties( :return: The given properties object (that has been validated). """ if not isinstance(properties, dict): - exception_string = ( + err_msg = ( "FileHashStore - _validate_properties: Invalid argument -" + " expected a dictionary." ) - logging.debug(exception_string) - raise ValueError(exception_string) + logging.debug(err_msg) + raise ValueError(err_msg) # New dictionary for validated properties checked_properties = {} for key in self.property_required_keys: if key not in properties: - exception_string = ( + err_msg = ( "FileHashStore - _validate_properties: Missing required" + f" key: {key}." ) - logging.debug(exception_string) - raise KeyError(exception_string) + logging.debug(err_msg) + raise KeyError(err_msg) value = properties.get(key) if value is None: - exception_string = ( + err_msg = ( "FileHashStore - _validate_properties: Value for key:" + f" {key} is none." ) - logging.debug(exception_string) - raise ValueError(exception_string) + logging.debug(err_msg) + raise ValueError(err_msg) # Add key and values to checked_properties if key == "store_depth" or key == "store_width": @@ -527,13 +527,13 @@ def _validate_properties( try: checked_properties[key] = int(value) except Exception as err: - exception_string = ( + err_msg = ( "FileHashStore - _validate_properties: Unexpected exception when" " attempting to ensure store depth and width are integers. Details: " + str(err) ) - logging.debug(exception_string) - raise ValueError(exception_string) + logging.debug(err_msg) + raise ValueError(err_msg) else: checked_properties[key] = value @@ -556,12 +556,12 @@ def lookup_algo(algo_to_translate): return dataone_algo_translation[algo_to_translate] if not os.path.isfile(self.hashstore_configuration_yaml): - exception_string = ( + err_msg = ( "FileHashStore - set_default_algorithms: hashstore.yaml not found" + " in store root path." 
) - logging.critical(exception_string) - raise FileNotFoundError(exception_string) + logging.critical(err_msg) + raise FileNotFoundError(err_msg) with open( self.hashstore_configuration_yaml, "r", encoding="utf-8" @@ -662,12 +662,12 @@ def store_object( pid, ) except Exception as err: - exception_string = ( + err_msg = ( f"FileHashStore - store_object: failed to store object for pid: {pid}." + " Reference files will not be created or tagged. Unexpected error: " + str(err) ) - logging.error(exception_string) + logging.error(err_msg) raise err finally: # Release pid @@ -710,12 +710,12 @@ def delete_if_invalid_object( self._check_string(checksum_algorithm, "checksum_algorithm") self._check_integer(expected_file_size) if object_metadata is None or not isinstance(object_metadata, ObjectMetadata): - exception_string = ( + err_msg = ( "FileHashStore - verify_object: 'object_metadata' cannot be None." + " Must be a 'ObjectMetadata' object." ) - logging.error(exception_string) - raise ValueError(exception_string) + logging.error(err_msg) + raise ValueError(err_msg) else: logging.info( "FileHashStore - verify_object: Called to verify object with id: %s", @@ -829,11 +829,9 @@ def retrieve_object(self, pid: str) -> IO[bytes]: ) obj_stream = self._open(entity, object_cid) else: - exception_string = ( - f"FileHashStore - retrieve_object: No object found for pid: {pid}" - ) - logging.error(exception_string) - raise ValueError(exception_string) + err_msg = f"FileHashStore - retrieve_object: No object found for pid: {pid}" + logging.error(err_msg) + raise ValueError(err_msg) logging.info( "FileHashStore - retrieve_object: Retrieved object for pid: %s", pid ) @@ -866,11 +864,11 @@ def retrieve_metadata(self, pid: str, format_id: Optional[str] = None) -> IO[byt ) return metadata_stream else: - exception_string = ( + err_msg = ( f"FileHashStore - retrieve_metadata: No metadata found for pid: {pid}" ) - logging.error(exception_string) - raise ValueError(exception_string) + logging.error(err_msg) + raise ValueError(err_msg) def delete_object(self, pid: str) -> None: logging.debug( @@ -1205,11 +1203,9 @@ def get_hex_digest(self, pid: str, algorithm: str) -> str: algorithm = self._clean_algorithm(algorithm) object_cid = self._find_object(pid).get("cid") if not self._exists(entity, object_cid): - exception_string = ( - f"FileHashStore - get_hex_digest: No object found for pid: {pid}" - ) - logging.error(exception_string) - raise ValueError(exception_string) + err_msg = f"FileHashStore - get_hex_digest: No object found for pid: {pid}" + logging.error(err_msg) + raise ValueError(err_msg) cid_stream = self._open(entity, object_cid) hex_digest = self._computehash(cid_stream, algorithm=algorithm) @@ -1404,11 +1400,11 @@ def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata": return object_metadata # pylint: disable=W0718 except Exception as err: - exception_string = ( + err_msg = ( "FileHashStore - _store_data_only: failed to store object." 
+ f" Unexpected {err=}, {type(err)=}" ) - logging.error(exception_string) + logging.error(err_msg) raise err def _move_and_get_checksums( @@ -1483,11 +1479,11 @@ def _move_and_get_checksums( shutil.move(tmp_file_name, abs_file_path) except Exception as err: # Revert storage process - exception_string = ( + err_msg = ( "FileHashStore - _move_and_get_checksums:" + f" Unexpected Error: {err}" ) - logging.warning(exception_string) + logging.warning(err_msg) if os.path.isfile(abs_file_path): # Check to see if object exists before determining whether to delete debug_msg = ( @@ -1498,12 +1494,12 @@ def _move_and_get_checksums( pid_checksum = self.get_hex_digest(pid, self.algorithm) if pid_checksum == hex_digests.get(self.algorithm): # If the checksums match, return and log warning - exception_string = ( + err_msg = ( "FileHashStore - _move_and_get_checksums: Object exists at:" + f" {abs_file_path} but an unexpected issue has been encountered." + " Reference files will not be created and/or tagged." ) - logging.warning(exception_string) + logging.warning(err_msg) raise err else: debug_msg = ( @@ -1545,22 +1541,22 @@ def _move_and_get_checksums( ) except NonMatchingObjSize as nmose: # If any exception is thrown during validation, we do not tag. - exception_string = ( + err_msg = ( f"FileHashStore - _move_and_get_checksums: Object already exists for pid: {pid}" + " , deleting temp file. Reference files will not be created and/or tagged" + f" due to an issue with the supplied pid object metadata. {str(nmose)}" ) - logging.debug(exception_string) - raise NonMatchingObjSize(exception_string) from nmose + logging.debug(err_msg) + raise NonMatchingObjSize(err_msg) from nmose except NonMatchingChecksum as nmce: # If any exception is thrown during validation, we do not tag. - exception_string = ( + err_msg = ( f"FileHashStore - _move_and_get_checksums: Object already exists for pid: {pid}" + " , deleting temp file. Reference files will not be created and/or tagged" + f" due to an issue with the supplied pid object metadata. {str(nmce)}" ) - logging.debug(exception_string) - raise NonMatchingChecksum(exception_string) from nmce + logging.debug(err_msg) + raise NonMatchingChecksum(err_msg) from nmce finally: # Ensure that the tmp file has been removed, the data object already exists, so it # is redundant. No exception is thrown so 'store_object' can proceed to tag object @@ -1635,19 +1631,19 @@ def _write_to_tmp_file_and_get_hex_digests( return hex_digest_dict, tmp.name, tmp_file_size # pylint: disable=W0718 except Exception as err: - exception_string = ( + err_msg = ( "FileHashStore - _write_to_tmp_file_and_get_hex_digests:" + f" Unexpected {err=}, {type(err)=}" ) - logging.error(exception_string) + logging.error(err_msg) # pylint: disable=W0707,W0719 - raise Exception(exception_string) + raise Exception(err_msg) except KeyboardInterrupt: - exception_string = ( + err_msg = ( "FileHashStore - _write_to_tmp_file_and_get_hex_digests:" + " Keyboard interruption by user." 
) - logging.error(exception_string) + logging.error(err_msg) if os.path.isfile(tmp.name): os.remove(tmp.name) finally: @@ -1657,12 +1653,12 @@ def _write_to_tmp_file_and_get_hex_digests( os.remove(tmp.name) # pylint: disable=W0718 except Exception as err: - exception_string = ( + err_msg = ( "FileHashStore - _write_to_tmp_file_and_get_hex_digests:" + f"Unexpected {err=} while attempting to" + f" delete tmp file: {tmp.name}, {type(err)=}" ) - logging.error(exception_string) + logging.error(err_msg) def _mktmpfile(self, path: Path) -> IO[bytes]: """Create a temporary file at the given path ready to be written. @@ -1973,10 +1969,10 @@ def _put_metadata( ) return full_path except Exception as err: - exception_string = ( + err_msg = ( f"FileHashStore - _put_metadata: Unexpected {err=}, {type(err)=}" ) - logging.error(exception_string) + logging.error(err_msg) if os.path.isfile(metadata_tmp): # Remove tmp metadata, calling app must re-upload logging.debug( @@ -1986,12 +1982,12 @@ def _put_metadata( self._delete("metadata", metadata_tmp) raise else: - exception_string = ( + err_msg = ( f"FileHashStore - _put_metadata: Attempt to move metadata for pid: {pid}" + f", but metadata temp file not found: {metadata_tmp}" ) - logging.error(exception_string) - raise FileNotFoundError(exception_string) + logging.error(err_msg) + raise FileNotFoundError(err_msg) def _mktmpmetadata(self, stream: "Stream") -> str: """Create a named temporary file with `stream` (metadata). @@ -2129,11 +2125,11 @@ def _write_refs_file(self, path: Path, ref_id: str, ref_type: str) -> str: return tmp_file_path except Exception as err: - exception_string = ( + err_msg = ( "FileHashStore - _write_refs_file: failed to write cid refs file for pid:" + f" {ref_id} into path: {path}. Unexpected {err=}, {type(err)=}" ) - logging.error(exception_string) + logging.error(err_msg) raise err def _update_refs_file( @@ -2151,12 +2147,12 @@ def _update_refs_file( ) logging.debug(debug_msg) if not os.path.isfile(refs_file_path): - exception_string = ( + err_msg = ( f"FileHashStore - _update_refs_file: {refs_file_path} does not exist." + f" Cannot {update_type} ref_id: {ref_id}" ) - logging.error(exception_string) - raise FileNotFoundError(exception_string) + logging.error(err_msg) + raise FileNotFoundError(err_msg) try: if update_type == "add": pid_found = self._is_string_in_refs_file(ref_id, refs_file_path) @@ -2186,11 +2182,11 @@ def _update_refs_file( ) logging.debug(debug_msg) except Exception as err: - exception_string = ( + err_msg = ( f"FileHashStore - _update_refs_file: failed to {update_type} for ref_id: {ref_id}" + f" at refs file: {refs_file_path}. Unexpected {err=}, {type(err)=}" ) - logging.error(exception_string) + logging.error(err_msg) raise err @staticmethod @@ -2235,22 +2231,22 @@ def _verify_object_information( """ if file_size_to_validate is not None and file_size_to_validate > 0: if file_size_to_validate != tmp_file_size: - exception_string = ( + err_msg = ( "FileHashStore - _verify_object_information: Object file size calculated: " + f" {tmp_file_size} does not match with expected size:" + f" {file_size_to_validate}." 
) if pid is not None: self._delete(entity, tmp_file_name) - exception_string_for_pid = ( - exception_string + err_msg_for_pid = ( + err_msg + f" Tmp file deleted and file not stored for pid: {pid}" ) - logging.debug(exception_string_for_pid) - raise NonMatchingObjSize(exception_string_for_pid) + logging.debug(err_msg_for_pid) + raise NonMatchingObjSize(err_msg_for_pid) else: - logging.debug(exception_string) - raise NonMatchingObjSize(exception_string) + logging.debug(err_msg) + raise NonMatchingObjSize(err_msg) if checksum_algorithm is not None and checksum is not None: if checksum_algorithm not in hex_digests: # Check to see if it is a supported algorithm @@ -2269,19 +2265,19 @@ def _verify_object_information( cid_stream, algorithm=checksum_algorithm ) if hex_digest_calculated != checksum: - exception_string = ( + err_msg = ( "FileHashStore - _verify_object_information: checksum_algorithm" + f" ({checksum_algorithm}) cannot be found in the default hex digests" + f" dict, but is supported. New checksum calculated: " f"{hex_digest_calculated}, does not match what has been provided: " + checksum ) - logging.debug(exception_string) - raise NonMatchingChecksum(exception_string) + logging.debug(err_msg) + raise NonMatchingChecksum(err_msg) else: hex_digest_stored = hex_digests[checksum_algorithm] if hex_digest_stored != checksum.lower(): - exception_string = ( + err_msg = ( "FileHashStore - _verify_object_information: Hex digest and checksum" + f" do not match - file not stored for pid: {pid}. Algorithm:" + f" {checksum_algorithm}. Checksum provided: {checksum} !=" @@ -2290,14 +2286,14 @@ def _verify_object_information( if pid is not None: # Delete the tmp file self._delete(entity, tmp_file_name) - exception_string_for_pid = ( - exception_string + f" Tmp file ({tmp_file_name}) deleted." + err_msg_for_pid = ( + err_msg + f" Tmp file ({tmp_file_name}) deleted." ) - logging.debug(exception_string_for_pid) - raise NonMatchingChecksum(exception_string_for_pid) + logging.debug(err_msg_for_pid) + raise NonMatchingChecksum(err_msg_for_pid) else: - logging.debug(exception_string) - raise NonMatchingChecksum(exception_string) + logging.debug(err_msg) + raise NonMatchingChecksum(err_msg) def _verify_hashstore_references( self, @@ -2328,42 +2324,42 @@ def _verify_hashstore_references( # Check that reference files were created if not os.path.isfile(pid_refs_path): - exception_string = ( + err_msg = ( "FileHashStore - _verify_hashstore_references: Pid refs file missing: " + str(pid_refs_path) + f" . Additional Context: {additional_log_string}" ) - logging.error(exception_string) - raise PidRefsFileNotFound(exception_string) + logging.error(err_msg) + raise PidRefsFileNotFound(err_msg) if not os.path.isfile(cid_refs_path): - exception_string = ( + err_msg = ( "FileHashStore - _verify_hashstore_references: Cid refs file missing: " + str(cid_refs_path) + f" . Additional Context: {additional_log_string}" ) - logging.error(exception_string) - raise CidRefsFileNotFound(exception_string) + logging.error(err_msg) + raise CidRefsFileNotFound(err_msg) # Check the content of the reference files # Start with the cid retrieved_cid = self._read_small_file_content(pid_refs_path) if retrieved_cid != cid: - exception_string = ( + err_msg = ( "FileHashStore - _verify_hashstore_references: Pid refs file exists" + f" ({pid_refs_path}) but cid ({cid}) does not match." 
+ f" Additional Context: {additional_log_string}" ) - logging.error(exception_string) - raise PidRefsContentError(exception_string) + logging.error(err_msg) + raise PidRefsContentError(err_msg) # Then the pid pid_found = self._is_string_in_refs_file(pid, cid_refs_path) if not pid_found: - exception_string = ( + err_msg = ( "FileHashStore - _verify_hashstore_references: Cid refs file exists" + f" ({cid_refs_path}) but pid ({pid}) not found." + f" Additional Context: {additional_log_string}" ) - logging.error(exception_string) - raise CidRefsContentError(exception_string) + logging.error(err_msg) + raise CidRefsContentError(err_msg) def _delete_object_only(self, cid: str) -> None: """Attempt to delete an object based on the given content identifier (cid). If the object @@ -2458,9 +2454,9 @@ def _check_arg_format_id(self, format_id: str, method: str) -> str: :return: Valid metadata namespace. """ if format_id and not format_id.strip(): - exception_string = f"FileHashStore - {method}: Format_id cannot be empty." - logging.error(exception_string) - raise ValueError(exception_string) + err_msg = f"FileHashStore - {method}: Format_id cannot be empty." + logging.error(err_msg) + raise ValueError(err_msg) elif format_id is None: # Use default value set by hashstore config checked_format_id = self.sysmeta_ns @@ -2523,12 +2519,12 @@ def _clean_algorithm(self, algorithm_string: str) -> str: cleaned_string not in self.default_algo_list and cleaned_string not in self.other_algo_list ): - exception_string = ( + err_msg = ( "FileHashStore: _clean_algorithm: Algorithm not supported:" + cleaned_string ) - logging.error(exception_string) - raise UnsupportedAlgorithm(exception_string) + logging.error(err_msg) + raise UnsupportedAlgorithm(err_msg) return cleaned_string def _computehash( @@ -2691,10 +2687,8 @@ def _delete(self, entity: str, file: Union[str, Path]) -> None: os.remove(realpath) except Exception as err: - exception_string = ( - f"FileHashStore - delete(): Unexpected {err=}, {type(err)=}" - ) - logging.error(exception_string) + err_msg = f"FileHashStore - delete(): Unexpected {err=}, {type(err)=}" + logging.error(err_msg) raise err def _create_path(self, path: Path) -> None: @@ -3054,19 +3048,19 @@ def _check_arg_data(data: Union[str, os.PathLike, io.BufferedReader]) -> bool: and not isinstance(data, Path) and not isinstance(data, io.BufferedIOBase) ): - exception_string = ( + err_msg = ( "FileHashStore - _validate_arg_data: Data must be a path, string or buffered" + f" stream type. Data type supplied: {type(data)}" ) - logging.error(exception_string) - raise TypeError(exception_string) + logging.error(err_msg) + raise TypeError(err_msg) if isinstance(data, str): if data.strip() == "": - exception_string = ( + err_msg = ( "FileHashStore - _validate_arg_data: Data string cannot be empty." ) - logging.error(exception_string) - raise TypeError(exception_string) + logging.error(err_msg) + raise TypeError(err_msg) return True @staticmethod @@ -3078,18 +3072,16 @@ def _check_integer(file_size: int) -> None: """ if file_size is not None: if not isinstance(file_size, int): - exception_string = ( + err_msg = ( "FileHashStore - _check_integer: size given must be an integer." + f" File size: {file_size}. Arg Type: {type(file_size)}." 
) - logging.error(exception_string) - raise TypeError(exception_string) + logging.error(err_msg) + raise TypeError(err_msg) if file_size < 1: - exception_string = ( - "FileHashStore - _check_integer: size given must be > 0" - ) - logging.error(exception_string) - raise ValueError(exception_string) + err_msg = "FileHashStore - _check_integer: size given must be > 0" + logging.error(err_msg) + raise ValueError(err_msg) @staticmethod def _check_string(string: str, arg: str) -> None: @@ -3101,12 +3093,12 @@ def _check_string(string: str, arg: str) -> None: """ if string is None or string.strip() == "" or any(ch.isspace() for ch in string): method = inspect.stack()[1].function - exception_string = ( + err_msg = ( f"FileHashStore - {method}: {arg} cannot be None" + f" or empty, {arg}: {string}." ) - logging.error(exception_string) - raise ValueError(exception_string) + logging.error(err_msg) + raise ValueError(err_msg) @staticmethod def _cast_to_bytes(text: any) -> bytes: From 6180301eb5d89605eaf74bd018f9c0fa117a4fa9 Mon Sep 17 00:00:00 2001 From: Dou Mok Date: Thu, 26 Sep 2024 16:14:57 -0700 Subject: [PATCH 04/12] Clean-up logging in 'filehashstore' init related methods and delete redundant method --- src/hashstore/filehashstore.py | 128 +++++++-------------------------- 1 file changed, 26 insertions(+), 102 deletions(-) diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py index 51299cdb..d6e9eb0c 100644 --- a/src/hashstore/filehashstore.py +++ b/src/hashstore/filehashstore.py @@ -211,10 +211,7 @@ def _load_properties( - store_metadata_namespace (str): Namespace for the HashStore's system metadata. """ if not os.path.isfile(hashstore_yaml_path): - err_msg = ( - "FileHashStore - load_properties: hashstore.yaml not found" - + " in store root path." - ) + err_msg = "'hashstore.yaml' not found in store root path." logging.critical(err_msg) raise FileNotFoundError(err_msg) @@ -227,9 +224,7 @@ def _load_properties( for key in hashstore_required_prop_keys: if key != "store_path": hashstore_yaml_dict[key] = yaml_data[key] - logging.debug( - "FileHashStore - load_properties: Successfully retrieved 'hashstore.yaml' properties." - ) + logging.debug("Successfully retrieved 'hashstore.yaml' properties.") return hashstore_yaml_dict def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None: @@ -244,10 +239,7 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None: """ # If hashstore.yaml already exists, must throw exception and proceed with caution if os.path.isfile(self.hashstore_configuration_yaml): - err_msg = ( - "FileHashStore - write_properties: configuration file 'hashstore.yaml'" - + " already exists." - ) + err_msg = "Configuration file 'hashstore.yaml' already exists." logging.error(err_msg) raise FileExistsError(err_msg) # Validate properties @@ -266,9 +258,8 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None: checked_store_algorithm = store_algorithm else: err_msg = ( - f"FileHashStore - write_properties: algorithm supplied ({store_algorithm})" - f" cannot be used as default for HashStore. Must be one of: " - + f"{', '.join(accepted_store_algorithms)}" + f"Algorithm supplied ({store_algorithm}) cannot be used as default for" + f" HashStore. 
Must be one of: {', '.join(accepted_store_algorithms)}" f" which are DataONE controlled algorithm values" ) logging.error(err_msg) @@ -292,63 +283,10 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None: hs_yaml_file.write(hashstore_configuration_yaml) logging.debug( - "FileHashStore - write_properties: Configuration file written to: %s", - self.hashstore_configuration_yaml, + "Configuration file written to: %s", self.hashstore_configuration_yaml ) return - @staticmethod - def _build_hashstore_yaml_string_old( - store_depth: int, - store_width: int, - store_algorithm: str, - store_metadata_namespace: str, - ) -> str: - """Build a YAML string representing the configuration for a HashStore. - - :param int store_depth: Depth when sharding an object's hex digest. - :param int store_width: Width of directories when sharding an object's hex digest. - :param str store_algorithm: Hash algorithm used for calculating the object's hex digest. - :param str store_metadata_namespace: Namespace for the HashStore's system metadata. - - :return: A YAML string representing the configuration for a HashStore. - """ - hashstore_configuration_yaml = f""" - # Default configuration variables for HashStore - - ############### Directory Structure ############### - # Desired amount of directories when sharding an object to form the permanent address - store_depth: {store_depth} # WARNING: DO NOT CHANGE UNLESS SETTING UP NEW HASHSTORE - # Width of directories created when sharding an object to form the permanent address - store_width: {store_width} # WARNING: DO NOT CHANGE UNLESS SETTING UP NEW HASHSTORE - # Example: - # Below, objects are shown listed in directories that are 3 levels deep (DIR_DEPTH=3), - # with each directory consisting of 2 characters (DIR_WIDTH=2). - # /var/filehashstore/objects - # ├── 7f - # │ └── 5c - # │ └── c1 - # │ └── 8f0b04e812a3b4c8f686ce34e6fec558804bf61e54b176742a7f6368d6 - - ############### Format of the Metadata ############### - # The default metadata format - store_metadata_namespace: "{store_metadata_namespace}" - - ############### Hash Algorithms ############### - # Hash algorithm to use when calculating object's hex digest for the permanent address - store_algorithm: "{store_algorithm}" - # Algorithm values supported by python hashlib 3.9.0+ for File Hash Store (FHS) - # The default algorithm list includes the hash algorithms calculated when storing an - # object to disk and returned to the caller after successful storage. - store_default_algo_list: - - "MD5" - - "SHA-1" - - "SHA-256" - - "SHA-384" - - "SHA-512" - """ - return hashstore_configuration_yaml - @staticmethod def _build_hashstore_yaml_string( store_depth: int, @@ -440,8 +378,8 @@ def _verify_hashstore_properties( :param str prop_store_path: Store path to check. """ if os.path.isfile(self.hashstore_configuration_yaml): - logging.debug( - "FileHashStore - Config found (hashstore.yaml) at {%s}. Verifying properties.", + self.fhs_logger.debug( + "Config found (hashstore.yaml) at {%s}. Verifying properties.", self.hashstore_configuration_yaml, ) # If 'hashstore.yaml' is found, verify given properties before init @@ -456,11 +394,11 @@ def _verify_hashstore_properties( supplied_key = int(properties[key]) if hashstore_yaml_dict[key] != supplied_key: err_msg = ( - f"FileHashStore - Given properties ({key}: {properties[key]}) does not" - + f" match. HashStore configuration ({key}: {hashstore_yaml_dict[key]})" + f"Given properties ({key}: {properties[key]}) does not match." 
+                            + f" HashStore configuration ({key}: {hashstore_yaml_dict[key]})"
                             + f" found at: {self.hashstore_configuration_yaml}"
                         )
-                        logging.critical(err_msg)
+                        self.fhs_logger.critical(err_msg)
                         raise ValueError(err_msg)
         else:
             if os.path.exists(prop_store_path):
                 # Check if HashStore exists and throw exception if found
                 subfolders = ["objects", "metadata", "refs"]
                 if any(
                     os.path.isdir(os.path.join(prop_store_path, sub))
                     for sub in subfolders
                 ):
                     err_msg = (
-                        "FileHashStore - Unable to initialize HashStore. `hashstore.yaml` is not"
-                        + " present but conflicting HashStore directory exists. Please delete"
-                        + " '/objects', '/metadata' and/or '/refs' at the store path or supply"
-                        + " a new path."
+                        "Unable to initialize HashStore. `hashstore.yaml` is not present but "
+                        "conflicting HashStore directory exists. Please delete '/objects', "
+                        "'/metadata' and/or '/refs' at the store path or supply a new path."
                     )
-                    logging.critical(err_msg)
+                    self.fhs_logger.critical(err_msg)
                     raise RuntimeError(err_msg)
 
     def _validate_properties(
         self, properties: Dict[str, Union[str, int]]
@@ -493,33 +430,27 @@ def _validate_properties(
         :return: The given properties object (that has been validated).
         """
         if not isinstance(properties, dict):
-            err_msg = (
-                "FileHashStore - _validate_properties: Invalid argument -"
-                + " expected a dictionary."
-            )
-            logging.debug(err_msg)
+            err_msg = "Invalid argument: expected a dictionary."
+            self.fhs_logger.error(err_msg)
             raise ValueError(err_msg)
 
         # New dictionary for validated properties
         checked_properties = {}
 
         for key in self.property_required_keys:
             if key not in properties:
-                err_msg = (
-                    "FileHashStore - _validate_properties: Missing required"
-                    + f" key: {key}."
-                )
-                logging.debug(err_msg)
+                err_msg = f"Missing required key: {key}."
+                self.fhs_logger.error(err_msg)
                 raise KeyError(err_msg)
 
             value = properties.get(key)
             if value is None:
-                err_msg = (
-                    "FileHashStore - _validate_properties: Value for key:"
-                    + f" {key} is none."
-                )
-                logging.debug(err_msg)
+                err_msg = f"Value for key: {key} is None."
+                self.fhs_logger.error(err_msg)
                 raise ValueError(err_msg)
 
             # Add key and values to checked_properties
             if key == "store_depth" or key == "store_width":
                 # Ensure store depth and width are integers
                 try:
                     checked_properties[key] = int(value)
                 except Exception as err:
                     err_msg = (
-                        "FileHashStore - _validate_properties: Unexpected exception when"
-                        " attempting to ensure store depth and width are integers. Details: "
-                        + str(err)
+                        "Unexpected exception when attempting to ensure store depth and width "
+                        f"are integers. Details: {err}"
                     )
-                    logging.debug(err_msg)
+                    self.fhs_logger.error(err_msg)
                     raise ValueError(err_msg)
             else:
                 checked_properties[key] = value
 
@@ -556,12 +483,8 @@ def lookup_algo(algo_to_translate):
             return dataone_algo_translation[algo_to_translate]
 
         if not os.path.isfile(self.hashstore_configuration_yaml):
-            err_msg = (
-                "FileHashStore - set_default_algorithms: hashstore.yaml not found"
-                + " in store root path."
-            )
-            logging.critical(err_msg)
+            err_msg = "hashstore.yaml not found in store root path."
+            self.fhs_logger.critical(err_msg)
             raise FileNotFoundError(err_msg)
 
         with open(
             self.hashstore_configuration_yaml, "r", encoding="utf-8"
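Patch 05 below reworks the interface methods, which all follow the same locking discipline around a shared list and a condition variable: wait while the identifier is claimed by another thread, claim it, do the work, then release and notify inside a finally block so the claim is always dropped. A condensed sketch of that discipline (function and variable names here are illustrative only):

    import threading

    locked_pids = []
    condition = threading.Condition(threading.Lock())

    def run_locked(pid, work):
        # Block until no other thread holds this pid, then claim it.
        with condition:
            while pid in locked_pids:
                condition.wait()
            locked_pids.append(pid)
        try:
            return work()
        finally:
            # Always release the pid and wake a waiting thread, even on error.
            with condition:
                locked_pids.remove(pid)
                condition.notify()

The multiprocessing variant in the patch is identical in shape; only the lock, condition, and list types differ.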
From 692703b735b186a965b13b1a373538de827c44a0 Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Thu, 26 Sep 2024 16:40:22 -0700
Subject: [PATCH 05/12] Clean-up logging in 'filehashstore' interface methods
 and add missing logging messages in 'delete_object'

---
 src/hashstore/filehashstore.py | 324 ++++++++++++++------------------
 1 file changed, 132 insertions(+), 192 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index d6e9eb0c..34523608 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -517,17 +517,14 @@ def store_object(
     ) -> "ObjectMetadata":
         if pid is None and self._check_arg_data(data):
             # If no pid is supplied, store the object only without tagging
-            logging.debug("FileHashStore - store_object: Request to store data only.")
+            logging.debug("Request to store data only received.")
             object_metadata = self._store_data_only(data)
-            logging.info(
-                "FileHashStore - store_object: Successfully stored object for cid: %s",
-                object_metadata.cid,
+            self.fhs_logger.info(
+                "Successfully stored object for cid: %s", object_metadata.cid
             )
         else:
             # Else the object will be stored and tagged
-            logging.debug(
-                "FileHashStore - store_object: Request to store object for pid: %s", pid
-            )
+            self.fhs_logger.debug("Request to store object for pid: %s", pid)
             # Validate input parameters
             self._check_string(pid, "pid")
             self._check_arg_data(data)
@@ -539,34 +536,26 @@ def store_object(
                 additional_algorithm, checksum, checksum_algorithm
             )
 
-            sync_begin_debug_msg = (
-                f"FileHashStore - store_object: Adding pid ({pid}) to locked list."
-            )
-            err_msg = (
-                f"FileHashStore - store_object: Duplicate object request encountered for pid: "
-                f"{pid}" + ". Already in progress."
-            )
+            sync_begin_debug_msg = f"Adding pid ({pid}) to locked list."
+            err_msg = f"Duplicate object request encountered for pid: {pid}. Already in progress."
             if self.use_multiprocessing:
                 with self.object_pid_condition_mp:
                     # Wait for the pid to release if it's in use
                     if pid in self.object_locked_pids_mp:
-                        logging.error(err_msg)
+                        self.fhs_logger.error(err_msg)
                         raise StoreObjectForPidAlreadyInProgress(err_msg)
                     # Modify object_locked_pids consecutively
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.object_locked_pids_mp.append(pid)
             else:
                 with self.object_pid_condition_th:
                     if pid in self.object_locked_pids_th:
                         logging.error(err_msg)
                         raise StoreObjectForPidAlreadyInProgress(err_msg)
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.object_locked_pids_th.append(pid)
             try:
-                logging.debug(
-                    "FileHashStore - store_object: Attempting to store object for pid: %s",
-                    pid,
-                )
+                self.fhs_logger.debug("Attempting to store object for pid: %s", pid)
                 object_metadata = self._store_and_validate_data(
                     pid,
                     data,
@@ -575,23 +564,16 @@ def store_object(
                     checksum_algorithm=checksum_algorithm_checked,
                     file_size_to_validate=expected_object_size,
                 )
-                logging.debug(
-                    "FileHashStore - store_object: Attempting to tag object for pid: %s",
-                    pid,
-                )
+                self.fhs_logger.debug("Attempting to tag object for pid: %s", pid)
                 cid = object_metadata.cid
                 self.tag_object(pid, cid)
-                logging.info(
-                    "FileHashStore - store_object: Successfully stored object for pid: %s",
-                    pid,
-                )
+                self.fhs_logger.info("Successfully stored object for pid: %s", pid)
             except Exception as err:
                 err_msg = (
-                    f"FileHashStore - store_object: failed to store object for pid: {pid}."
-                    + " Reference files will not be created or tagged. Unexpected error: "
-                    + str(err)
+                    f"Failed to store object for pid: {pid}. Reference files will not be created "
+                    f"or tagged. Unexpected error: {err}"
                 )
-                logging.error(err_msg)
+                self.fhs_logger.error(err_msg)
                 raise err
             finally:
                 # Release pid
@@ -606,24 +588,17 @@ def store_object(
 
         return object_metadata
 
     def tag_object(self, pid: str, cid: str) -> None:
-        logging.debug(
-            "FileHashStore - tag_object: Tagging object cid: %s with pid: %s.",
-            cid,
-            pid,
-        )
+        logging.debug("Tagging object cid: %s with pid: %s.", cid, pid)
         self._check_string(pid, "pid")
         self._check_string(cid, "cid")
 
         try:
             self._store_hashstore_refs_files(pid, cid)
         except HashStoreRefsAlreadyExists as hrae:
-            err_msg = (
-                f"FileHashStore - tag_object: reference files for pid: {pid} and {cid} "
-                "already exist. " + str(hrae)
-            )
+            err_msg = f"Reference files for pid: {pid} and {cid} already exist. Details: {hrae}"
+            self.fhs_logger.error(err_msg)
             raise HashStoreRefsAlreadyExists(err_msg)
         except PidRefsAlreadyExistsError as praee:
-            err_msg = (
-                f"FileHashStore - tag_object: A pid can only reference one cid. "
-                + str(praee)
-            )
+            err_msg = f"A pid can only reference one cid. Details: {praee}"
+            self.fhs_logger.error(err_msg)
             raise PidRefsAlreadyExistsError(err_msg)
 
     def delete_if_invalid_object(
@@ -635,15 +609,13 @@ def delete_if_invalid_object(
         self._check_string(checksum_algorithm, "checksum_algorithm")
         self._check_integer(expected_file_size)
         if object_metadata is None or not isinstance(object_metadata, ObjectMetadata):
             err_msg = (
-                "FileHashStore - verify_object: 'object_metadata' cannot be None."
-                + " Must be a 'ObjectMetadata' object."
+                "'object_metadata' cannot be None. Must be an 'ObjectMetadata' object."
             )
-            logging.error(err_msg)
+            self.fhs_logger.error(err_msg)
             raise ValueError(err_msg)
         else:
-            logging.info(
-                "FileHashStore - verify_object: Called to verify object with id: %s",
-                object_metadata.cid,
+            self.fhs_logger.info(
+                "Called to verify object with id: %s", object_metadata.cid
             )
             object_metadata_hex_digests = object_metadata.hex_digests
             object_metadata_file_size = object_metadata.obj_size
@@ -668,17 +640,14 @@ def delete_if_invalid_object(
             except NonMatchingChecksum as mmce:
                 self._delete_object_only(object_metadata.cid)
                 raise mmce
-        logging.info(
-            "FileHashStore - verify_object: object has been validated for cid: %s",
-            object_metadata.cid,
+        self.fhs_logger.info(
+            "Object has been validated for cid: %s", object_metadata.cid
         )
 
     def store_metadata(
         self, pid: str, metadata: Union[str, bytes], format_id: Optional[str] = None
     ) -> str:
-        logging.debug(
-            "FileHashStore - store_metadata: Request to store metadata for pid: %s", pid
-        )
+        self.fhs_logger.debug("Request to store metadata for pid: %s", pid)
         # Validate input parameters
         self._check_string(pid, "pid")
         self._check_arg_data(metadata)
         checked_format_id = self._check_arg_format_id(format_id, "store_metadata")
         pid_doc = self._computehash(pid + checked_format_id)
 
         sync_begin_debug_msg = (
-            f"FileHashStore - store_metadata: Adding pid: {pid} to locked list, "
-            + f"with format_id: {checked_format_id} with doc name: {pid_doc}"
+            f"Adding pid: {pid} to locked list, with format_id: {checked_format_id} with doc "
+            f"name: {pid_doc}"
         )
         sync_wait_msg = (
-            f"FileHashStore - store_metadata: Pid: {pid} is locked for format_id:"
-            + f" {checked_format_id} with doc name: {pid_doc}. Waiting."
+            f"Pid: {pid} is locked for format_id: {checked_format_id} with doc name: {pid_doc}. "
+            f"Waiting."
) if self.use_multiprocessing: with self.metadata_condition_mp: # Wait for the pid to release if it's in use while pid_doc in self.metadata_locked_docs_mp: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.metadata_condition_mp.wait() # Modify metadata_locked_docs consecutively - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.metadata_locked_docs_mp.append(pid_doc) else: with self.metadata_condition_th: while pid_doc in self.metadata_locked_docs_th: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.metadata_condition_th.wait() - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.metadata_locked_docs_th.append(pid_doc) try: metadata_cid = self._put_metadata(metadata, pid, pid_doc) info_msg = ( - "FileHashStore - store_metadata: Successfully stored metadata for" - + f" pid: {pid} with format_id: {checked_format_id}" + f"Successfully stored metadata for pid: {pid} with format_id: " + + checked_format_id ) - logging.info(info_msg) + self.fhs_logger.info(info_msg) return str(metadata_cid) finally: # Release pid end_sync_debug_msg = ( - f"FileHashStore - store_metadata: Releasing pid doc ({pid_doc})" - + f" from locked list for pid: {pid} with format_id: {checked_format_id}" + f"Releasing pid doc ({pid_doc}) from locked list for pid: {pid} with format_id: " + + checked_format_id ) if self.use_multiprocessing: with self.metadata_condition_mp: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.metadata_locked_docs_mp.remove(pid_doc) self.metadata_condition_mp.notify() else: with self.metadata_condition_th: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.metadata_locked_docs_th.remove(pid_doc) self.metadata_condition_th.notify() def retrieve_object(self, pid: str) -> IO[bytes]: - logging.debug( - "FileHashStore - retrieve_object: Request to retrieve object for pid: %s", - pid, - ) + self.fhs_logger.debug("Request to retrieve object for pid: %s", pid) self._check_string(pid, "pid") object_info_dict = self._find_object(pid) @@ -747,26 +713,20 @@ def retrieve_object(self, pid: str) -> IO[bytes]: entity = "objects" if object_cid: - logging.debug( - "FileHashStore - retrieve_object: Metadata exists for pid: %s, retrieving object.", - pid, + self.fhs_logger.debug( + "Metadata exists for pid: %s, retrieving object.", pid ) obj_stream = self._open(entity, object_cid) else: - err_msg = f"FileHashStore - retrieve_object: No object found for pid: {pid}" - logging.error(err_msg) + err_msg = f"No object found for pid: {pid}" + self.fhs_logger.error(err_msg) raise ValueError(err_msg) - logging.info( - "FileHashStore - retrieve_object: Retrieved object for pid: %s", pid - ) + self.fhs_logger.info("Retrieved object for pid: %s", pid) return obj_stream def retrieve_metadata(self, pid: str, format_id: Optional[str] = None) -> IO[bytes]: - logging.debug( - "FileHashStore - retrieve_metadata: Request to retrieve metadata for pid: %s", - pid, - ) + self.fhs_logger.debug("Request to retrieve metadata for pid: %s", pid) self._check_string(pid, "pid") checked_format_id = self._check_arg_format_id(format_id, "retrieve_metadata") @@ -783,21 +743,15 @@ def retrieve_metadata(self, pid: str, format_id: Optional[str] = None) -> IO[byt if metadata_exists: metadata_stream = self._open(entity, str(metadata_rel_path)) - logging.info( - "FileHashStore - retrieve_metadata: Retrieved metadata for pid: %s", pid - ) + 
self.fhs_logger.info("Retrieved metadata for pid: %s", pid) return metadata_stream else: - err_msg = ( - f"FileHashStore - retrieve_metadata: No metadata found for pid: {pid}" - ) - logging.error(err_msg) + err_msg = f"No metadata found for pid: {pid}" + self.fhs_logger.error(err_msg) raise ValueError(err_msg) def delete_object(self, pid: str) -> None: - logging.debug( - "FileHashStore - delete_object: Request to delete object for id: %s", pid - ) + self.fhs_logger.debug("Request to delete object for id: %s", pid) self._check_string(pid, "pid") objects_to_delete = [] @@ -805,27 +759,23 @@ def delete_object(self, pid: str) -> None: # Storing and deleting objects are synchronized together # Duplicate store object requests for a pid are rejected, but deleting an object # will wait for a pid to be released if it's found to be in use before proceeding. - sync_begin_debug_msg = ( - f"FileHashStore - delete_object: Pid ({pid}) to locked list." - ) - sync_wait_msg = ( - f"FileHashStore - delete_object: Pid ({pid}) is locked. Waiting." - ) + sync_begin_debug_msg = f"Pid ({pid}) to locked list." + sync_wait_msg = f"Pid ({pid}) is locked. Waiting." if self.use_multiprocessing: with self.object_pid_condition_mp: # Wait for the pid to release if it's in use while pid in self.object_locked_pids_mp: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.object_pid_condition_mp.wait() # Modify object_locked_pids consecutively - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.object_locked_pids_mp.append(pid) else: with self.object_pid_condition_th: while pid in self.object_locked_pids_th: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.object_pid_condition_th.wait() - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.object_locked_pids_th.append(pid) try: @@ -839,28 +789,23 @@ def delete_object(self, pid: str) -> None: # Proceed with next steps - cid has been retrieved without any issues # We must synchronize here based on the `cid` because multiple threads may # try to access the `cid_reference_file` - sync_begin_debug_msg = ( - f"FileHashStore - delete_object: Cid ({cid}) to locked list." - ) - sync_wait_msg = ( - f"FileHashStore - delete_object: Cid ({cid}) is locked." - + " Waiting." - ) + sync_begin_debug_msg = f"Cid ({cid}) to locked list." + sync_wait_msg = f"Cid ({cid}) is locked. Waiting." if self.use_multiprocessing: with self.object_cid_condition_mp: # Wait for the cid to release if it's in use while cid in self.object_locked_cids_mp: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.object_cid_condition_mp.wait() # Modify reference_locked_cids consecutively - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.object_locked_cids_mp.append(cid) else: with self.object_cid_condition_th: while cid in self.object_locked_cids_th: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.object_cid_condition_th.wait() - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.object_locked_cids_th.append(cid) try: @@ -875,10 +820,10 @@ def delete_object(self, pid: str) -> None: # Delete cid reference file and object only if the cid refs file is empty if os.path.getsize(cid_ref_abs_path) == 0: debug_msg = ( - "FileHashStore - delete_object: cid_refs_file is empty (size == 0):" - + f" {cid_ref_abs_path} - deleting cid refs file and data object." 
+ f"Cid reference file is empty (size == 0): {cid_ref_abs_path} - " + + "deleting cid reference file and data object." ) - logging.debug(debug_msg) + self.fhs_logger.debug(debug_msg) objects_to_delete.append( self._rename_path_for_deletion(cid_ref_abs_path) ) @@ -893,36 +838,32 @@ def delete_object(self, pid: str) -> None: self.delete_metadata(pid) info_string = ( - "FileHashStore - delete_object: Successfully deleted references," - + f" metadata and object associated with pid: {pid}" + f"Successfully deleted references, metadata and object associated" + + f" with pid: {pid}" ) - logging.info(info_string) + self.fhs_logger.info(info_string) return finally: # Release cid - end_sync_debug_msg = ( - f"FileHashStore - delete_object: Releasing cid ({cid})" - + " from locked list" - ) + end_sync_debug_msg = f"Releasing cid ({cid}) from locked list" if self.use_multiprocessing: with self.object_cid_condition_mp: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.object_locked_cids_mp.remove(cid) self.object_cid_condition_mp.notify() else: with self.object_cid_condition_th: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.object_locked_cids_th.remove(cid) self.object_cid_condition_th.notify() except PidRefsDoesNotExist: warn_msg = ( - "FileHashStore - delete_object: pid refs file does not exist for pid: " - + pid - + ". Skipping object deletion. Deleting pid metadata documents." + f"Pid reference file does not exist for pid: {pid} Skipping object deletion. " + + "Deleting pid metadata documents." ) - logging.warning(warn_msg) + self.fhs_logger.warning(warn_msg) # Remove metadata files if they exist self.delete_metadata(pid) @@ -931,6 +872,12 @@ def delete_object(self, pid: str) -> None: self._delete_marked_files(objects_to_delete) return except OrphanPidRefsFileFound: + warn_msg = ( + f"Orphan pid reference file found for pid: {pid}. Skipping object deletion. " + + "Deleting pid reference file and related metadata documents." + ) + self.fhs_logger.warning(warn_msg) + # Delete pid refs file pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid) objects_to_delete.append( @@ -942,6 +889,13 @@ def delete_object(self, pid: str) -> None: self._delete_marked_files(objects_to_delete) return except RefsFileExistsButCidObjMissing: + warn_msg = ( + f"Reference files exist for pid: {pid}, but the data object is missing. " + + "Deleting pid reference file & related metadata documents. Handling cid " + + "reference file." + ) + self.fhs_logger.warning(warn_msg) + # Add pid refs file to be permanently deleted pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid) objects_to_delete.append( @@ -959,6 +913,12 @@ def delete_object(self, pid: str) -> None: self._delete_marked_files(objects_to_delete) return except PidNotFoundInCidRefsFile: + warn_msg = ( + f"Pid {pid} not found in cid reference file. Deleting pid reference " + + "file and related metadata documents." 
+ ) + self.fhs_logger.warning(warn_msg) + # Add pid refs file to be permanently deleted pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid) objects_to_delete.append( @@ -971,27 +931,21 @@ def delete_object(self, pid: str) -> None: return finally: # Release pid - end_sync_debug_msg = ( - f"FileHashStore - delete_object: Releasing pid ({pid})" - + " from locked list" - ) + end_sync_debug_msg = f"Releasing pid ({pid}) from locked list" if self.use_multiprocessing: with self.object_pid_condition_mp: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.object_locked_pids_mp.remove(pid) self.object_pid_condition_mp.notify() else: # Release pid with self.object_pid_condition_th: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.object_locked_pids_th.remove(pid) self.object_pid_condition_th.notify() def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None: - logging.debug( - "FileHashStore - delete_metadata: Request to delete metadata for pid: %s", - pid, - ) + self.fhs_logger.debug("Request to delete metadata for pid: %s", pid) self._check_string(pid, "pid") checked_format_id = self._check_arg_format_id(format_id, "delete_metadata") metadata_directory = self._computehash(pid) @@ -1010,28 +964,28 @@ def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None: # Synchronize based on doc name # Wait for the pid to release if it's in use sync_begin_debug_msg = ( - f"FileHashStore - delete_metadata: Adding pid: {pid} to locked list, " - + f"with format_id: {checked_format_id} with doc name: {pid_doc}" + f"Adding pid: {pid} to locked list, with format_id: {checked_format_id} " + + f"with doc name: {pid_doc}" ) sync_wait_msg = ( - f"FileHashStore - delete_metadata: Pid: {pid} is locked for format_id:" - + f" {checked_format_id} with doc name: {pid_doc}. Waiting." + f"Pid: {pid} is locked for format_id: {checked_format_id} with doc name:" + + f" {pid_doc}. Waiting." 
            )
             if self.use_multiprocessing:
                 with self.metadata_condition_mp:
                     # Wait for the pid to release if it's in use
                     while pid in self.metadata_locked_docs_mp:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                         self.metadata_condition_mp.wait()
                     # Modify metadata_locked_docs consecutively
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.metadata_locked_docs_mp.append(pid_doc)
             else:
                 with self.metadata_condition_th:
                     while pid in self.metadata_locked_docs_th:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                         self.metadata_condition_th.wait()
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.metadata_locked_docs_th.append(pid_doc)
             try:
                 # Mark metadata doc for deletion
@@ -1039,87 +993,76 @@ def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None:
             finally:
                 # Release pid
                 end_sync_debug_msg = (
-                    f"FileHashStore - delete_metadata: Releasing pid doc ({pid_doc})"
-                    + f" from locked list for pid: {pid} with format_id:"
-                    + checked_format_id
+                    f"Releasing pid doc ({pid_doc}) from locked list for pid: {pid} with "
+                    + f"format_id: {checked_format_id}"
                 )
                 if self.use_multiprocessing:
                     with self.metadata_condition_mp:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                         self.metadata_locked_docs_mp.remove(pid_doc)
                         self.metadata_condition_mp.notify()
                 else:
                     with self.metadata_condition_th:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                         self.metadata_locked_docs_th.remove(pid_doc)
                         self.metadata_condition_th.notify()
             # Delete metadata objects
             self._delete_marked_files(objects_to_delete)
-            info_string = (
-                "FileHashStore - delete_metadata: Successfully deleted all metadata"
-                + f"for pid: {pid}",
-            )
-            logging.info(info_string)
+            info_string = f"Successfully deleted all metadata for pid: {pid}"
+            self.fhs_logger.info(info_string)
         else:
             # Delete a specific metadata file
             pid_doc = self._computehash(pid + checked_format_id)
             # Wait for the pid to release if it's in use
             sync_begin_debug_msg = (
-                f"FileHashStore - delete_metadata: Adding pid: {pid} to locked list, "
-                + f"with format_id: {checked_format_id} with doc name: {pid_doc}"
+                f"Adding pid: {pid} to locked list, with format_id: {checked_format_id} with doc "
+                + f"name: {pid_doc}"
             )
             sync_wait_msg = (
-                f"FileHashStore - delete_metadata: Pid: {pid} is locked for format_id:"
-                + f" {checked_format_id} with doc name: {pid_doc}. Waiting."
+                f"Pid: {pid} is locked for format_id: {checked_format_id} with doc name:"
+                + f" {pid_doc}. Waiting."
) if self.use_multiprocessing: with self.metadata_condition_mp: # Wait for the pid to release if it's in use while pid in self.metadata_locked_docs_mp: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.metadata_condition_mp.wait() # Modify metadata_locked_docs consecutively - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.metadata_locked_docs_mp.append(pid_doc) else: with self.metadata_condition_th: while pid in self.metadata_locked_docs_th: - logging.debug(sync_wait_msg) + self.fhs_logger.debug(sync_wait_msg) self.metadata_condition_th.wait() - logging.debug(sync_begin_debug_msg) + self.fhs_logger.debug(sync_begin_debug_msg) self.metadata_locked_docs_th.append(pid_doc) try: full_path_without_directory = Path(self.metadata / rel_path / pid_doc) self._delete("metadata", full_path_without_directory) - info_string = ( - "FileHashStore - delete_metadata: Successfully deleted metadata for pid:" - + f" {pid} for format_id: {format_id}" - ) - logging.info(info_string) + info_string = f"Successfully deleted metadata for pid: {pid} for format_id: {format_id}" + self.fhs_logger.info(info_string) finally: # Release pid end_sync_debug_msg = ( - f"FileHashStore - delete_metadata: Releasing pid doc ({pid_doc})" - + f" from locked list for pid: {pid} with format_id:" - + checked_format_id + f"Releasing pid doc ({pid_doc}) from locked list for pid: {pid} with " + f"format_id: {checked_format_id}" ) if self.use_multiprocessing: with self.metadata_condition_mp: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.metadata_locked_docs_mp.remove(pid_doc) self.metadata_condition_mp.notify() else: with self.metadata_condition_th: - logging.debug(end_sync_debug_msg) + self.fhs_logger.debug(end_sync_debug_msg) self.metadata_locked_docs_th.remove(pid_doc) self.metadata_condition_th.notify() def get_hex_digest(self, pid: str, algorithm: str) -> str: - logging.debug( - "FileHashStore - get_hex_digest: Request to get hex digest for object with pid: %s", - pid, - ) + self.fhs_logger.debug("Request to get hex digest for object with pid: %s", pid) self._check_string(pid, "pid") self._check_string(algorithm, "algorithm") @@ -1127,16 +1070,13 @@ def get_hex_digest(self, pid: str, algorithm: str) -> str: algorithm = self._clean_algorithm(algorithm) object_cid = self._find_object(pid).get("cid") if not self._exists(entity, object_cid): - err_msg = f"FileHashStore - get_hex_digest: No object found for pid: {pid}" - logging.error(err_msg) + err_msg = f"No object found for pid: {pid}" + self.fhs_logger.error(err_msg) raise ValueError(err_msg) cid_stream = self._open(entity, object_cid) hex_digest = self._computehash(cid_stream, algorithm=algorithm) - info_string = ( - f"FileHashStore - get_hex_digest: Successfully calculated hex digest for pid: {pid}." - + f" Hex Digest: {hex_digest}", - ) + info_string = f"Successfully calculated hex digest for pid: {pid}. 
Hex Digest: {hex_digest}" logging.info(info_string) return hex_digest From 47f6556f4edef297506256bacf75ffd3533118d2 Mon Sep 17 00:00:00 2001 From: Dou Mok Date: Thu, 26 Sep 2024 16:57:06 -0700 Subject: [PATCH 06/12] Fix bug in 'delete_object' where synchronization call was not made in try block, and another bug where a cid was not locked during an exception scenario --- src/hashstore/filehashstore.py | 121 +++++++++++++-------------------- 1 file changed, 47 insertions(+), 74 deletions(-) diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py index 34523608..a4267546 100644 --- a/src/hashstore/filehashstore.py +++ b/src/hashstore/filehashstore.py @@ -761,27 +761,13 @@ def delete_object(self, pid: str) -> None: # will wait for a pid to be released if it's found to be in use before proceeding. sync_begin_debug_msg = f"Pid ({pid}) to locked list." sync_wait_msg = f"Pid ({pid}) is locked. Waiting." - if self.use_multiprocessing: - with self.object_pid_condition_mp: - # Wait for the pid to release if it's in use - while pid in self.object_locked_pids_mp: - self.fhs_logger.debug(sync_wait_msg) - self.object_pid_condition_mp.wait() - # Modify object_locked_pids consecutively - self.fhs_logger.debug(sync_begin_debug_msg) - self.object_locked_pids_mp.append(pid) - else: - with self.object_pid_condition_th: - while pid in self.object_locked_pids_th: - self.fhs_logger.debug(sync_wait_msg) - self.object_pid_condition_th.wait() - self.fhs_logger.debug(sync_begin_debug_msg) - self.object_locked_pids_th.append(pid) try: # Before we begin deletion process, we look for the `cid` by calling # `find_object` which will throw custom exceptions if there is an issue with # the reference files, which help us determine the path to proceed with. + self._synchronize_object_locked_pids(pid) + try: object_info_dict = self._find_object(pid) cid = object_info_dict.get("cid") @@ -789,24 +775,7 @@ def delete_object(self, pid: str) -> None: # Proceed with next steps - cid has been retrieved without any issues # We must synchronize here based on the `cid` because multiple threads may # try to access the `cid_reference_file` - sync_begin_debug_msg = f"Cid ({cid}) to locked list." - sync_wait_msg = f"Cid ({cid}) is locked. Waiting." 
- if self.use_multiprocessing: - with self.object_cid_condition_mp: - # Wait for the cid to release if it's in use - while cid in self.object_locked_cids_mp: - self.fhs_logger.debug(sync_wait_msg) - self.object_cid_condition_mp.wait() - # Modify reference_locked_cids consecutively - self.fhs_logger.debug(sync_begin_debug_msg) - self.object_locked_cids_mp.append(cid) - else: - with self.object_cid_condition_th: - while cid in self.object_locked_cids_th: - self.fhs_logger.debug(sync_wait_msg) - self.object_cid_condition_th.wait() - self.fhs_logger.debug(sync_begin_debug_msg) - self.object_locked_cids_th.append(cid) + self._synchronize_object_locked_cids(cid) try: cid_ref_abs_path = object_info_dict.get("cid_refs_path") @@ -846,31 +815,8 @@ def delete_object(self, pid: str) -> None: finally: # Release cid - end_sync_debug_msg = f"Releasing cid ({cid}) from locked list" - if self.use_multiprocessing: - with self.object_cid_condition_mp: - self.fhs_logger.debug(end_sync_debug_msg) - self.object_locked_cids_mp.remove(cid) - self.object_cid_condition_mp.notify() - else: - with self.object_cid_condition_th: - self.fhs_logger.debug(end_sync_debug_msg) - self.object_locked_cids_th.remove(cid) - self.object_cid_condition_th.notify() - - except PidRefsDoesNotExist: - warn_msg = ( - f"Pid reference file does not exist for pid: {pid} Skipping object deletion. " - + "Deleting pid metadata documents." - ) - self.fhs_logger.warning(warn_msg) + self._release_object_locked_cids(cid) - # Remove metadata files if they exist - self.delete_metadata(pid) - - # Remove all files confirmed for deletion - self._delete_marked_files(objects_to_delete) - return except OrphanPidRefsFileFound: warn_msg = ( f"Orphan pid reference file found for pid: {pid}. Skipping object deletion. 
" @@ -903,10 +849,16 @@ def delete_object(self, pid: str) -> None: ) # Remove pid from cid refs file pid_refs_cid = self._read_small_file_content(pid_ref_abs_path) - cid_ref_abs_path = self._get_hashstore_cid_refs_path(pid_refs_cid) - # Remove if the pid refs is found - if self._is_string_in_refs_file(pid, cid_ref_abs_path): - self._update_refs_file(cid_ref_abs_path, pid, "remove") + try: + self._synchronize_object_locked_cids(pid_refs_cid) + + cid_ref_abs_path = self._get_hashstore_cid_refs_path(pid_refs_cid) + # Remove if the pid refs is found + if self._is_string_in_refs_file(pid, cid_ref_abs_path): + self._update_refs_file(cid_ref_abs_path, pid, "remove") + finally: + self._release_object_locked_cids(pid_refs_cid) + # Remove metadata files if they exist self.delete_metadata(pid) # Remove all files confirmed for deletion @@ -931,18 +883,7 @@ def delete_object(self, pid: str) -> None: return finally: # Release pid - end_sync_debug_msg = f"Releasing pid ({pid}) from locked list" - if self.use_multiprocessing: - with self.object_pid_condition_mp: - self.fhs_logger.debug(end_sync_debug_msg) - self.object_locked_pids_mp.remove(pid) - self.object_pid_condition_mp.notify() - else: - # Release pid - with self.object_pid_condition_th: - self.fhs_logger.debug(end_sync_debug_msg) - self.object_locked_pids_th.remove(pid) - self.object_pid_condition_th.notify() + self._release_object_locked_pids(pid) def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None: self.fhs_logger.debug("Request to delete metadata for pid: %s", pid) @@ -2681,6 +2622,38 @@ def _get_hashstore_cid_refs_path(self, cid: str) -> Path: # Synchronization Methods + def _synchronize_object_locked_pids(self, pid: str) -> None: + """Threads must work with 'pid's one identifier at a time to ensure thread safety when + handling requests to store, delete or tag pids. + + :param str pid: Persistent or authority-based identifier + """ + if self.use_multiprocessing: + with self.object_pid_condition_mp: + # Wait for the cid to release if it's being tagged + while pid in self.object_locked_pids_mp: + logging.debug( + f"_synchronize_object_locked_pids: Pid ({pid}) is locked. Waiting." + ) + self.object_pid_condition_mp.wait() + self.object_locked_pids_mp.append(pid) + logging.debug( + f"_synchronize_object_locked_pids: Synchronizing object_locked_pids_mp for" + + f" pid: {pid}" + ) + else: + with self.object_pid_condition_th: + while pid in self.object_locked_pids_th: + logging.debug( + f"_synchronize_object_locked_pids: Pid ({pid}) is locked. Waiting." + ) + self.object_pid_condition_th.wait() + self.object_locked_pids_th.append(pid) + logging.debug( + f"_synchronize_object_locked_pids: Synchronizing object_locked_pids_th for" + + f" cid: {pid}" + ) + def _release_object_locked_pids(self, pid: str) -> None: """Remove the given persistent identifier from 'object_locked_pids' and notify other waiting threads or processes. 
From f70243e57d40ca74f4a7cc2d7ad55b4cd460f3cd Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Thu, 26 Sep 2024 17:23:23 -0700
Subject: [PATCH 07/12] Fix typo in logging message in
 '_synchronize_object_locked_pids'

---
 src/hashstore/filehashstore.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index a4267546..0213e499 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -2651,7 +2651,7 @@ def _synchronize_object_locked_pids(self, pid: str) -> None:
                 self.object_locked_pids_th.append(pid)
                 logging.debug(
                     f"_synchronize_object_locked_pids: Synchronizing object_locked_pids_th for"
-                    + f" cid: {pid}"
+                    + f" pid: {pid}"
                 )

     def _release_object_locked_pids(self, pid: str) -> None:

From 276c6e74769ed9dbcc16b92ee85d32dd2f9a2aeb Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Fri, 27 Sep 2024 09:15:54 -0700
Subject: [PATCH 08/12] Clean-up logging in 'filehashstore' supporting and core
 methods, part 1

---
 src/hashstore/filehashstore.py | 397 ++++++++++++---------------------
 1 file changed, 148 insertions(+), 249 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 0213e499..14fb9cb5 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -1037,9 +1037,7 @@ def _find_object(self, pid: str) -> Dict[str, str]:
             - pid_refs_path: path to the pid refs file
             - sysmeta_path: path to the sysmeta file
         """
-        logging.debug(
-            "FileHashStore - find_object: Request to find object for for pid: %s", pid
-        )
+        self.fhs_logger.debug("Request to find object for pid: %s", pid)
         self._check_string(pid, "pid")

         pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid)
@@ -1055,11 +1053,10 @@ def _find_object(self, pid: str) -> Dict[str, str]:
                 # Object must also exist in order to return the cid retrieved
                 if not self._exists("objects", pid_refs_cid):
                     err_msg = (
-                        f"FileHashStore - find_object: Refs file found for pid ({pid}) at"
-                        + str(pid_ref_abs_path)
+                        f"Reference file found for pid ({pid}) at {pid_ref_abs_path}"
                         + f", but object referenced does not exist, cid: {pid_refs_cid}"
                     )
-                    logging.error(err_msg)
+                    self.fhs_logger.error(err_msg)
                     raise RefsFileExistsButCidObjMissing(err_msg)
                 else:
                     sysmeta_doc_name = self._computehash(pid + self.sysmeta_ns)
@@ -1087,25 +1084,23 @@ def _find_object(self, pid: str) -> Dict[str, str]:
                 else:
                     # If not, it is an orphan pid refs file
                     err_msg = (
-                        "FileHashStore - find_object: pid refs file exists with cid: "
-                        + f"{pid_refs_cid} for pid: {pid} but is missing from cid refs file:"
-                        + str(cid_ref_abs_path)
+                        f"Pid reference file exists with cid: {pid_refs_cid} for pid: {pid} but "
+                        f"is missing from cid refs file: {cid_ref_abs_path}"
                     )
-                    logging.error(err_msg)
+                    self.fhs_logger.error(err_msg)
                     raise PidNotFoundInCidRefsFile(err_msg)
             else:
                 err_msg = (
-                    f"FileHashStore - find_object: pid refs file exists with cid: {pid_refs_cid}"
-                    + f", but cid refs file not found: {cid_ref_abs_path} for pid: {pid}"
+                    f"Pid reference file exists with cid: {pid_refs_cid} but cid reference file "
+                    + f"not found: {cid_ref_abs_path} for pid: {pid}"
                 )
-                logging.error(err_msg)
+                self.fhs_logger.error(err_msg)
                 raise OrphanPidRefsFileFound(err_msg)
         else:
             err_msg = (
                 f"Pid reference file not found for pid ({pid}): {pid_ref_abs_path}"
             )
-            logging.error(err_msg)
+            self.fhs_logger.error(err_msg)
             raise PidRefsDoesNotExist(err_msg)

     def
_store_and_validate_data( @@ -1134,9 +1129,7 @@ def _store_and_validate_data( """ stream = Stream(file) - logging.debug( - "FileHashStore - put_object: Request to put object for pid: %s", pid - ) + self.fhs_logger.debug("Request to put object for pid: %s", pid) with closing(stream): ( object_cid, @@ -1154,10 +1147,7 @@ def _store_and_validate_data( object_metadata = ObjectMetadata( pid, object_cid, obj_file_size, hex_digest_dict ) - logging.debug( - "FileHashStore - put_object: Successfully put object for pid: %s", - pid, - ) + self.fhs_logger.debug("Successfully put object for pid: %s", pid) return object_metadata def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata": @@ -1174,9 +1164,7 @@ def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata": :return: ObjectMetadata - object that contains the object ID, object file size, and hex digest dictionary. """ - logging.debug( - "FileHashStore - _store_data_only: Request to store data object only." - ) + self.fhs_logger.debug("Request to store data object only.") try: # Ensure the data is a stream @@ -1198,18 +1186,12 @@ def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata": ) # The permanent address of the data stored is based on the data's checksum cid = hex_digest_dict.get(self.algorithm) - logging.debug( - "FileHashStore - _store_data_only: Successfully stored object with cid: %s", - cid, - ) + self.fhs_logger.debug("Successfully stored object with cid: %s", cid) return object_metadata # pylint: disable=W0718 except Exception as err: - err_msg = ( - "FileHashStore - _store_data_only: failed to store object." - + f" Unexpected {err=}, {type(err)=}" - ) - logging.error(err_msg) + err_msg = f"Failed to store object. Unexpected {err=}, {type(err)=}" + self.fhs_logger.error(err_msg) raise err def _move_and_get_checksums( @@ -1239,11 +1221,8 @@ def _move_and_get_checksums( :return: tuple - Object ID, object file size, and hex digest dictionary. 
""" - debug_msg = ( - "FileHashStore - _move_and_get_checksums: Creating temp" - + f" file and calculating checksums for pid: {pid}" - ) - logging.debug(debug_msg) + debug_msg = f"Creating temp file and calculating checksums for pid: {pid}" + self.fhs_logger.debug(debug_msg) ( hex_digests, tmp_file_name, @@ -1251,10 +1230,7 @@ def _move_and_get_checksums( ) = self._write_to_tmp_file_and_get_hex_digests( stream, additional_algorithm, checksum_algorithm ) - logging.debug( - "FileHashStore - _move_and_get_checksums: Temp file created: %s", - tmp_file_name, - ) + self.fhs_logger.debug("Temp file created: %s", tmp_file_name) # Objects are stored with their content identifier based on the store algorithm object_cid = hex_digests.get(self.algorithm) @@ -1276,60 +1252,46 @@ def _move_and_get_checksums( ) self._create_path(Path(os.path.dirname(abs_file_path))) try: - debug_msg = ( - "FileHashStore - _move_and_get_checksums: Moving temp file to permanent" - + f" location: {abs_file_path}", - ) - logging.debug(debug_msg) + debug_msg = f"Moving temp file to permanent location: {abs_file_path}" + self.fhs_logger.debug(debug_msg) shutil.move(tmp_file_name, abs_file_path) except Exception as err: # Revert storage process - err_msg = ( - "FileHashStore - _move_and_get_checksums:" - + f" Unexpected Error: {err}" - ) - logging.warning(err_msg) + err_msg = f" Unexpected Error: {err}" + self.fhs_logger.warning(err_msg) if os.path.isfile(abs_file_path): # Check to see if object exists before determining whether to delete debug_msg = ( - "FileHashStore - _move_and_get_checksums: Permanent file" - + f" found during exception, checking hex digest for pid: {pid}" + f"Permanent file found, checking hex digest for pid: {pid}" ) - logging.debug(debug_msg) + self.fhs_logger.debug(debug_msg) pid_checksum = self.get_hex_digest(pid, self.algorithm) if pid_checksum == hex_digests.get(self.algorithm): # If the checksums match, return and log warning err_msg = ( - "FileHashStore - _move_and_get_checksums: Object exists at:" - + f" {abs_file_path} but an unexpected issue has been encountered." - + " Reference files will not be created and/or tagged." + f"Object exists at: {abs_file_path} but an unexpected issue has been " + + "encountered. Reference files will not be created and/or tagged." ) - logging.warning(err_msg) + self.fhs_logger.warning(err_msg) raise err else: debug_msg = ( - "FileHashStore - _move_and_get_checksums: Object exists at" - + f"{abs_file_path} but the pid object checksum provided does not" - + " match what has been calculated. Deleting object. References will" - + " not be created and/or tagged.", + f"Object exists at {abs_file_path} but the pid object checksum " + + "provided does not match what has been calculated. Deleting object. " + + "References will not be created and/or tagged.", ) - logging.debug(debug_msg) + self.fhs_logger.debug(debug_msg) self._delete("objects", abs_file_path) raise err else: - logging.debug( - "FileHashStore - _move_and_get_checksums: Deleting temporary file: %s", - tmp_file_name, - ) + self.fhs_logger.debug("Deleting temporary file: %s", tmp_file_name) self._delete("tmp", tmp_file_name) err_msg = ( f"Object has not been stored for pid: {pid} - an unexpected error has " - f"occurred when moving tmp file to: {object_cid}. Reference files will " - f"not be created and/or tagged. Error: {err}" - ) - logging.warning( - "FileHashStore - _move_and_get_checksums: %s", err_msg + + f"occurred when moving tmp file to: {object_cid}. 
Reference files will " + + f"not be created and/or tagged. Error: {err}" ) + self.fhs_logger.warning(err_msg) raise else: # If the data object already exists, do not move the file but attempt to verify it @@ -1347,20 +1309,20 @@ def _move_and_get_checksums( except NonMatchingObjSize as nmose: # If any exception is thrown during validation, we do not tag. err_msg = ( - f"FileHashStore - _move_and_get_checksums: Object already exists for pid: {pid}" - + " , deleting temp file. Reference files will not be created and/or tagged" - + f" due to an issue with the supplied pid object metadata. {str(nmose)}" + f"Object already exists for pid: {pid}, deleting temp file. Reference files " + + "will not be created and/or tagged due to an issue with the supplied pid " + + f"object metadata. {str(nmose)}" ) - logging.debug(err_msg) + self.fhs_logger.debug(err_msg) raise NonMatchingObjSize(err_msg) from nmose except NonMatchingChecksum as nmce: # If any exception is thrown during validation, we do not tag. err_msg = ( - f"FileHashStore - _move_and_get_checksums: Object already exists for pid: {pid}" - + " , deleting temp file. Reference files will not be created and/or tagged" - + f" due to an issue with the supplied pid object metadata. {str(nmce)}" + f"Object already exists for pid: {pid}, deleting temp file. Reference files " + + "will not be created and/or tagged due to an issue with the supplied pid " + + f"object metadata. {str(nmce)}" ) - logging.debug(err_msg) + self.fhs_logger.debug(err_msg) raise NonMatchingChecksum(err_msg) from nmce finally: # Ensure that the tmp file has been removed, the data object already exists, so it @@ -1397,10 +1359,8 @@ def _write_to_tmp_file_and_get_hex_digests( tmp_root_path = self._get_store_path("objects") / "tmp" tmp = self._mktmpfile(tmp_root_path) - logging.debug( - "FileHashStore - _write_to_tmp_file_and_get_hex_digests: tmp file created:" - + " %s, calculating hex digests.", - tmp.name, + self.fhs_logger.debug( + "Tmp file created: %s, calculating hex digests.", tmp.name ) tmp_file_completion_flag = False @@ -1416,10 +1376,8 @@ def _write_to_tmp_file_and_get_hex_digests( for hash_algorithm in hash_algorithms: hash_algorithm.update(self._cast_to_bytes(data)) - logging.debug( - "FileHashStore - _write_to_tmp_file_and_get_hex_digests: Object stream" - + " successfully written to tmp file: %s", - tmp.name, + self.fhs_logger.debug( + "Object stream successfully written to tmp file: %s", tmp.name ) hex_digest_list = [ @@ -1430,25 +1388,17 @@ def _write_to_tmp_file_and_get_hex_digests( # Ready for validation and atomic move tmp_file_completion_flag = True - logging.debug( - "FileHashStore - _write_to_tmp_file_and_get_hex_digests: Hex digests calculated." - ) + self.fhs_logger.debug("Hex digests calculated.") return hex_digest_dict, tmp.name, tmp_file_size # pylint: disable=W0718 except Exception as err: - err_msg = ( - "FileHashStore - _write_to_tmp_file_and_get_hex_digests:" - + f" Unexpected {err=}, {type(err)=}" - ) - logging.error(err_msg) + err_msg = f"Unexpected {err=}, {type(err)=}" + self.fhs_logger.error(err_msg) # pylint: disable=W0707,W0719 raise Exception(err_msg) except KeyboardInterrupt: - err_msg = ( - "FileHashStore - _write_to_tmp_file_and_get_hex_digests:" - + " Keyboard interruption by user." - ) - logging.error(err_msg) + err_msg = "Keyboard interruption by user." 
+ self.fhs_logger.error(err_msg) if os.path.isfile(tmp.name): os.remove(tmp.name) finally: @@ -1459,11 +1409,10 @@ def _write_to_tmp_file_and_get_hex_digests( # pylint: disable=W0718 except Exception as err: err_msg = ( - "FileHashStore - _write_to_tmp_file_and_get_hex_digests:" - + f"Unexpected {err=} while attempting to" - + f" delete tmp file: {tmp.name}, {type(err)=}" + f"Unexpected {err=} while attempting to delete tmp file: " + + f"{tmp.name}, {type(err)=}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) def _mktmpfile(self, path: Path) -> IO[bytes]: """Create a temporary file at the given path ready to be written. @@ -1519,8 +1468,7 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None: # are expected to be and throw an exception to inform the client that everything # is in place - and include other issues for context err_msg = ( - f"FileHashStore - store_hashstore_refs_files: Object with cid: {cid}" - f" already exists and is tagged with pid: {pid}." + f"Object with cid: {cid} exists and is tagged with pid: {pid}." ) try: self._verify_hashstore_references( @@ -1530,11 +1478,11 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None: cid_refs_path, "Refs file already exists, verifying.", ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise HashStoreRefsAlreadyExists(err_msg) except Exception as e: rev_msg = err_msg + " " + str(e) - logging.error(rev_msg) + self.fhs_logger.error(rev_msg) raise HashStoreRefsAlreadyExists(err_msg) elif os.path.isfile(pid_refs_path) and not os.path.isfile( @@ -1542,21 +1490,18 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None: ): # If pid refs exists, the pid has already been claimed and cannot be tagged we # throw an exception immediately - error_msg = ( - f"FileHashStore - store_hashstore_refs_files: Pid refs file already exists" - f" for pid: {pid}." - ) - logging.error(error_msg) + error_msg = f"Pid refs file already exists for pid: {pid}." 
+ self.fhs_logger.error(error_msg) raise PidRefsAlreadyExistsError(error_msg) elif not os.path.isfile(pid_refs_path) and os.path.isfile( cid_refs_path ): debug_msg = ( - f"FileHashStore - store_hashstore_refs_files: pid refs file does not exist" - f" for pid {pid} but cid refs file found at: {cid_refs_path} for cid: {cid}" + f"Pid reference file does not exist for pid {pid} but cid refs file " + + f"found at: {cid_refs_path} for cid: {cid}" ) - logging.debug(debug_msg) + self.fhs_logger.debug(debug_msg) # Move the pid refs file pid_tmp_file_path = self._write_refs_file(tmp_root_path, cid, "pid") shutil.move(pid_tmp_file_path, pid_refs_path) @@ -1570,11 +1515,8 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None: cid_refs_path, f"Updated existing cid refs file: {cid_refs_path} with pid: {pid}", ) - info_msg = ( - "FileHashStore - store_hashstore_refs_files: Successfully updated " - f"cid: {cid} with pid: {pid}" - ) - logging.info(info_msg) + info_msg = f"Successfully updated cid: {cid} with pid: {pid}" + self.fhs_logger.info(info_msg) return # Move both files after checking the existing status of refs files @@ -1586,11 +1528,8 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None: self._verify_hashstore_references( pid, cid, pid_refs_path, cid_refs_path, log_msg ) - info_msg = ( - "FileHashStore - store_hashstore_refs_files: Successfully updated " - f"cid: {cid} with pid: {pid}" - ) - logging.info(info_msg) + info_msg = f"Successfully updated cid: {cid} with pid: {pid}" + self.fhs_logger.info(info_msg) except ( HashStoreRefsAlreadyExists, @@ -1598,11 +1537,13 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None: ) as expected_exceptions: raise expected_exceptions - except Exception as unexpected_exception: + except Exception as ue: # For all other unexpected exceptions, we are to revert the tagging process as # much as possible. No exceptions from the reverting process will be thrown. + err_msg = f"Unexpected exception: {ue}, reverting tagging process (untag obj)." + self.fhs_logger.error(err_msg) self._untag_object(pid, cid) - raise unexpected_exception + raise ue finally: # Release cid @@ -1647,8 +1588,8 @@ def _untag_object(self, pid: str, cid: str) -> None: ) # Remove all files confirmed for deletion self._delete_marked_files(untag_obj_delete_list) - info_msg = f"_untag_object: Untagged pid: {pid} with cid: {cid}" - logging.info(info_msg) + info_msg = f"Untagged pid: {pid} with cid: {cid}" + self.fhs_logger.info(info_msg) except OrphanPidRefsFileFound as oprff: # `find_object` throws this exception when the cid refs file doesn't exist, @@ -1664,11 +1605,10 @@ def _untag_object(self, pid: str, cid: str) -> None: self._delete_marked_files(untag_obj_delete_list) warn_msg = ( - f"_untag_object: Cid refs file does not exist for pid: {pid}." - + " Deleted orphan pid refs file. Additional info: " - + str(oprff) + f"Cid refs file does not exist for pid: {pid}. Deleted orphan pid refs file. " + f"Additional info: {oprff}" ) - logging.warning(warn_msg) + self.fhs_logger.warning(warn_msg) except RefsFileExistsButCidObjMissing as rfebcom: # `find_object` throws this exception when both pid/cid refs files exist but the @@ -1690,11 +1630,11 @@ def _untag_object(self, pid: str, cid: str) -> None: self._delete_marked_files(untag_obj_delete_list) warn_msg = ( - f"_untag_object: data object for cid: {cid_read}. does not exist, but pid and cid " - f"references files found for pid: {pid}, Deleted pid and cid refs files. 
" - f"Additional info: " + str(rfebcom) + f"data object for cid: {cid_read}. does not exist, but pid and cid references " + + f"files found for pid: {pid}, Deleted pid and cid refs files. " + + f"Additional info: {rfebcom}" ) - logging.warning(warn_msg) + self.fhs_logger.warning(warn_msg) except PidNotFoundInCidRefsFile as pnficrf: # `find_object` throws this exception when both the pid and cid refs file exists @@ -1710,11 +1650,10 @@ def _untag_object(self, pid: str, cid: str) -> None: self._delete_marked_files(untag_obj_delete_list) warn_msg = ( - f"_untag_object: pid not found in expected cid refs file for pid: {pid}. " - + "Deleted orphan pid refs file. Additional info: " - + str(pnficrf) + f"Pid not found in expected cid refs file for pid: {pid}. Deleted orphan pid refs " + f"file. Additional info: {pnficrf}" ) - logging.warning(warn_msg) + self.fhs_logger.warning(warn_msg) except PidRefsDoesNotExist as prdne: # `find_object` throws this exception if the pid refs file is not found @@ -1730,10 +1669,10 @@ def _untag_object(self, pid: str, cid: str) -> None: self._delete_marked_files(untag_obj_delete_list) warn_msg = ( - f"Pid refs file not found, removed pid from cid refs file for cid: {cid}" - + str(prdne) + "Pid refs file not found, removed pid from cid reference file for cid:" + + f" {cid}. Additional info: {prdne}" ) - logging.warning(warn_msg) + self.fhs_logger.warning(warn_msg) def _put_metadata( self, metadata: Union[str, bytes], pid: str, metadata_doc_name: str @@ -1747,9 +1686,7 @@ def _put_metadata( :return: Address of the metadata document. """ - logging.debug( - "FileHashStore - _put_metadata: Request to put metadata for pid: %s", pid - ) + self.fhs_logger.debug("Request to put metadata for pid: %s", pid) # Create metadata tmp file and write to it metadata_stream = Stream(metadata) with closing(metadata_stream): @@ -1768,30 +1705,22 @@ def _put_metadata( parent.mkdir(parents=True, exist_ok=True) # Metadata will be replaced if it exists shutil.move(metadata_tmp, full_path) - logging.debug( - "FileHashStore - _put_metadata: Successfully put metadata for pid: %s", - pid, - ) + self.fhs_logger.debug("Successfully put metadata for pid: %s", pid) return full_path except Exception as err: - err_msg = ( - f"FileHashStore - _put_metadata: Unexpected {err=}, {type(err)=}" - ) - logging.error(err_msg) + err_msg = f"Unexpected {err=}, {type(err)=}" + self.fhs_logger.error(err_msg) if os.path.isfile(metadata_tmp): # Remove tmp metadata, calling app must re-upload - logging.debug( - "FileHashStore - _put_metadata: Deleting metadata for pid: %s", - pid, - ) + self.fhs_logger.debug("Deleting metadata for pid: %s", pid) self._delete("metadata", metadata_tmp) raise else: err_msg = ( - f"FileHashStore - _put_metadata: Attempt to move metadata for pid: {pid}" - + f", but metadata temp file not found: {metadata_tmp}" + f"Attempted to move metadata for pid: {pid}, but metadata temp file not found:" + + f" {metadata_tmp}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise FileNotFoundError(err_msg) def _mktmpmetadata(self, stream: "Stream") -> str: @@ -1806,18 +1735,12 @@ def _mktmpmetadata(self, stream: "Stream") -> str: tmp = self._mktmpfile(tmp_root_path) # tmp is a file-like object that is already opened for writing by default - logging.debug( - "FileHashStore - _mktmpmetadata: Writing stream to tmp metadata file: %s", - tmp.name, - ) + self.fhs_logger.debug("Writing stream to tmp metadata file: %s", tmp.name) with tmp as tmp_file: for data in stream: 
tmp_file.write(self._cast_to_bytes(data)) - logging.debug( - "FileHashStore - _mktmpmetadata: Successfully written to tmp metadata file: %s", - tmp.name, - ) + self.fhs_logger.debug("Successfully written to tmp metadata file: %s", tmp.name) return tmp.name # FileHashStore Utility & Supporting Methods @@ -1836,7 +1759,7 @@ def _delete_marked_files(delete_list: list[str]) -> None: warn_msg = f"Unable to remove {obj} in given delete_list. " + str(e) logging.warning(warn_msg) else: - raise ValueError("delete_marked_files: list cannot be None") + raise ValueError("list cannot be None") def _mark_pid_refs_file_for_deletion( self, pid: str, delete_list: List[str], pid_refs_path: Path @@ -1851,11 +1774,8 @@ def _mark_pid_refs_file_for_deletion( delete_list.append(self._rename_path_for_deletion(pid_refs_path)) except Exception as e: - err_msg = ( - f"Unable to delete pid refs file: {pid_refs_path} for pid: {pid}. " - + str(e) - ) - logging.error(err_msg) + err_msg = f"Unable to delete pid refs file: {pid_refs_path} for pid: {pid}. Details: {e}" + self.fhs_logger.error(err_msg) def _remove_pid_and_handle_cid_refs_deletion( self, pid: str, delete_list: List[str], cid_refs_path: Path @@ -1879,7 +1799,7 @@ def _remove_pid_and_handle_cid_refs_deletion( f"Unable to delete remove pid from cid refs file: {cid_refs_path} for pid:" f" {pid}. " + str(e) ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) def _validate_and_check_cid_lock( self, pid: str, cid: str, cid_to_check: str @@ -1899,6 +1819,7 @@ def _validate_and_check_cid_lock( f"_validate_and_check_cid_lock: cid provided: {cid_to_check} does not " f"match untag request for cid: {cid} and pid: {pid}" ) + self.fhs_logger.error(err_msg) raise ValueError(err_msg) self._check_object_locked_cids(cid) @@ -1914,11 +1835,7 @@ def _write_refs_file(self, path: Path, ref_id: str, ref_type: str) -> str: :return: tmp_file_path - Path to the tmp refs file """ - logging.debug( - "FileHashStore - _write_refs_file: Writing id (%s) into a tmp file in: %s", - ref_id, - path, - ) + self.fhs_logger.debug("Writing id (%s) into a tmp file in: %s", ref_id, path) try: with self._mktmpfile(path) as tmp_file: tmp_file_path = tmp_file.name @@ -1931,10 +1848,10 @@ def _write_refs_file(self, path: Path, ref_id: str, ref_type: str) -> str: except Exception as err: err_msg = ( - "FileHashStore - _write_refs_file: failed to write cid refs file for pid:" - + f" {ref_id} into path: {path}. Unexpected {err=}, {type(err)=}" + f"Failed to write cid refs file for pid: {ref_id} into path: {path}. " + + f"Unexpected error: {err=}, {type(err)=}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise err def _update_refs_file( @@ -1946,17 +1863,14 @@ def _update_refs_file( :param str ref_id: Authority-based or persistent identifier of the object. :param str update_type: 'add' or 'remove' """ - debug_msg = ( - f"FileHashStore - _update_refs_file: Updating ({update_type}) for ref_id: {ref_id}" - + f" at refs file: {refs_file_path}." - ) - logging.debug(debug_msg) + debug_msg = f"Updating ({update_type}) for ref_id: {ref_id} at refs file: {refs_file_path}." + self.fhs_logger.debug(debug_msg) if not os.path.isfile(refs_file_path): err_msg = ( - f"FileHashStore - _update_refs_file: {refs_file_path} does not exist." - + f" Cannot {update_type} ref_id: {ref_id}" + f"Refs file: {refs_file_path} does not exist." 
+ + f"Cannot {update_type} ref_id: {ref_id}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise FileNotFoundError(err_msg) try: if update_type == "add": @@ -1982,16 +1896,16 @@ def _update_refs_file( ref_file.writelines(new_pid_lines) ref_file.truncate() debug_msg = ( - f"FileHashStore - _update_refs_file: Update ({update_type}) for ref_id: {ref_id}" - + f" completed on refs file: {refs_file_path}." + f"Update ({update_type}) for ref_id: {ref_id} " + + f"completed on refs file: {refs_file_path}." ) - logging.debug(debug_msg) + self.fhs_logger.debug(debug_msg) except Exception as err: err_msg = ( - f"FileHashStore - _update_refs_file: failed to {update_type} for ref_id: {ref_id}" + f"Failed to {update_type} for ref_id: {ref_id}" + f" at refs file: {refs_file_path}. Unexpected {err=}, {type(err)=}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise err @staticmethod @@ -2037,20 +1951,18 @@ def _verify_object_information( if file_size_to_validate is not None and file_size_to_validate > 0: if file_size_to_validate != tmp_file_size: err_msg = ( - "FileHashStore - _verify_object_information: Object file size calculated: " - + f" {tmp_file_size} does not match with expected size:" - + f" {file_size_to_validate}." + f"Object file size calculated: {tmp_file_size} does not match with expected " + f"size: {file_size_to_validate}." ) if pid is not None: self._delete(entity, tmp_file_name) err_msg_for_pid = ( - err_msg - + f" Tmp file deleted and file not stored for pid: {pid}" + f"{err_msg} Tmp file deleted and file not stored for pid: {pid}" ) - logging.debug(err_msg_for_pid) + self.fhs_logger.debug(err_msg_for_pid) raise NonMatchingObjSize(err_msg_for_pid) else: - logging.debug(err_msg) + self.fhs_logger.debug(err_msg) raise NonMatchingObjSize(err_msg) if checksum_algorithm is not None and checksum is not None: if checksum_algorithm not in hex_digests: @@ -2071,21 +1983,19 @@ def _verify_object_information( ) if hex_digest_calculated != checksum: err_msg = ( - "FileHashStore - _verify_object_information: checksum_algorithm" - + f" ({checksum_algorithm}) cannot be found in the default hex digests" - + f" dict, but is supported. New checksum calculated: " - f"{hex_digest_calculated}, does not match what has been provided: " + f"Checksum_algorithm ({checksum_algorithm}) cannot be found in the " + + "default hex digests dict, but is supported. New checksum calculated: " + + f"{hex_digest_calculated}, does not match what has been provided: " + checksum ) - logging.debug(err_msg) + self.fhs_logger.debug(err_msg) raise NonMatchingChecksum(err_msg) else: hex_digest_stored = hex_digests[checksum_algorithm] if hex_digest_stored != checksum.lower(): err_msg = ( - "FileHashStore - _verify_object_information: Hex digest and checksum" - + f" do not match - file not stored for pid: {pid}. Algorithm:" - + f" {checksum_algorithm}. Checksum provided: {checksum} !=" + f"Hex digest and checksum do not match - file not stored for pid: {pid}. " + + f"Algorithm: {checksum_algorithm}. Checksum provided: {checksum} !=" + f" HexDigest: {hex_digest_stored}." ) if pid is not None: @@ -2094,10 +2004,10 @@ def _verify_object_information( err_msg_for_pid = ( err_msg + f" Tmp file ({tmp_file_name}) deleted." 
) - logging.debug(err_msg_for_pid) + self.fhs_logger.error(err_msg_for_pid) raise NonMatchingChecksum(err_msg_for_pid) else: - logging.debug(err_msg) + self.fhs_logger.error(err_msg) raise NonMatchingChecksum(err_msg) def _verify_hashstore_references( @@ -2117,11 +2027,8 @@ def _verify_hashstore_references( :param path cid_refs_path: Path to cid refs file :param str additional_log_string: String to append to exception statement """ - debug_msg = ( - f"FileHashStore - _verify_hashstore_references: verifying pid ({pid})" - + f" and cid ({cid}) refs files. Additional Note: {additional_log_string}" - ) - logging.debug(debug_msg) + debug_msg = f"Verifying pid ({pid}) and cid ({cid}) refs files. Note: {additional_log_string}" + self.fhs_logger.debug(debug_msg) if pid_refs_path is None: pid_refs_path = self._get_hashstore_pid_refs_path(pid) if cid_refs_path is None: @@ -2129,41 +2036,33 @@ def _verify_hashstore_references( # Check that reference files were created if not os.path.isfile(pid_refs_path): - err_msg = ( - "FileHashStore - _verify_hashstore_references: Pid refs file missing: " - + str(pid_refs_path) - + f" . Additional Context: {additional_log_string}" - ) - logging.error(err_msg) + err_msg = f" Pid refs file missing: {pid_refs_path}. Note: {additional_log_string}" + self.fhs_logger.error(err_msg) raise PidRefsFileNotFound(err_msg) if not os.path.isfile(cid_refs_path): err_msg = ( - "FileHashStore - _verify_hashstore_references: Cid refs file missing: " - + str(cid_refs_path) - + f" . Additional Context: {additional_log_string}" + f"Cid refs file missing: {cid_refs_path}. Note: {additional_log_string}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise CidRefsFileNotFound(err_msg) # Check the content of the reference files # Start with the cid retrieved_cid = self._read_small_file_content(pid_refs_path) if retrieved_cid != cid: err_msg = ( - "FileHashStore - _verify_hashstore_references: Pid refs file exists" - + f" ({pid_refs_path}) but cid ({cid}) does not match." - + f" Additional Context: {additional_log_string}" + f"Pid refs file exists ({pid_refs_path}) but cid ({cid}) does not match." + + f" Note: {additional_log_string}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise PidRefsContentError(err_msg) # Then the pid pid_found = self._is_string_in_refs_file(pid, cid_refs_path) if not pid_found: err_msg = ( - "FileHashStore - _verify_hashstore_references: Cid refs file exists" - + f" ({cid_refs_path}) but pid ({pid}) not found." - + f" Additional Context: {additional_log_string}" + f"Cid refs file exists ({cid_refs_path}) but pid ({pid}) not found." 
+                + f" Note: {additional_log_string}"
             )
-            logging.error(err_msg)
+            self.fhs_logger.error(err_msg)
             raise CidRefsContentError(err_msg)

     def _delete_object_only(self, cid: str) -> None:
@@ -2185,17 +2084,17 @@ def _delete_object_only(self, cid: str) -> None:
             with self.object_cid_condition_mp:
                 # Wait for the cid to release if it's in use
                 while cid in self.object_locked_cids_mp:
-                    logging.debug(sync_wait_msg)
+                    self.fhs_logger.debug(sync_wait_msg)
                     self.object_cid_condition_mp.wait()
                 # Modify reference_locked_cids consecutively
-                logging.debug(sync_begin_debug_msg)
+                self.fhs_logger.debug(sync_begin_debug_msg)
                 self.object_locked_cids_mp.append(cid)
             else:
                 with self.object_cid_condition_th:
                     while cid in self.object_locked_cids_th:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                         self.object_cid_condition_th.wait()
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.object_locked_cids_th.append(cid)

             try:
@@ -2208,12 +2107,12 @@ def _delete_object_only(self, cid: str) -> None:
                 )
                 if self.use_multiprocessing:
                     with self.object_cid_condition_mp:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                         self.object_locked_cids_mp.remove(cid)
                         self.object_cid_condition_mp.notify()
                 else:
                     with self.object_cid_condition_th:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                         self.object_locked_cids_th.remove(cid)
                         self.object_cid_condition_th.notify()

From d6c3c694194a36ae4429368db53ce188ab9e0875 Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Fri, 27 Sep 2024 09:22:16 -0700
Subject: [PATCH 09/12] Refactor '_delete_object_only', add missing logging
 statements and fix potential deadlock due to sync being outside try block

---
 src/hashstore/filehashstore.py | 57 +++++++++------------------------
 1 file changed, 15 insertions(+), 42 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 14fb9cb5..7cda82c1 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -2071,50 +2071,23 @@ def _delete_object_only(self, cid: str) -> None:

         :param str cid: Content identifier
         """
-        cid_refs_abs_path = self._get_hashstore_cid_refs_path(cid)
-        # If the refs file still exists, do not delete the object
-        if not os.path.isfile(cid_refs_abs_path):
-            sync_begin_debug_msg = (
-                f"FileHashStore - delete_object: Cid ({cid}) to locked list."
-            )
-            sync_wait_msg = (
-                f"FileHashStore - delete_object: Cid ({cid}) is locked. Waiting."
-            )
-            if self.use_multiprocessing:
-                with self.object_cid_condition_mp:
-                    # Wait for the cid to release if it's in use
-                    while cid in self.object_locked_cids_mp:
-                        self.fhs_logger.debug(sync_wait_msg)
-                        self.object_cid_condition_mp.wait()
-                    # Modify reference_locked_cids consecutively
-                    self.fhs_logger.debug(sync_begin_debug_msg)
-                    self.object_locked_cids_mp.append(cid)
-            else:
-                with self.object_cid_condition_th:
-                    while cid in self.object_locked_cids_th:
-                        self.fhs_logger.debug(sync_wait_msg)
-                        self.object_cid_condition_th.wait()
-                    self.fhs_logger.debug(sync_begin_debug_msg)
-                    self.object_locked_cids_th.append(cid)
+        try:
+            cid_refs_abs_path = self._get_hashstore_cid_refs_path(cid)
+            # If the refs file still exists, do not delete the object
+            self._synchronize_object_locked_cids(cid)
+            if os.path.isfile(cid_refs_abs_path):
+                debug_msg = (
+                    f"Cid reference file exists for: {cid}, skipping delete request."
+ ) + self.fhs_logger.debug(debug_msg) - try: + else: self._delete("objects", cid) - finally: - # Release cid - end_sync_debug_msg = ( - f"FileHashStore - delete_object: Releasing cid ({cid})" - + " from locked list" - ) - if self.use_multiprocessing: - with self.object_cid_condition_mp: - self.fhs_logger.debug(end_sync_debug_msg) - self.object_locked_cids_mp.remove(cid) - self.object_cid_condition_mp.notify() - else: - with self.object_cid_condition_th: - self.fhs_logger.debug(end_sync_debug_msg) - self.object_locked_cids_th.remove(cid) - self.object_cid_condition_th.notify() + info_msg = f"Deleted object only for cid: {cid}" + self.fhs_logger.info(info_msg) + + finally: + self._release_object_locked_cids(cid) def _check_arg_algorithms_and_checksum( self, From dbd57de2b019749f9fb09a98b6fa763337c2ae62 Mon Sep 17 00:00:00 2001 From: Dou Mok Date: Fri, 27 Sep 2024 10:24:24 -0700 Subject: [PATCH 10/12] Clean-up logging in 'filehashstore' remaining supporting and core methods, and optimize sync method logging calls, add missing logging statements --- src/hashstore/filehashstore.py | 139 ++++++++++++--------------------- 1 file changed, 50 insertions(+), 89 deletions(-) diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py index 7cda82c1..380e521a 100644 --- a/src/hashstore/filehashstore.py +++ b/src/hashstore/filehashstore.py @@ -2132,7 +2132,7 @@ def _check_arg_format_id(self, format_id: str, method: str) -> str: """ if format_id and not format_id.strip(): err_msg = f"FileHashStore - {method}: Format_id cannot be empty." - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise ValueError(err_msg) elif format_id is None: # Use default value set by hashstore config @@ -2156,19 +2156,19 @@ def _refine_algorithm_list( self._clean_algorithm(checksum_algorithm) if checksum_algorithm in self.other_algo_list: debug_additional_other_algo_str = ( - f"FileHashStore - _refine_algorithm_list: checksum algo: {checksum_algorithm}" - + " found in other_algo_lists, adding to list of algorithms to calculate." + f"Checksum algo: {checksum_algorithm} found in other_algo_lists, adding to " + + f"list of algorithms to calculate." ) - logging.debug(debug_additional_other_algo_str) + self.fhs_logger.debug(debug_additional_other_algo_str) algorithm_list_to_calculate.append(checksum_algorithm) if additional_algorithm is not None: self._clean_algorithm(additional_algorithm) if additional_algorithm in self.other_algo_list: debug_additional_other_algo_str = ( - f"FileHashStore - _refine_algorithm_list: addit algo: {additional_algorithm}" - + " found in other_algo_lists, adding to list of algorithms to calculate." + f"Additional algo: {additional_algorithm} found in other_algo_lists, " + + f"adding to list of algorithms to calculate." 
) - logging.debug(debug_additional_other_algo_str) + self.fhs_logger.debug(debug_additional_other_algo_str) algorithm_list_to_calculate.append(additional_algorithm) # Remove duplicates @@ -2196,11 +2196,8 @@ def _clean_algorithm(self, algorithm_string: str) -> str: cleaned_string not in self.default_algo_list and cleaned_string not in self.other_algo_list ): - err_msg = ( - "FileHashStore: _clean_algorithm: Algorithm not supported:" - + cleaned_string - ) - logging.error(err_msg) + err_msg = f"Algorithm not supported: {cleaned_string}" + self.fhs_logger.error(err_msg) raise UnsupportedAlgorithm(err_msg) return cleaned_string @@ -2365,7 +2362,7 @@ def _delete(self, entity: str, file: Union[str, Path]) -> None: except Exception as err: err_msg = f"FileHashStore - delete(): Unexpected {err=}, {type(err)=}" - logging.error(err_msg) + self.fhs_logger.error(err_msg) raise err def _create_path(self, path: Path) -> None: @@ -2437,9 +2434,8 @@ def _get_hashstore_data_object_path(self, cid_or_relative_path: str) -> Path: return Path(relpath) else: raise FileNotFoundError( - "FileHashStore - _get_hashstore_data_object_path: could not locate a" - + "data object in '/objects' for the supplied cid_or_relative_path: " - + cid_or_relative_path + "Could not locate a data object in '/objects' for the supplied " + + f"cid_or_relative_path: {cid_or_relative_path}" ) def _get_hashstore_metadata_path(self, metadata_relative_path: str) -> Path: @@ -2460,9 +2456,8 @@ def _get_hashstore_metadata_path(self, metadata_relative_path: str) -> Path: return Path(metadata_relative_path) else: raise FileNotFoundError( - "FileHashStore - _get_hashstore_metadata_path: could not locate a" - + "metadata object in '/metadata' for the supplied metadata_relative_path: " - + str(metadata_relative_path) + "Could not locate a metadata object in '/metadata' for the supplied " + + f"metadata_relative_path: {metadata_relative_path}" ) def _get_hashstore_pid_refs_path(self, pid: str) -> Path: @@ -2504,27 +2499,17 @@ def _synchronize_object_locked_pids(self, pid: str) -> None: with self.object_pid_condition_mp: # Wait for the cid to release if it's being tagged while pid in self.object_locked_pids_mp: - logging.debug( - f"_synchronize_object_locked_pids: Pid ({pid}) is locked. Waiting." - ) + self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.") self.object_pid_condition_mp.wait() self.object_locked_pids_mp.append(pid) - logging.debug( - f"_synchronize_object_locked_pids: Synchronizing object_locked_pids_mp for" - + f" pid: {pid}" - ) + self.fhs_logger.debug(f"Synchronizing object_locked_pids_mp for pid: {pid}") else: with self.object_pid_condition_th: while pid in self.object_locked_pids_th: - logging.debug( - f"_synchronize_object_locked_pids: Pid ({pid}) is locked. Waiting." - ) + self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.") self.object_pid_condition_th.wait() self.object_locked_pids_th.append(pid) - logging.debug( - f"_synchronize_object_locked_pids: Synchronizing object_locked_pids_th for" - + f" pid: {pid}" - ) + self.fhs_logger.debug(f"Synchronizing object_locked_pids_th for pid: {pid}") def _release_object_locked_pids(self, pid: str) -> None: """Remove the given persistent identifier from 'object_locked_pids' and notify other @@ -2536,11 +2521,15 @@ def _release_object_locked_pids(self, pid: str) -> None: with self.object_pid_condition_mp: self.object_locked_pids_mp.remove(pid) self.object_pid_condition_mp.notify() + end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_mp." 
+ self.fhs_logger.debug(end_sync_debug_msg) else: # Release pid with self.object_pid_condition_th: self.object_locked_pids_th.remove(pid) self.object_pid_condition_th.notify() + end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_th." + self.fhs_logger.debug(end_sync_debug_msg) def _synchronize_object_locked_cids(self, cid: str) -> None: """Multiple threads may access a data object via its 'cid' or the respective 'cid @@ -2553,28 +2542,18 @@ def _synchronize_object_locked_cids(self, cid: str) -> None: with self.object_cid_condition_mp: # Wait for the cid to release if it's being tagged while cid in self.object_locked_cids_mp: - logging.debug( - f"synchronize_referenced_locked_cids: Cid ({cid}) is locked. Waiting." - ) + self.fhs_logger.debug(f"Cid ({cid}) is locked. Waiting.") self.object_cid_condition_mp.wait() # Modify reference_locked_cids consecutively self.object_locked_cids_mp.append(cid) - logging.debug( - f"synchronize_referenced_locked_cids: Synchronizing object_locked_cids_mp for" - + f" cid: {cid}" - ) + self.fhs_logger.debug(f"Synchronizing object_locked_cids_mp for cid: {cid}") else: with self.object_cid_condition_th: while cid in self.object_locked_cids_th: - logging.debug( - f"synchronize_referenced_locked_cids: Cid ({cid}) is locked. Waiting." - ) + self.fhs_logger.debug(f"Cid ({cid}) is locked. Waiting.") self.object_cid_condition_th.wait() self.object_locked_cids_th.append(cid) - logging.debug( - f"synchronize_referenced_locked_cids: Synchronizing object_locked_cids_th for" - + f" cid: {cid}" - ) + self.fhs_logger.debug(f"Synchronizing object_locked_cids_th for cid: {cid}") def _check_object_locked_cids(self, cid: str) -> None: """Check that a given content identifier is currently locked (found in the @@ -2584,13 +2563,13 @@ def _check_object_locked_cids(self, cid: str) -> None: """ if self.use_multiprocessing: if cid not in self.object_locked_cids_mp: - err_msg = f"_check_object_locked_cids: cid {cid} is not locked." - logging.error(err_msg) + err_msg = f"Cid {cid} is not locked." + self.fhs_logger.error(err_msg) raise IdentifierNotLocked(err_msg) else: if cid not in self.object_locked_cids_th: - err_msg = f"_check_object_locked_cids: cid {cid} is not locked." - logging.error(err_msg) + err_msg = f"Cid {cid} is not locked." + self.fhs_logger.error(err_msg) raise IdentifierNotLocked(err_msg) def _release_object_locked_cids(self, cid: str) -> None: @@ -2603,20 +2582,14 @@ def _release_object_locked_cids(self, cid: str) -> None: with self.object_cid_condition_mp: self.object_locked_cids_mp.remove(cid) self.object_cid_condition_mp.notify() - end_sync_debug_msg = ( - f"FileHashStore - _release_object_locked_cids: Releasing cid ({cid}) from" - + " object_cid_condition_mp." - ) - logging.debug(end_sync_debug_msg) + end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_mp." + self.fhs_logger.debug(end_sync_debug_msg) else: with self.object_cid_condition_th: self.object_locked_cids_th.remove(cid) self.object_cid_condition_th.notify() - end_sync_debug_msg = ( - f"FileHashStore - _release_object_locked_cids: Releasing cid ({cid}) from" - + " object_cid_condition_th." - ) - logging.debug(end_sync_debug_msg) + end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_th." 
+            self.fhs_logger.debug(end_sync_debug_msg)

     def _synchronize_referenced_locked_pids(self, pid: str) -> None:
         """Multiple threads may interact with a pid (to tag, untag, delete) and these actions
@@ -2628,28 +2601,22 @@ def _synchronize_referenced_locked_pids(self, pid: str) -> None:
             with self.reference_pid_condition_mp:
                 # Wait for the pid to release if it's in use
                 while pid in self.reference_locked_pids_mp:
-                    logging.debug(
-                        f"_synchronize_referenced_locked_pids: Pid ({pid}) is locked. Waiting."
-                    )
+                    self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.")
                     self.reference_pid_condition_mp.wait()
                 # Modify reference_locked_pids consecutively
                 self.reference_locked_pids_mp.append(pid)
-                logging.debug(
-                    f"_synchronize_referenced_locked_pids: Synchronizing reference_locked_pids_mp"
-                    + f" for pid: {pid}"
-                )
+                self.fhs_logger.debug(
+                    f"Synchronizing reference_locked_pids_mp for pid: {pid}"
+                )
         else:
             with self.reference_pid_condition_th:
                 while pid in self.reference_locked_pids_th:
-                    logging.debug(
-                        f"_synchronize_referenced_locked_pids: Pid ({pid}) is locked. Waiting."
-                    )
+                    self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.")
                     self.reference_pid_condition_th.wait()
                 self.reference_locked_pids_th.append(pid)
-                logging.debug(
-                    f"_synchronize_referenced_locked_pids: Synchronizing reference_locked_pids_th"
-                    + f" for pid: {pid}"
-                )
+                self.fhs_logger.debug(
+                    f"Synchronizing reference_locked_pids_th for pid: {pid}"
+                )

     def _check_reference_locked_pids(self, pid: str) -> None:
         """Check that a given persistent identifier is currently locked (found in the
@@ -2659,13 +2626,13 @@ def _check_reference_locked_pids(self, pid: str) -> None:
         """
         if self.use_multiprocessing:
             if pid not in self.reference_locked_pids_mp:
-                err_msg = f"_check_reference_locked_pids: pid {pid} is not locked."
-                logging.error(err_msg)
+                err_msg = f"Pid {pid} is not locked."
+                self.fhs_logger.error(err_msg)
                 raise IdentifierNotLocked(err_msg)
         else:
             if pid not in self.reference_locked_pids_th:
-                err_msg = f"_check_reference_locked_pids: pid {pid} is not locked."
-                logging.error(err_msg)
+                err_msg = f"Pid {pid} is not locked."
+                self.fhs_logger.error(err_msg)
                 raise IdentifierNotLocked(err_msg)

     def _release_reference_locked_pids(self, pid: str) -> None:
@@ -2678,21 +2645,15 @@ def _release_reference_locked_pids(self, pid: str) -> None:
             with self.reference_pid_condition_mp:
                 self.reference_locked_pids_mp.remove(pid)
                 self.reference_pid_condition_mp.notify()
-            end_sync_debug_msg = (
-                f"FileHashStore - _release_reference_locked_pids: Releasing pid ({pid}) from"
-                + " reference_locked_pids_mp."
-            )
-            logging.debug(end_sync_debug_msg)
+            end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_mp."
+            self.fhs_logger.debug(end_sync_debug_msg)
         else:
             # Release pid
             with self.reference_pid_condition_th:
                 self.reference_locked_pids_th.remove(pid)
                 self.reference_pid_condition_th.notify()
-            end_sync_debug_msg = (
-                f"FileHashStore - _release_reference_locked_pids: Releasing pid ({pid}) from"
-                + " reference_locked_pids_th."
-            )
-            logging.debug(end_sync_debug_msg)
+            end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_th."
+            self.fhs_logger.debug(end_sync_debug_msg)

     # Other Static Methods
     @staticmethod

From 6256bcb29d94f2b72a839864b70848537e75d405 Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Fri, 27 Sep 2024 11:23:19 -0700
Subject: [PATCH 11/12] Refactor and fix a bug in 'store_object' where the sync
 method was not within the try block, which could lead to a deadlock

---
 src/hashstore/filehashstore.py | 96 +++++++++++++++++-----------------
 1 file changed, 49 insertions(+), 47 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 380e521a..3ae0f3f1 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -536,48 +536,48 @@ def store_object(
             additional_algorithm, checksum, checksum_algorithm
         )

-        sync_begin_debug_msg = f"Adding pid ({pid}) to locked list."
-        err_msg = f"Duplicate object request encountered for pid: {pid}. Already in progress."
-        if self.use_multiprocessing:
-            with self.object_pid_condition_mp:
-                # Wait for the pid to release if it's in use
-                if pid in self.object_locked_pids_mp:
-                    self.fhs_logger.error(err_msg)
-                    raise StoreObjectForPidAlreadyInProgress(err_msg)
-                # Modify object_locked_pids consecutively
-                self.fhs_logger.debug(sync_begin_debug_msg)
-                self.object_locked_pids_mp.append(pid)
-        else:
-            with self.object_pid_condition_th:
-                if pid in self.object_locked_pids_th:
-                    logging.error(err_msg)
-                    raise StoreObjectForPidAlreadyInProgress(err_msg)
-                self.fhs_logger.debug(sync_begin_debug_msg)
-                self.object_locked_pids_th.append(pid)
         try:
-            self.fhs_logger.debug("Attempting to store object for pid: %s", pid)
-            object_metadata = self._store_and_validate_data(
-                pid,
-                data,
-                additional_algorithm=additional_algorithm_checked,
-                checksum=checksum,
-                checksum_algorithm=checksum_algorithm_checked,
-                file_size_to_validate=expected_object_size,
+            err_msg = (
+                f"Duplicate object request for pid: {pid}. Already in progress."
             )
-            self.fhs_logger.debug("Attempting to tag object for pid: %s", pid)
-            cid = object_metadata.cid
-            self.tag_object(pid, cid)
-            self.fhs_logger.info("Successfully stored object for pid: %s", pid)
+            if self.use_multiprocessing:
+                with self.object_pid_condition_mp:
+                    # Raise exception immediately if pid is in use
+                    if pid in self.object_locked_pids_mp:
+                        self.fhs_logger.error(err_msg)
+                        raise StoreObjectForPidAlreadyInProgress(err_msg)
+            else:
+                with self.object_pid_condition_th:
+                    if pid in self.object_locked_pids_th:
+                        self.fhs_logger.error(err_msg)
+                        raise StoreObjectForPidAlreadyInProgress(err_msg)
+
+            try:
+                self._synchronize_object_locked_pids(pid)
+
+                self.fhs_logger.debug("Attempting to store object for pid: %s", pid)
+                object_metadata = self._store_and_validate_data(
+                    pid,
+                    data,
+                    additional_algorithm=additional_algorithm_checked,
+                    checksum=checksum,
+                    checksum_algorithm=checksum_algorithm_checked,
+                    file_size_to_validate=expected_object_size,
+                )
+                self.fhs_logger.debug("Attempting to tag object for pid: %s", pid)
+                cid = object_metadata.cid
+                self.tag_object(pid, cid)
+                self.fhs_logger.info("Successfully stored object for pid: %s", pid)
+            finally:
+                # Release pid
+                self._release_object_locked_pids(pid)
         except Exception as err:
             err_msg = (
-                f"failed to store object for pid: {pid}. Reference files will not be created "
-                f"or tagged. Unexpected error: {err})"
+                f"Failed to store object for pid: {pid}. Reference files will not be "
+                f"created or tagged. Unexpected error: {err}"
             )
             self.fhs_logger.error(err_msg)
             raise err
-        finally:
-            # Release pid
-            self._release_object_locked_pids(pid)

         return object_metadata

@@ -2521,15 +2521,13 @@ def _release_object_locked_pids(self, pid: str) -> None:
             with self.object_pid_condition_mp:
                 self.object_locked_pids_mp.remove(pid)
                 self.object_pid_condition_mp.notify()
-                end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_mp."
-                self.fhs_logger.debug(end_sync_debug_msg)
+                self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_mp.")
         else:
             # Release pid
             with self.object_pid_condition_th:
                 self.object_locked_pids_th.remove(pid)
                 self.object_pid_condition_th.notify()
-                end_sync_debug_msg = f"Releasing pid ({pid}) from object_locked_pids_th."
-                self.fhs_logger.debug(end_sync_debug_msg)
+                self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_th.")

     def _synchronize_object_locked_cids(self, cid: str) -> None:
         """Multiple threads may access a data object via its 'cid' or the respective 'cid
@@ -2582,14 +2580,16 @@ def _release_object_locked_cids(self, cid: str) -> None:
             with self.object_cid_condition_mp:
                 self.object_locked_cids_mp.remove(cid)
                 self.object_cid_condition_mp.notify()
-            end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_mp."
-            self.fhs_logger.debug(end_sync_debug_msg)
+            self.fhs_logger.debug(
+                f"Releasing cid ({cid}) from object_cid_condition_mp."
+            )
         else:
             with self.object_cid_condition_th:
                 self.object_locked_cids_th.remove(cid)
                 self.object_cid_condition_th.notify()
-            end_sync_debug_msg = f"Releasing cid ({cid}) from object_cid_condition_th."
-            self.fhs_logger.debug(end_sync_debug_msg)
+            self.fhs_logger.debug(
+                f"Releasing cid ({cid}) from object_cid_condition_th."
+            )

     def _synchronize_referenced_locked_pids(self, pid: str) -> None:
         """Multiple threads may interact with a pid (to tag, untag, delete) and these actions
@@ -2645,15 +2645,17 @@ def _release_reference_locked_pids(self, pid: str) -> None:
             with self.reference_pid_condition_mp:
                 self.reference_locked_pids_mp.remove(pid)
                 self.reference_pid_condition_mp.notify()
-            end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_mp."
-            self.fhs_logger.debug(end_sync_debug_msg)
+            self.fhs_logger.debug(
+                f"Releasing pid ({pid}) from reference_locked_pids_mp."
+            )
         else:
             # Release pid
             with self.reference_pid_condition_th:
                 self.reference_locked_pids_th.remove(pid)
                 self.reference_pid_condition_th.notify()
-            end_sync_debug_msg = f"Releasing pid ({pid}) from reference_locked_pids_th."
-            self.fhs_logger.debug(end_sync_debug_msg)
+            self.fhs_logger.debug(
+                f"Releasing pid ({pid}) from reference_locked_pids_th."
+            )

     # Other Static Methods
     @staticmethod

From fe5e7dbe60cfbf024b9fbb2c677f27e4c48f16d8 Mon Sep 17 00:00:00 2001
From: Dou Mok
Date: Tue, 1 Oct 2024 10:20:12 -0700
Subject: [PATCH 12/12] Clean up code and resolve linting errors for missed
 unused variables and long lines

---
 src/hashstore/filehashstore.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 3ae0f3f1..74b9c600 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -759,8 +759,6 @@ def delete_object(self, pid: str) -> None:
         # Storing and deleting objects are synchronized together
         # Duplicate store object requests for a pid are rejected, but deleting an object
         # will wait for a pid to be released if it's found to be in use before proceeding.
-        sync_begin_debug_msg = f"Pid ({pid}) to locked list."
-        sync_wait_msg = f"Pid ({pid}) is locked. Waiting."

         try:
             # Before we begin deletion process, we look for the `cid` by calling
@@ -983,7 +981,10 @@ def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None:
             try:
                 full_path_without_directory = Path(self.metadata / rel_path / pid_doc)
                 self._delete("metadata", full_path_without_directory)
-                info_string = f"Successfully deleted metadata for pid: {pid} for format_id: {format_id}"
+                info_string = (
+                    f"Deleted metadata for pid: {pid} for format_id: {format_id}"
+                )
+
                 self.fhs_logger.info(info_string)
             finally:
                 # Release pid
@@ -1774,7 +1775,9 @@ def _mark_pid_refs_file_for_deletion(
             delete_list.append(self._rename_path_for_deletion(pid_refs_path))

         except Exception as e:
-            err_msg = f"Unable to delete pid refs file: {pid_refs_path} for pid: {pid}. Details: {e}"
+            err_msg = (
+                f"Unable to delete pid refs file: {pid_refs_path} for pid: {pid}. {e}"
+            )
             self.fhs_logger.error(err_msg)

     def _remove_pid_and_handle_cid_refs_deletion(
@@ -2027,7 +2030,9 @@ def _verify_hashstore_references(
         :param path cid_refs_path: Path to cid refs file
         :param str additional_log_string: String to append to exception statement
         """
-        debug_msg = f"Verifying pid ({pid}) and cid ({cid}) refs files. Note: {additional_log_string}"
+        debug_msg = (
+            f"Verifying pid ({pid}) and cid ({cid}) refs files. {additional_log_string}"
+        )
         self.fhs_logger.debug(debug_msg)
         if pid_refs_path is None:
             pid_refs_path = self._get_hashstore_pid_refs_path(pid)
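
The patches above converge on one synchronization idiom: a shared list of in-use identifiers guarded by a Condition, claimed with a wait loop, surrendered with notify, and, after PATCH 11, claimed as the first statement of a try block so the matching finally always releases it. The sketch below is a minimal, hypothetical illustration of that shape for the threading case only. PidLockRegistry and store_with_lock are illustrative names, not part of FileHashStore's API; a multiprocessing variant would swap in multiprocessing.Lock, multiprocessing.Condition, and Manager().list().

import threading

class PidLockRegistry:
    """Condition-variable registry guarding per-pid critical sections."""

    def __init__(self) -> None:
        lock = threading.Lock()
        self.condition = threading.Condition(lock)
        self.locked_pids: list[str] = []

    def synchronize(self, pid: str) -> None:
        # Block until no other thread holds this pid, then claim it.
        with self.condition:
            while pid in self.locked_pids:
                self.condition.wait()
            self.locked_pids.append(pid)

    def release(self, pid: str) -> None:
        # Surrender the pid and wake a thread waiting on the condition.
        with self.condition:
            self.locked_pids.remove(pid)
            self.condition.notify()

registry = PidLockRegistry()

def store_with_lock(pid: str) -> None:
    # Claiming the pid inside the try block means the finally clause always
    # pairs with the claim, so no code path can exit while still holding the
    # pid -- the stale-lock scenario PATCH 11 guards against.
    try:
        registry.synchronize(pid)
        ...  # store and tag the object for this pid here
    finally:
        registry.release(pid)

One caveat, mirrored from the patch itself: release assumes synchronize completed, so an exception raised inside synchronize (before the append) would make the remove call fail; a production version might record ownership before releasing.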