diff --git a/src/hashstore/filehashstore.py b/src/hashstore/filehashstore.py
index 8e2f095a..74b9c600 100644
--- a/src/hashstore/filehashstore.py
+++ b/src/hashstore/filehashstore.py
@@ -80,53 +80,7 @@ class FileHashStore(HashStore):
     ]

     def __init__(self, properties=None):
-        # Variables to orchestrate parallelization
-        # Check to see whether a multiprocessing or threading sync lock should be used
-        self.use_multiprocessing = os.getenv("USE_MULTIPROCESSING", "False") == "True"
-        if self.use_multiprocessing == "True":
-            # Create multiprocessing synchronization variables
-            # Synchronization values for object locked pids
-            self.object_pid_lock_mp = multiprocessing.Lock()
-            self.object_pid_condition_mp = multiprocessing.Condition(
-                self.object_pid_lock_mp
-            )
-            self.object_locked_pids_mp = multiprocessing.Manager().list()
-            # Synchronization values for object locked cids
-            self.object_cid_lock_mp = multiprocessing.Lock()
-            self.object_cid_condition_mp = multiprocessing.Condition(
-                self.object_cid_lock_mp
-            )
-            self.object_locked_cids_mp = multiprocessing.Manager().list()
-            # Synchronization values for metadata locked documents
-            self.metadata_lock_mp = multiprocessing.Lock()
-            self.metadata_condition_mp = multiprocessing.Condition(
-                self.metadata_lock_mp
-            )
-            self.metadata_locked_docs_mp = multiprocessing.Manager().list()
-            # Synchronization values for reference locked pids
-            self.reference_pid_lock_mp = multiprocessing.Lock()
-            self.reference_pid_condition_mp = multiprocessing.Condition(
-                self.reference_pid_lock_mp
-            )
-            self.reference_locked_pids_mp = multiprocessing.Manager().list()
-        else:
-            # Create threading synchronization variables
-            # Synchronization values for object locked pids
-            self.object_pid_lock_th = threading.Lock()
-            self.object_pid_condition_th = threading.Condition(self.object_pid_lock_th)
-            self.object_locked_pids_th = []
-            # Synchronization values for object locked cids
-            self.object_cid_lock_th = threading.Lock()
-            self.object_cid_condition_th = threading.Condition(self.object_cid_lock_th)
-            self.object_locked_cids_th = []
-            # Synchronization values for metadata locked documents
-            self.metadata_lock_th = threading.Lock()
-            self.metadata_condition_th = threading.Condition(self.metadata_lock_th)
-            self.metadata_locked_docs_th = []
-            # Synchronization values for reference locked pids
-            self.reference_pid_lock_th = threading.Lock()
-            self.reference_pid_condition_th = threading.Condition(self.metadata_lock_th)
-            self.reference_locked_pids_th = []
+        self.fhs_logger = logging.getLogger(__name__)
         # Now check properties
         if properties:
             # Validate properties against existing configuration if present
@@ -147,7 +101,7 @@ def __init__(self, properties=None):
             self._verify_hashstore_properties(properties, prop_store_path)

             # If no exceptions thrown, FileHashStore ready for initialization
-            logging.debug("FileHashStore - Initializing, properties verified.")
+            self.fhs_logger.debug("Initializing, properties verified.")
             self.root = Path(prop_store_path)
             self.depth = prop_store_depth
             self.width = prop_store_width
@@ -155,8 +109,8 @@ def __init__(self, properties=None):
             # Write 'hashstore.yaml' to store path
             if not os.path.isfile(self.hashstore_configuration_yaml):
                 # pylint: disable=W1201
-                logging.debug(
-                    "FileHashStore - HashStore does not exist & configuration file not found."
+                self.fhs_logger.debug(
                    "HashStore does not exist & configuration file not found."
                     + " Writing configuration file."
                 )
                 self._write_properties(properties)
@@ -176,17 +130,71 @@ def __init__(self, properties=None):
             self._create_path(self.refs / "tmp")
             self._create_path(self.refs / "pids")
             self._create_path(self.refs / "cids")
-            logging.debug(
-                "FileHashStore - Initialization success. Store root: %s", self.root
-            )
+
+            # Variables to orchestrate parallelization
+            # Check to see whether a multiprocessing or threading sync lock should be used
+            self.use_multiprocessing = (
+                os.getenv("USE_MULTIPROCESSING", "False") == "True"
+            )
+            if self.use_multiprocessing:
+                # Create multiprocessing synchronization variables
+                # Synchronization values for object locked pids
+                self.object_pid_lock_mp = multiprocessing.Lock()
+                self.object_pid_condition_mp = multiprocessing.Condition(
+                    self.object_pid_lock_mp
+                )
+                self.object_locked_pids_mp = multiprocessing.Manager().list()
+                # Synchronization values for object locked cids
+                self.object_cid_lock_mp = multiprocessing.Lock()
+                self.object_cid_condition_mp = multiprocessing.Condition(
+                    self.object_cid_lock_mp
+                )
+                self.object_locked_cids_mp = multiprocessing.Manager().list()
+                # Synchronization values for metadata locked documents
+                self.metadata_lock_mp = multiprocessing.Lock()
+                self.metadata_condition_mp = multiprocessing.Condition(
+                    self.metadata_lock_mp
+                )
+                self.metadata_locked_docs_mp = multiprocessing.Manager().list()
+                # Synchronization values for reference locked pids
+                self.reference_pid_lock_mp = multiprocessing.Lock()
+                self.reference_pid_condition_mp = multiprocessing.Condition(
+                    self.reference_pid_lock_mp
+                )
+                self.reference_locked_pids_mp = multiprocessing.Manager().list()
+            else:
+                # Create threading synchronization variables
+                # Synchronization values for object locked pids
+                self.object_pid_lock_th = threading.Lock()
+                self.object_pid_condition_th = threading.Condition(
+                    self.object_pid_lock_th
+                )
+                self.object_locked_pids_th = []
+                # Synchronization values for object locked cids
+                self.object_cid_lock_th = threading.Lock()
+                self.object_cid_condition_th = threading.Condition(
+                    self.object_cid_lock_th
+                )
+                self.object_locked_cids_th = []
+                # Synchronization values for metadata locked documents
+                self.metadata_lock_th = threading.Lock()
+                self.metadata_condition_th = threading.Condition(self.metadata_lock_th)
+                self.metadata_locked_docs_th = []
+                # Synchronization values for reference locked pids
+                self.reference_pid_lock_th = threading.Lock()
+                self.reference_pid_condition_th = threading.Condition(
+                    self.reference_pid_lock_th
+                )
+                self.reference_locked_pids_th = []
+
+            self.fhs_logger.debug("Initialization success. Store root: %s", self.root)
         else:
             # Cannot instantiate or initialize FileHashStore without config
-            exception_string = (
-                "FileHashStore - HashStore properties must be supplied."
-                + f" Properties: {properties}"
+            err_msg = (
+                "HashStore properties must be supplied." f" Properties: {properties}"
             )
-            logging.debug(exception_string)
-            raise ValueError(exception_string)
+            self.fhs_logger.debug(err_msg)
+            raise ValueError(err_msg)

     # Configuration and Related Methods

@@ -203,12 +211,9 @@ def _load_properties(
         - store_metadata_namespace (str): Namespace for the HashStore's system metadata.
         """
         if not os.path.isfile(hashstore_yaml_path):
-            exception_string = (
-                "FileHashStore - load_properties: hashstore.yaml not found"
-                + " in store root path."
-            )
-            logging.critical(exception_string)
-            raise FileNotFoundError(exception_string)
+            err_msg = "'hashstore.yaml' not found in store root path."
+            logging.critical(err_msg)
+            raise FileNotFoundError(err_msg)

         # Open file
         with open(hashstore_yaml_path, "r", encoding="utf-8") as hs_yaml_file:
@@ -219,9 +224,7 @@ def _load_properties(
         for key in hashstore_required_prop_keys:
             if key != "store_path":
                 hashstore_yaml_dict[key] = yaml_data[key]
-        logging.debug(
-            "FileHashStore - load_properties: Successfully retrieved 'hashstore.yaml' properties."
-        )
+        logging.debug("Successfully retrieved 'hashstore.yaml' properties.")
         return hashstore_yaml_dict

     def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None:
@@ -236,12 +239,9 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None:
         """
         # If hashstore.yaml already exists, must throw exception and proceed with caution
         if os.path.isfile(self.hashstore_configuration_yaml):
-            exception_string = (
-                "FileHashStore - write_properties: configuration file 'hashstore.yaml'"
-                + " already exists."
-            )
-            logging.error(exception_string)
-            raise FileExistsError(exception_string)
+            err_msg = "Configuration file 'hashstore.yaml' already exists."
+            logging.error(err_msg)
+            raise FileExistsError(err_msg)

         # Validate properties
         checked_properties = self._validate_properties(properties)
@@ -257,14 +257,13 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None:
         if store_algorithm in accepted_store_algorithms:
             checked_store_algorithm = store_algorithm
         else:
-            exception_string = (
-                f"FileHashStore - write_properties: algorithm supplied ({store_algorithm})"
-                f" cannot be used as default for HashStore. Must be one of: "
-                + f"{', '.join(accepted_store_algorithms)}"
+            err_msg = (
+                f"Algorithm supplied ({store_algorithm}) cannot be used as default for"
+                f" HashStore. Must be one of: {', '.join(accepted_store_algorithms)}"
                 f" which are DataONE controlled algorithm values"
             )
-            logging.error(exception_string)
-            raise ValueError(exception_string)
+            logging.error(err_msg)
+            raise ValueError(err_msg)

         # If given store path doesn't exist yet, create it.
         if not os.path.exists(self.root):
@@ -284,63 +283,10 @@ def _write_properties(self, properties: Dict[str, Union[str, int]]) -> None:
             hs_yaml_file.write(hashstore_configuration_yaml)

         logging.debug(
-            "FileHashStore - write_properties: Configuration file written to: %s",
-            self.hashstore_configuration_yaml,
+            "Configuration file written to: %s", self.hashstore_configuration_yaml
         )
         return

-    @staticmethod
-    def _build_hashstore_yaml_string_old(
-        store_depth: int,
-        store_width: int,
-        store_algorithm: str,
-        store_metadata_namespace: str,
-    ) -> str:
-        """Build a YAML string representing the configuration for a HashStore.
-
-        :param int store_depth: Depth when sharding an object's hex digest.
-        :param int store_width: Width of directories when sharding an object's hex digest.
-        :param str store_algorithm: Hash algorithm used for calculating the object's hex digest.
-        :param str store_metadata_namespace: Namespace for the HashStore's system metadata.
-
-        :return: A YAML string representing the configuration for a HashStore.
- """ - hashstore_configuration_yaml = f""" - # Default configuration variables for HashStore - - ############### Directory Structure ############### - # Desired amount of directories when sharding an object to form the permanent address - store_depth: {store_depth} # WARNING: DO NOT CHANGE UNLESS SETTING UP NEW HASHSTORE - # Width of directories created when sharding an object to form the permanent address - store_width: {store_width} # WARNING: DO NOT CHANGE UNLESS SETTING UP NEW HASHSTORE - # Example: - # Below, objects are shown listed in directories that are 3 levels deep (DIR_DEPTH=3), - # with each directory consisting of 2 characters (DIR_WIDTH=2). - # /var/filehashstore/objects - # ├── 7f - # │ └── 5c - # │ └── c1 - # │ └── 8f0b04e812a3b4c8f686ce34e6fec558804bf61e54b176742a7f6368d6 - - ############### Format of the Metadata ############### - # The default metadata format - store_metadata_namespace: "{store_metadata_namespace}" - - ############### Hash Algorithms ############### - # Hash algorithm to use when calculating object's hex digest for the permanent address - store_algorithm: "{store_algorithm}" - # Algorithm values supported by python hashlib 3.9.0+ for File Hash Store (FHS) - # The default algorithm list includes the hash algorithms calculated when storing an - # object to disk and returned to the caller after successful storage. - store_default_algo_list: - - "MD5" - - "SHA-1" - - "SHA-256" - - "SHA-384" - - "SHA-512" - """ - return hashstore_configuration_yaml - @staticmethod def _build_hashstore_yaml_string( store_depth: int, @@ -432,8 +378,8 @@ def _verify_hashstore_properties( :param str prop_store_path: Store path to check. """ if os.path.isfile(self.hashstore_configuration_yaml): - logging.debug( - "FileHashStore - Config found (hashstore.yaml) at {%s}. Verifying properties.", + self.fhs_logger.debug( + "Config found (hashstore.yaml) at {%s}. Verifying properties.", self.hashstore_configuration_yaml, ) # If 'hashstore.yaml' is found, verify given properties before init @@ -447,13 +393,13 @@ def _verify_hashstore_properties( if key == "store_depth" or key == "store_width": supplied_key = int(properties[key]) if hashstore_yaml_dict[key] != supplied_key: - exception_string = ( - f"FileHashStore - Given properties ({key}: {properties[key]}) does not" - + f" match. HashStore configuration ({key}: {hashstore_yaml_dict[key]})" + err_msg = ( + f"Given properties ({key}: {properties[key]}) does not match." + + f" HashStore configuration ({key}: {hashstore_yaml_dict[key]})" + f" found at: {self.hashstore_configuration_yaml}" ) - logging.critical(exception_string) - raise ValueError(exception_string) + self.fhs_logger.critical(err_msg) + raise ValueError(err_msg) else: if os.path.exists(prop_store_path): # Check if HashStore exists and throw exception if found @@ -462,14 +408,13 @@ def _verify_hashstore_properties( os.path.isdir(os.path.join(prop_store_path, sub)) for sub in subfolders ): - exception_string = ( - "FileHashStore - Unable to initialize HashStore. `hashstore.yaml` is not" - + " present but conflicting HashStore directory exists. Please delete" - + " '/objects', '/metadata' and/or '/refs' at the store path or supply" - + " a new path." + err_msg = ( + "Unable to initialize HashStore. `hashstore.yaml` is not present but " + "conflicting HashStore directory exists. Please delete '/objects', " + "'/metadata' and/or '/refs' at the store path or supply a new path." 
                    )
-                    logging.critical(exception_string)
-                    raise RuntimeError(exception_string)
+                    self.fhs_logger.critical(err_msg)
+                    raise RuntimeError(err_msg)

     def _validate_properties(
         self, properties: Dict[str, Union[str, int]]
@@ -485,33 +430,24 @@ def _validate_properties(
         :return: The given properties object (that has been validated).
         """
         if not isinstance(properties, dict):
-            exception_string = (
-                "FileHashStore - _validate_properties: Invalid argument -"
-                + " expected a dictionary."
-            )
-            logging.debug(exception_string)
-            raise ValueError(exception_string)
+            err_msg = "Invalid argument - expected a dictionary."
+            self.fhs_logger.error(err_msg)
+            raise ValueError(err_msg)

         # New dictionary for validated properties
         checked_properties = {}

         for key in self.property_required_keys:
             if key not in properties:
-                exception_string = (
-                    "FileHashStore - _validate_properties: Missing required"
-                    + f" key: {key}."
-                )
-                logging.debug(exception_string)
-                raise KeyError(exception_string)
+                err_msg = f"Missing required key: {key}."
+                self.fhs_logger.error(err_msg)
+                raise KeyError(err_msg)

             value = properties.get(key)
             if value is None:
-                exception_string = (
-                    "FileHashStore - _validate_properties: Value for key:"
-                    + f" {key} is none."
-                )
-                logging.debug(exception_string)
-                raise ValueError(exception_string)
+                err_msg = f"Value for key: {key} is none."
+                self.fhs_logger.error(err_msg)
+                raise ValueError(err_msg)

             # Add key and values to checked_properties
             if key == "store_depth" or key == "store_width":
@@ -519,13 +455,12 @@ def _validate_properties(
                 try:
                     checked_properties[key] = int(value)
                 except Exception as err:
-                    exception_string = (
-                        "FileHashStore - _validate_properties: Unexpected exception when"
-                        " attempting to ensure store depth and width are integers. Details: "
-                        + str(err)
+                    err_msg = (
+                        "Unexpected exception when attempting to ensure store depth and width "
+                        f"are integers. Details: {err}"
                     )
-                    logging.debug(exception_string)
-                    raise ValueError(exception_string)
+                    self.fhs_logger.error(err_msg)
+                    raise ValueError(err_msg)
             else:
                 checked_properties[key] = value

@@ -548,12 +483,9 @@ def lookup_algo(algo_to_translate):
            return dataone_algo_translation[algo_to_translate]

        if not os.path.isfile(self.hashstore_configuration_yaml):
-            exception_string = (
-                "FileHashStore - set_default_algorithms: hashstore.yaml not found"
-                + " in store root path."
-            )
-            logging.critical(exception_string)
-            raise FileNotFoundError(exception_string)
+            err_msg = "hashstore.yaml not found in store root path."
+            self.fhs_logger.critical(err_msg)
+            raise FileNotFoundError(err_msg)

         with open(
             self.hashstore_configuration_yaml, "r", encoding="utf-8"
@@ -585,17 +517,14 @@ def store_object(
     ) -> "ObjectMetadata":
         if pid is None and self._check_arg_data(data):
             # If no pid is supplied, store the object only without tagging
-            logging.debug("FileHashStore - store_object: Request to store data only.")
+            logging.debug("Request to store data only received.")
             object_metadata = self._store_data_only(data)
-            logging.info(
-                "FileHashStore - store_object: Successfully stored object for cid: %s",
-                object_metadata.cid,
+            self.fhs_logger.info(
+                "Successfully stored object for cid: %s", object_metadata.cid
             )
         else:
             # Else the object will be stored and tagged
-            logging.debug(
-                "FileHashStore - store_object: Request to store object for pid: %s", pid
-            )
+            self.fhs_logger.debug("Request to store object for pid: %s", pid)
             # Validate input parameters
             self._check_string(pid, "pid")
             self._check_arg_data(data)
@@ -607,88 +536,65 @@ def store_object(
                 additional_algorithm, checksum, checksum_algorithm
             )

-            sync_begin_debug_msg = (
-                f"FileHashStore - store_object: Adding pid ({pid}) to locked list."
-            )
-            err_msg = (
-                f"FileHashStore - store_object: Duplicate object request encountered for pid: "
-                f"{pid}" + ". Already in progress."
-            )
-            if self.use_multiprocessing:
-                with self.object_pid_condition_mp:
-                    # Wait for the pid to release if it's in use
-                    if pid in self.object_locked_pids_mp:
-                        logging.error(err_msg)
-                        raise StoreObjectForPidAlreadyInProgress(err_msg)
-                    # Modify object_locked_pids consecutively
-                    logging.debug(sync_begin_debug_msg)
-                    self.object_locked_pids_mp.append(pid)
-            else:
-                with self.object_pid_condition_th:
-                    if pid in self.object_locked_pids_th:
-                        logging.error(err_msg)
-                        raise StoreObjectForPidAlreadyInProgress(err_msg)
-                    logging.debug(sync_begin_debug_msg)
-                    self.object_locked_pids_th.append(pid)
             try:
-                logging.debug(
-                    "FileHashStore - store_object: Attempting to store object for pid: %s",
-                    pid,
-                )
-                object_metadata = self._store_and_validate_data(
-                    pid,
-                    data,
-                    additional_algorithm=additional_algorithm_checked,
-                    checksum=checksum,
-                    checksum_algorithm=checksum_algorithm_checked,
-                    file_size_to_validate=expected_object_size,
-                )
-                logging.debug(
-                    "FileHashStore - store_object: Attempting to tag object for pid: %s",
-                    pid,
-                )
-                cid = object_metadata.cid
-                self.tag_object(pid, cid)
-                logging.info(
-                    "FileHashStore - store_object: Successfully stored object for pid: %s",
-                    pid,
+                err_msg = (
+                    f"Duplicate object request for pid: {pid}. Already in progress."
                 )
+                if self.use_multiprocessing:
+                    with self.object_pid_condition_mp:
+                        # Raise exception immediately if pid is in use
+                        if pid in self.object_locked_pids_mp:
+                            self.fhs_logger.error(err_msg)
+                            raise StoreObjectForPidAlreadyInProgress(err_msg)
+                else:
+                    with self.object_pid_condition_th:
+                        if pid in self.object_locked_pids_th:
+                            self.fhs_logger.error(err_msg)
+                            raise StoreObjectForPidAlreadyInProgress(err_msg)
+
+                try:
+                    self._synchronize_object_locked_pids(pid)
+
+                    self.fhs_logger.debug("Attempting to store object for pid: %s", pid)
+                    object_metadata = self._store_and_validate_data(
+                        pid,
+                        data,
+                        additional_algorithm=additional_algorithm_checked,
+                        checksum=checksum,
+                        checksum_algorithm=checksum_algorithm_checked,
+                        file_size_to_validate=expected_object_size,
+                    )
+                    self.fhs_logger.debug("Attempting to tag object for pid: %s", pid)
+                    cid = object_metadata.cid
+                    self.tag_object(pid, cid)
+                    self.fhs_logger.info("Successfully stored object for pid: %s", pid)
+                finally:
+                    # Release pid
+                    self._release_object_locked_pids(pid)
             except Exception as err:
-                exception_string = (
-                    f"FileHashStore - store_object: failed to store object for pid: {pid}."
-                    + " Reference files will not be created or tagged. Unexpected error: "
-                    + str(err)
+                err_msg = (
+                    f"Failed to store object for pid: {pid}. Reference files will not be "
+                    f"created or tagged. Unexpected error: {err}"
                 )
-                logging.error(exception_string)
+                self.fhs_logger.error(err_msg)
                 raise err
-            finally:
-                # Release pid
-                self._release_object_locked_pids(pid)

         return object_metadata

     def tag_object(self, pid: str, cid: str) -> None:
-        logging.debug(
-            "FileHashStore - tag_object: Tagging object cid: %s with pid: %s.",
-            cid,
-            pid,
-        )
+        logging.debug("Tagging object cid: %s with pid: %s.", cid, pid)
         self._check_string(pid, "pid")
         self._check_string(cid, "cid")

         try:
             self._store_hashstore_refs_files(pid, cid)
         except HashStoreRefsAlreadyExists as hrae:
-            err_msg = (
-                f"FileHashStore - tag_object: reference files for pid: {pid} and {cid} "
-                "already exist. " + str(hrae)
-            )
+            err_msg = f"Reference files for pid: {pid} and {cid} already exist. Details: {hrae}"
+            self.fhs_logger.error(err_msg)
             raise HashStoreRefsAlreadyExists(err_msg)
         except PidRefsAlreadyExistsError as praee:
-            err_msg = (
-                f"FileHashStore - tag_object: A pid can only reference one cid. "
-                + str(praee)
-            )
+            err_msg = f"A pid can only reference one cid. Details: {praee}"
+            self.fhs_logger.error(err_msg)
             raise PidRefsAlreadyExistsError(err_msg)

     def delete_if_invalid_object(
@@ -702,16 +608,14 @@ def delete_if_invalid_object(
         self._check_string(checksum_algorithm, "checksum_algorithm")
         self._check_integer(expected_file_size)
         if object_metadata is None or not isinstance(object_metadata, ObjectMetadata):
-            exception_string = (
-                "FileHashStore - verify_object: 'object_metadata' cannot be None."
-                + " Must be a 'ObjectMetadata' object."
+            err_msg = (
+                "'object_metadata' cannot be None. Must be a 'ObjectMetadata' object."
             )
-            logging.error(exception_string)
-            raise ValueError(exception_string)
+            self.fhs_logger.error(err_msg)
+            raise ValueError(err_msg)
         else:
-            logging.info(
-                "FileHashStore - verify_object: Called to verify object with id: %s",
-                object_metadata.cid,
+            self.fhs_logger.info(
+                "Called to verify object with id: %s", object_metadata.cid
             )
             object_metadata_hex_digests = object_metadata.hex_digests
             object_metadata_file_size = object_metadata.obj_size
@@ -736,17 +640,14 @@ def delete_if_invalid_object(
             except NonMatchingChecksum as mmce:
                 self._delete_object_only(object_metadata.cid)
                 raise mmce
-            logging.info(
-                "FileHashStore - verify_object: object has been validated for cid: %s",
-                object_metadata.cid,
+            self.fhs_logger.info(
+                "Object has been validated for cid: %s", object_metadata.cid
             )

     def store_metadata(
         self, pid: str, metadata: Union[str, bytes], format_id: Optional[str] = None
     ) -> str:
-        logging.debug(
-            "FileHashStore - store_metadata: Request to store metadata for pid: %s", pid
-        )
+        self.fhs_logger.debug("Request to store metadata for pid: %s", pid)
         # Validate input parameters
         self._check_string(pid, "pid")
         self._check_arg_data(metadata)
@@ -754,60 +655,57 @@ def store_metadata(
         pid_doc = self._computehash(pid + checked_format_id)

         sync_begin_debug_msg = (
-            f"FileHashStore - store_metadata: Adding pid: {pid} to locked list, "
-            + f"with format_id: {checked_format_id} with doc name: {pid_doc}"
+            f"Adding pid: {pid} to locked list, with format_id: {checked_format_id} with doc "
+            f"name: {pid_doc}"
         )
         sync_wait_msg = (
-            f"FileHashStore - store_metadata: Pid: {pid} is locked for format_id:"
-            + f" {checked_format_id} with doc name: {pid_doc}. Waiting."
+            f"Pid: {pid} is locked for format_id: {checked_format_id} with doc name: {pid_doc}. "
+            f"Waiting."
         )
         if self.use_multiprocessing:
             with self.metadata_condition_mp:
                 # Wait for the pid to release if it's in use
                 while pid_doc in self.metadata_locked_docs_mp:
-                    logging.debug(sync_wait_msg)
+                    self.fhs_logger.debug(sync_wait_msg)
                     self.metadata_condition_mp.wait()
                 # Modify metadata_locked_docs consecutively
-                logging.debug(sync_begin_debug_msg)
+                self.fhs_logger.debug(sync_begin_debug_msg)
                 self.metadata_locked_docs_mp.append(pid_doc)
         else:
             with self.metadata_condition_th:
                 while pid_doc in self.metadata_locked_docs_th:
-                    logging.debug(sync_wait_msg)
+                    self.fhs_logger.debug(sync_wait_msg)
                     self.metadata_condition_th.wait()
-                logging.debug(sync_begin_debug_msg)
+                self.fhs_logger.debug(sync_begin_debug_msg)
                 self.metadata_locked_docs_th.append(pid_doc)
         try:
             metadata_cid = self._put_metadata(metadata, pid, pid_doc)
             info_msg = (
-                "FileHashStore - store_metadata: Successfully stored metadata for"
-                + f" pid: {pid} with format_id: {checked_format_id}"
+                f"Successfully stored metadata for pid: {pid} with format_id: "
+                + checked_format_id
             )
-            logging.info(info_msg)
+            self.fhs_logger.info(info_msg)
             return str(metadata_cid)
         finally:
             # Release pid
             end_sync_debug_msg = (
-                f"FileHashStore - store_metadata: Releasing pid doc ({pid_doc})"
-                + f" from locked list for pid: {pid} with format_id: {checked_format_id}"
+                f"Releasing pid doc ({pid_doc}) from locked list for pid: {pid} with format_id: "
+                + checked_format_id
             )
             if self.use_multiprocessing:
                 with self.metadata_condition_mp:
-                    logging.debug(end_sync_debug_msg)
+                    self.fhs_logger.debug(end_sync_debug_msg)
                     self.metadata_locked_docs_mp.remove(pid_doc)
                     self.metadata_condition_mp.notify()
             else:
                 with self.metadata_condition_th:
-                    logging.debug(end_sync_debug_msg)
+                    self.fhs_logger.debug(end_sync_debug_msg)
                     self.metadata_locked_docs_th.remove(pid_doc)
                     self.metadata_condition_th.notify()

     def retrieve_object(self, pid: str) -> IO[bytes]:
-        logging.debug(
-            "FileHashStore - retrieve_object: Request to retrieve object for pid: %s",
-            pid,
-        )
+        self.fhs_logger.debug("Request to retrieve object for pid: %s", pid)
         self._check_string(pid, "pid")

         object_info_dict = self._find_object(pid)
@@ -815,28 +713,20 @@ def retrieve_object(self, pid: str) -> IO[bytes]:
         entity = "objects"

         if object_cid:
-            logging.debug(
-                "FileHashStore - retrieve_object: Metadata exists for pid: %s, retrieving object.",
-                pid,
+            self.fhs_logger.debug(
+                "Metadata exists for pid: %s, retrieving object.", pid
             )
             obj_stream = self._open(entity, object_cid)
         else:
-            exception_string = (
-                f"FileHashStore - retrieve_object: No object found for pid: {pid}"
-            )
-            logging.error(exception_string)
-            raise ValueError(exception_string)
-        logging.info(
-            "FileHashStore - retrieve_object: Retrieved object for pid: %s", pid
-        )
+            err_msg = f"No object found for pid: {pid}"
+            self.fhs_logger.error(err_msg)
+            raise ValueError(err_msg)
+        self.fhs_logger.info("Retrieved object for pid: %s", pid)

         return obj_stream

     def retrieve_metadata(self, pid: str, format_id: Optional[str] = None) -> IO[bytes]:
-        logging.debug(
-            "FileHashStore - retrieve_metadata: Request to retrieve metadata for pid: %s",
-            pid,
-        )
+        self.fhs_logger.debug("Request to retrieve metadata for pid: %s", pid)
         self._check_string(pid, "pid")

         checked_format_id = self._check_arg_format_id(format_id, "retrieve_metadata")
@@ -853,21 +743,15 @@ def retrieve_metadata(self, pid: str, format_id: Optional[str] = None) -> IO[byt
         if metadata_exists:
             metadata_stream = self._open(entity, str(metadata_rel_path))
-            logging.info(
-                "FileHashStore - retrieve_metadata: Retrieved metadata for pid: %s", pid
-            )
+            self.fhs_logger.info("Retrieved metadata for pid: %s", pid)
             return metadata_stream
         else:
-            exception_string = (
-                f"FileHashStore - retrieve_metadata: No metadata found for pid: {pid}"
-            )
-            logging.error(exception_string)
-            raise ValueError(exception_string)
+            err_msg = f"No metadata found for pid: {pid}"
+            self.fhs_logger.error(err_msg)
+            raise ValueError(err_msg)

     def delete_object(self, pid: str) -> None:
-        logging.debug(
-            "FileHashStore - delete_object: Request to delete object for id: %s", pid
-        )
+        self.fhs_logger.debug("Request to delete object for id: %s", pid)
         self._check_string(pid, "pid")

         objects_to_delete = []
@@ -875,33 +759,13 @@ def delete_object(self, pid: str) -> None:
         # Storing and deleting objects are synchronized together
         # Duplicate store object requests for a pid are rejected, but deleting an object
         # will wait for a pid to be released if it's found to be in use before proceeding.
-        sync_begin_debug_msg = (
-            f"FileHashStore - delete_object: Pid ({pid}) to locked list."
-        )
-        sync_wait_msg = (
-            f"FileHashStore - delete_object: Pid ({pid}) is locked. Waiting."
-        )
-        if self.use_multiprocessing:
-            with self.object_pid_condition_mp:
-                # Wait for the pid to release if it's in use
-                while pid in self.object_locked_pids_mp:
-                    logging.debug(sync_wait_msg)
-                    self.object_pid_condition_mp.wait()
-                # Modify object_locked_pids consecutively
-                logging.debug(sync_begin_debug_msg)
-                self.object_locked_pids_mp.append(pid)
-        else:
-            with self.object_pid_condition_th:
-                while pid in self.object_locked_pids_th:
-                    logging.debug(sync_wait_msg)
-                    self.object_pid_condition_th.wait()
-                logging.debug(sync_begin_debug_msg)
-                self.object_locked_pids_th.append(pid)

         try:
             # Before we begin deletion process, we look for the `cid` by calling
             # `find_object` which will throw custom exceptions if there is an issue with
             # the reference files, which help us determine the path to proceed with.
+            self._synchronize_object_locked_pids(pid)
+
             try:
                 object_info_dict = self._find_object(pid)
                 cid = object_info_dict.get("cid")
@@ -909,29 +773,7 @@ def delete_object(self, pid: str) -> None:
                 # Proceed with next steps - cid has been retrieved without any issues
                 # We must synchronize here based on the `cid` because multiple threads may
                 # try to access the `cid_reference_file`
-                sync_begin_debug_msg = (
-                    f"FileHashStore - delete_object: Cid ({cid}) to locked list."
-                )
-                sync_wait_msg = (
-                    f"FileHashStore - delete_object: Cid ({cid}) is locked."
-                    + " Waiting."
-                )
-                if self.use_multiprocessing:
-                    with self.object_cid_condition_mp:
-                        # Wait for the cid to release if it's in use
-                        while cid in self.object_locked_cids_mp:
-                            logging.debug(sync_wait_msg)
-                            self.object_cid_condition_mp.wait()
-                        # Modify reference_locked_cids consecutively
-                        logging.debug(sync_begin_debug_msg)
-                        self.object_locked_cids_mp.append(cid)
-                else:
-                    with self.object_cid_condition_th:
-                        while cid in self.object_locked_cids_th:
-                            logging.debug(sync_wait_msg)
-                            self.object_cid_condition_th.wait()
-                        logging.debug(sync_begin_debug_msg)
-                        self.object_locked_cids_th.append(cid)
+                self._synchronize_object_locked_cids(cid)

                 try:
                     cid_ref_abs_path = object_info_dict.get("cid_refs_path")
@@ -945,10 +787,10 @@ def delete_object(self, pid: str) -> None:
                     # Delete cid reference file and object only if the cid refs file is empty
                     if os.path.getsize(cid_ref_abs_path) == 0:
                         debug_msg = (
-                            "FileHashStore - delete_object: cid_refs_file is empty (size == 0):"
-                            + f" {cid_ref_abs_path} - deleting cid refs file and data object."
+                            f"Cid reference file is empty (size == 0): {cid_ref_abs_path} - "
+                            + "deleting cid reference file and data object."
                         )
-                        logging.debug(debug_msg)
+                        self.fhs_logger.debug(debug_msg)
                         objects_to_delete.append(
                             self._rename_path_for_deletion(cid_ref_abs_path)
                         )
@@ -963,44 +805,23 @@ def delete_object(self, pid: str) -> None:
                    self.delete_metadata(pid)

                    info_string = (
-                        "FileHashStore - delete_object: Successfully deleted references,"
-                        + f" metadata and object associated with pid: {pid}"
+                        f"Successfully deleted references, metadata and object associated"
+                        + f" with pid: {pid}"
                    )
-                    logging.info(info_string)
+                    self.fhs_logger.info(info_string)
                    return
                finally:
                    # Release cid
-                    end_sync_debug_msg = (
-                        f"FileHashStore - delete_object: Releasing cid ({cid})"
-                        + " from locked list"
-                    )
-                    if self.use_multiprocessing:
-                        with self.object_cid_condition_mp:
-                            logging.debug(end_sync_debug_msg)
-                            self.object_locked_cids_mp.remove(cid)
-                            self.object_cid_condition_mp.notify()
-                    else:
-                        with self.object_cid_condition_th:
-                            logging.debug(end_sync_debug_msg)
-                            self.object_locked_cids_th.remove(cid)
-                            self.object_cid_condition_th.notify()
+                    self._release_object_locked_cids(cid)

-            except PidRefsDoesNotExist:
+            except OrphanPidRefsFileFound:
                 warn_msg = (
-                    "FileHashStore - delete_object: pid refs file does not exist for pid: "
-                    + pid
-                    + ". Skipping object deletion. Deleting pid metadata documents."
+                    f"Orphan pid reference file found for pid: {pid}. Skipping object deletion. "
+                    + "Deleting pid reference file and related metadata documents."
                 )
-                logging.warning(warn_msg)
-
-                # Remove metadata files if they exist
-                self.delete_metadata(pid)
+                self.fhs_logger.warning(warn_msg)

-                # Remove all files confirmed for deletion
-                self._delete_marked_files(objects_to_delete)
-                return
-            except OrphanPidRefsFileFound:
                 # Delete pid refs file
                 pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid)
                 objects_to_delete.append(
@@ -1012,6 +833,13 @@ def delete_object(self, pid: str) -> None:
                 self._delete_marked_files(objects_to_delete)
                 return
             except RefsFileExistsButCidObjMissing:
+                warn_msg = (
+                    f"Reference files exist for pid: {pid}, but the data object is missing. "
+                    + "Deleting pid reference file & related metadata documents. Handling cid "
+                    + "reference file."
+                )
+                self.fhs_logger.warning(warn_msg)
+
                 # Add pid refs file to be permanently deleted
                 pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid)
                 objects_to_delete.append(
@@ -1019,16 +847,28 @@ def delete_object(self, pid: str) -> None:
                 )
                 # Remove pid from cid refs file
                 pid_refs_cid = self._read_small_file_content(pid_ref_abs_path)
-                cid_ref_abs_path = self._get_hashstore_cid_refs_path(pid_refs_cid)
-                # Remove if the pid refs is found
-                if self._is_string_in_refs_file(pid, cid_ref_abs_path):
-                    self._update_refs_file(cid_ref_abs_path, pid, "remove")
+                try:
+                    self._synchronize_object_locked_cids(pid_refs_cid)
+
+                    cid_ref_abs_path = self._get_hashstore_cid_refs_path(pid_refs_cid)
+                    # Remove if the pid refs is found
+                    if self._is_string_in_refs_file(pid, cid_ref_abs_path):
+                        self._update_refs_file(cid_ref_abs_path, pid, "remove")
+                finally:
+                    self._release_object_locked_cids(pid_refs_cid)
+
                 # Remove metadata files if they exist
                 self.delete_metadata(pid)
                 # Remove all files confirmed for deletion
                 self._delete_marked_files(objects_to_delete)
                 return
             except PidNotFoundInCidRefsFile:
+                warn_msg = (
+                    f"Pid {pid} not found in cid reference file. Deleting pid reference "
+                    + "file and related metadata documents."
+                )
+                self.fhs_logger.warning(warn_msg)
+
                 # Add pid refs file to be permanently deleted
                 pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid)
                 objects_to_delete.append(
@@ -1041,27 +881,10 @@ def delete_object(self, pid: str) -> None:
                 return
         finally:
             # Release pid
-            end_sync_debug_msg = (
-                f"FileHashStore - delete_object: Releasing pid ({pid})"
-                + " from locked list"
-            )
-            if self.use_multiprocessing:
-                with self.object_pid_condition_mp:
-                    logging.debug(end_sync_debug_msg)
-                    self.object_locked_pids_mp.remove(pid)
-                    self.object_pid_condition_mp.notify()
-            else:
-                # Release pid
-                with self.object_pid_condition_th:
-                    logging.debug(end_sync_debug_msg)
-                    self.object_locked_pids_th.remove(pid)
-                    self.object_pid_condition_th.notify()
+            self._release_object_locked_pids(pid)

     def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None:
-        logging.debug(
-            "FileHashStore - delete_metadata: Request to delete metadata for pid: %s",
-            pid,
-        )
+        self.fhs_logger.debug("Request to delete metadata for pid: %s", pid)
         self._check_string(pid, "pid")
         checked_format_id = self._check_arg_format_id(format_id, "delete_metadata")
         metadata_directory = self._computehash(pid)
@@ -1080,28 +903,28 @@ def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None:
             # Synchronize based on doc name
             # Wait for the pid to release if it's in use
             sync_begin_debug_msg = (
-                f"FileHashStore - delete_metadata: Adding pid: {pid} to locked list, "
-                + f"with format_id: {checked_format_id} with doc name: {pid_doc}"
+                f"Adding pid: {pid} to locked list, with format_id: {checked_format_id} "
+                + f"with doc name: {pid_doc}"
             )
             sync_wait_msg = (
-                f"FileHashStore - delete_metadata: Pid: {pid} is locked for format_id:"
-                + f" {checked_format_id} with doc name: {pid_doc}. Waiting."
+                f"Pid: {pid} is locked for format_id: {checked_format_id} with doc name:"
+                + f" {pid_doc}. Waiting."
             )
             if self.use_multiprocessing:
                 with self.metadata_condition_mp:
                     # Wait for the pid to release if it's in use
                     while pid in self.metadata_locked_docs_mp:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                         self.metadata_condition_mp.wait()
                     # Modify metadata_locked_docs consecutively
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.metadata_locked_docs_mp.append(pid_doc)
             else:
                 with self.metadata_condition_th:
                     while pid in self.metadata_locked_docs_th:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                         self.metadata_condition_th.wait()
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                     self.metadata_locked_docs_th.append(pid_doc)
             try:
                 # Mark metadata doc for deletion
@@ -1109,87 +932,79 @@ def delete_metadata(self, pid: str, format_id: Optional[str] = None) -> None:
             finally:
                 # Release pid
                 end_sync_debug_msg = (
-                    f"FileHashStore - delete_metadata: Releasing pid doc ({pid_doc})"
-                    + f" from locked list for pid: {pid} with format_id:"
-                    + checked_format_id
+                    f"Releasing pid doc ({pid_doc}) from locked list for pid: {pid} with "
+                    + f"format_id: {checked_format_id}"
                 )
                 if self.use_multiprocessing:
                     with self.metadata_condition_mp:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                         self.metadata_locked_docs_mp.remove(pid_doc)
                         self.metadata_condition_mp.notify()
                 else:
                     with self.metadata_condition_th:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                         self.metadata_locked_docs_th.remove(pid_doc)
                         self.metadata_condition_th.notify()
             # Delete metadata objects
             self._delete_marked_files(objects_to_delete)
-            info_string = (
-                "FileHashStore - delete_metadata: Successfully deleted all metadata"
-                + f"for pid: {pid}",
-            )
-            logging.info(info_string)
+            info_string = f"Successfully deleted all metadata for pid: {pid}"
+            self.fhs_logger.info(info_string)
         else:
             # Delete a specific metadata file
             pid_doc = self._computehash(pid + checked_format_id)
             # Wait for the pid to release if it's in use
             sync_begin_debug_msg = (
-                f"FileHashStore - delete_metadata: Adding pid: {pid} to locked list, "
-                + f"with format_id: {checked_format_id} with doc name: {pid_doc}"
+                f"Adding pid: {pid} to locked list, with format_id: {checked_format_id} with doc "
+                + f"name: {pid_doc}"
            )
            sync_wait_msg = (
-                f"FileHashStore - delete_metadata: Pid: {pid} is locked for format_id:"
-                + f" {checked_format_id} with doc name: {pid_doc}. Waiting."
+                f"Pid: {pid} is locked for format_id: {checked_format_id} with doc name:"
+                + f" {pid_doc}. Waiting."
            )
            if self.use_multiprocessing:
                with self.metadata_condition_mp:
                    # Wait for the pid to release if it's in use
                    while pid in self.metadata_locked_docs_mp:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                        self.metadata_condition_mp.wait()
                    # Modify metadata_locked_docs consecutively
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                    self.metadata_locked_docs_mp.append(pid_doc)
            else:
                with self.metadata_condition_th:
                    while pid in self.metadata_locked_docs_th:
-                        logging.debug(sync_wait_msg)
+                        self.fhs_logger.debug(sync_wait_msg)
                        self.metadata_condition_th.wait()
-                    logging.debug(sync_begin_debug_msg)
+                    self.fhs_logger.debug(sync_begin_debug_msg)
                    self.metadata_locked_docs_th.append(pid_doc)
            try:
                full_path_without_directory = Path(self.metadata / rel_path / pid_doc)
                self._delete("metadata", full_path_without_directory)
                info_string = (
-                    "FileHashStore - delete_metadata: Successfully deleted metadata for pid:"
-                    + f" {pid} for format_id: {format_id}"
+                    f"Deleted metadata for pid: {pid} for format_id: {format_id}"
                )
-                logging.info(info_string)
+
+                self.fhs_logger.info(info_string)
            finally:
                # Release pid
                end_sync_debug_msg = (
-                    f"FileHashStore - delete_metadata: Releasing pid doc ({pid_doc})"
-                    + f" from locked list for pid: {pid} with format_id:"
-                    + checked_format_id
+                    f"Releasing pid doc ({pid_doc}) from locked list for pid: {pid} with "
+                    f"format_id: {checked_format_id}"
                )
                if self.use_multiprocessing:
                    with self.metadata_condition_mp:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                        self.metadata_locked_docs_mp.remove(pid_doc)
                        self.metadata_condition_mp.notify()
                else:
                    with self.metadata_condition_th:
-                        logging.debug(end_sync_debug_msg)
+                        self.fhs_logger.debug(end_sync_debug_msg)
                        self.metadata_locked_docs_th.remove(pid_doc)
                        self.metadata_condition_th.notify()

     def get_hex_digest(self, pid: str, algorithm: str) -> str:
-        logging.debug(
-            "FileHashStore - get_hex_digest: Request to get hex digest for object with pid: %s",
-            pid,
-        )
+        self.fhs_logger.debug("Request to get hex digest for object with pid: %s", pid)
         self._check_string(pid, "pid")
         self._check_string(algorithm, "algorithm")

@@ -1197,18 +1012,13 @@ def get_hex_digest(self, pid: str, algorithm: str) -> str:
         algorithm = self._clean_algorithm(algorithm)
         object_cid = self._find_object(pid).get("cid")
         if not self._exists(entity, object_cid):
-            exception_string = (
-                f"FileHashStore - get_hex_digest: No object found for pid: {pid}"
-            )
-            logging.error(exception_string)
-            raise ValueError(exception_string)
+            err_msg = f"No object found for pid: {pid}"
+            self.fhs_logger.error(err_msg)
+            raise ValueError(err_msg)
         cid_stream = self._open(entity, object_cid)
         hex_digest = self._computehash(cid_stream, algorithm=algorithm)

-        info_string = (
-            f"FileHashStore - get_hex_digest: Successfully calculated hex digest for pid: {pid}."
-            + f" Hex Digest: {hex_digest}",
-        )
+        info_string = f"Successfully calculated hex digest for pid: {pid}. Hex Digest: {hex_digest}"
         logging.info(info_string)

         return hex_digest

@@ -1228,9 +1038,7 @@ def _find_object(self, pid: str) -> Dict[str, str]:
         - pid_refs_path: path to the pid refs file
         - sysmeta_path: path to the sysmeta file
         """
-        logging.debug(
-            "FileHashStore - find_object: Request to find object for for pid: %s", pid
-        )
+        self.fhs_logger.debug("Request to find object for pid: %s", pid)
         self._check_string(pid, "pid")

         pid_ref_abs_path = self._get_hashstore_pid_refs_path(pid)
@@ -1246,11 +1054,10 @@ def _find_object(self, pid: str) -> Dict[str, str]:
             # Object must also exist in order to return the cid retrieved
             if not self._exists("objects", pid_refs_cid):
                 err_msg = (
-                    f"FileHashStore - find_object: Refs file found for pid ({pid}) at"
-                    + str(pid_ref_abs_path)
+                    f"Reference file found for pid ({pid}) at {pid_ref_abs_path}"
                     + f", but object referenced does not exist, cid: {pid_refs_cid}"
                 )
-                logging.error(err_msg)
+                self.fhs_logger.error(err_msg)
                 raise RefsFileExistsButCidObjMissing(err_msg)
             else:
                 sysmeta_doc_name = self._computehash(pid + self.sysmeta_ns)
@@ -1278,25 +1085,23 @@ def _find_object(self, pid: str) -> Dict[str, str]:
                 else:
                     # If not, it is an orphan pid refs file
                     err_msg = (
-                        "FileHashStore - find_object: pid refs file exists with cid: "
-                        + f"{pid_refs_cid} for pid: {pid} but is missing from cid refs file:"
-                        + str(cid_ref_abs_path)
+                        f"Pid reference file exists with cid: {pid_refs_cid} for pid: {pid} but "
+                        f"is missing from cid refs file: {cid_ref_abs_path}"
                     )
-                    logging.error(err_msg)
+                    self.fhs_logger.error(err_msg)
                     raise PidNotFoundInCidRefsFile(err_msg)
             else:
                 err_msg = (
-                    f"FileHashStore - find_object: pid refs file exists with cid: {pid_refs_cid}"
-                    + f", but cid refs file not found: {cid_ref_abs_path} for pid: {pid}"
+                    f"Pid reference file exists with cid: {pid_refs_cid} but cid reference file "
+                    + f"not found: {cid_ref_abs_path} for pid: {pid}"
                 )
-                logging.error(err_msg)
+                self.fhs_logger.error(err_msg)
                 raise OrphanPidRefsFileFound(err_msg)
         else:
             err_msg = (
-                f"FileHashStore - find_object: pid refs file not found for pid ({pid}): "
-                + str(pid_ref_abs_path)
+                f"Pid reference file not found for pid ({pid}): {pid_ref_abs_path}"
             )
-            logging.error(err_msg)
+            self.fhs_logger.error(err_msg)
             raise PidRefsDoesNotExist(err_msg)

     def _store_and_validate_data(
@@ -1325,9 +1130,7 @@ def _store_and_validate_data(
         """
         stream = Stream(file)

-        logging.debug(
-            "FileHashStore - put_object: Request to put object for pid: %s", pid
-        )
+        self.fhs_logger.debug("Request to put object for pid: %s", pid)
         with closing(stream):
             (
                 object_cid,
@@ -1345,10 +1148,7 @@ def _store_and_validate_data(
             object_metadata = ObjectMetadata(
                 pid, object_cid, obj_file_size, hex_digest_dict
             )
-            logging.debug(
-                "FileHashStore - put_object: Successfully put object for pid: %s",
-                pid,
-            )
+            self.fhs_logger.debug("Successfully put object for pid: %s", pid)
             return object_metadata

     def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata":
@@ -1365,9 +1165,7 @@ def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata":
         :return: ObjectMetadata - object that contains the object ID, object file size,
             and hex digest dictionary.
         """
-        logging.debug(
-            "FileHashStore - _store_data_only: Request to store data object only."
-        )
+        self.fhs_logger.debug("Request to store data object only.")

         try:
             # Ensure the data is a stream
@@ -1389,18 +1187,12 @@ def _store_data_only(self, data: Union[str, bytes]) -> "ObjectMetadata":
             )
             # The permanent address of the data stored is based on the data's checksum
             cid = hex_digest_dict.get(self.algorithm)
-            logging.debug(
-                "FileHashStore - _store_data_only: Successfully stored object with cid: %s",
-                cid,
-            )
+            self.fhs_logger.debug("Successfully stored object with cid: %s", cid)
             return object_metadata
         # pylint: disable=W0718
         except Exception as err:
-            exception_string = (
-                "FileHashStore - _store_data_only: failed to store object."
-                + f" Unexpected {err=}, {type(err)=}"
-            )
-            logging.error(exception_string)
+            err_msg = f"Failed to store object. Unexpected {err=}, {type(err)=}"
+            self.fhs_logger.error(err_msg)
             raise err

     def _move_and_get_checksums(
@@ -1430,11 +1222,8 @@ def _move_and_get_checksums(
         :return: tuple - Object ID, object file size, and hex digest dictionary.
         """
-        debug_msg = (
-            "FileHashStore - _move_and_get_checksums: Creating temp"
-            + f" file and calculating checksums for pid: {pid}"
-        )
-        logging.debug(debug_msg)
+        debug_msg = f"Creating temp file and calculating checksums for pid: {pid}"
+        self.fhs_logger.debug(debug_msg)
         (
             hex_digests,
             tmp_file_name,
             tmp_file_size,
         ) = self._write_to_tmp_file_and_get_hex_digests(
             stream, additional_algorithm, checksum_algorithm
         )
-        logging.debug(
-            "FileHashStore - _move_and_get_checksums: Temp file created: %s",
-            tmp_file_name,
-        )
+        self.fhs_logger.debug("Temp file created: %s", tmp_file_name)

         # Objects are stored with their content identifier based on the store algorithm
         object_cid = hex_digests.get(self.algorithm)
@@ -1467,60 +1253,46 @@ def _move_and_get_checksums(
             )
             self._create_path(Path(os.path.dirname(abs_file_path)))
             try:
-                debug_msg = (
-                    "FileHashStore - _move_and_get_checksums: Moving temp file to permanent"
-                    + f" location: {abs_file_path}",
-                )
-                logging.debug(debug_msg)
+                debug_msg = f"Moving temp file to permanent location: {abs_file_path}"
+                self.fhs_logger.debug(debug_msg)
                 shutil.move(tmp_file_name, abs_file_path)
             except Exception as err:
                 # Revert storage process
-                exception_string = (
-                    "FileHashStore - _move_and_get_checksums:"
-                    + f" Unexpected Error: {err}"
-                )
-                logging.warning(exception_string)
+                err_msg = f"Unexpected Error: {err}"
+                self.fhs_logger.warning(err_msg)
                 if os.path.isfile(abs_file_path):
                     # Check to see if object exists before determining whether to delete
                     debug_msg = (
-                        "FileHashStore - _move_and_get_checksums: Permanent file"
-                        + f" found during exception, checking hex digest for pid: {pid}"
+                        f"Permanent file found, checking hex digest for pid: {pid}"
                     )
-                    logging.debug(debug_msg)
+                    self.fhs_logger.debug(debug_msg)
                     pid_checksum = self.get_hex_digest(pid, self.algorithm)
                     if pid_checksum == hex_digests.get(self.algorithm):
                         # If the checksums match, return and log warning
-                        exception_string = (
-                            "FileHashStore - _move_and_get_checksums: Object exists at:"
-                            + f" {abs_file_path} but an unexpected issue has been encountered."
-                            + " Reference files will not be created and/or tagged."
+                        err_msg = (
+                            f"Object exists at: {abs_file_path} but an unexpected issue has been "
+                            + "encountered. Reference files will not be created and/or tagged."
                        )
-                        logging.warning(exception_string)
+                        self.fhs_logger.warning(err_msg)
                        raise err
                    else:
                        debug_msg = (
-                            "FileHashStore - _move_and_get_checksums: Object exists at"
-                            + f"{abs_file_path} but the pid object checksum provided does not"
-                            + " match what has been calculated. Deleting object. References will"
-                            + " not be created and/or tagged.",
+                            f"Object exists at {abs_file_path} but the pid object checksum "
+                            + "provided does not match what has been calculated. Deleting object. "
+                            + "References will not be created and/or tagged."
                        )
-                        logging.debug(debug_msg)
+                        self.fhs_logger.debug(debug_msg)
                        self._delete("objects", abs_file_path)
                        raise err
                else:
-                    logging.debug(
-                        "FileHashStore - _move_and_get_checksums: Deleting temporary file: %s",
-                        tmp_file_name,
-                    )
+                    self.fhs_logger.debug("Deleting temporary file: %s", tmp_file_name)
                    self._delete("tmp", tmp_file_name)
                    err_msg = (
                        f"Object has not been stored for pid: {pid} - an unexpected error has "
-                        f"occurred when moving tmp file to: {object_cid}. Reference files will "
-                        f"not be created and/or tagged. Error: {err}"
-                    )
-                    logging.warning(
-                        "FileHashStore - _move_and_get_checksums: %s", err_msg
+                        + f"occurred when moving tmp file to: {object_cid}. Reference files will "
+                        + f"not be created and/or tagged. Error: {err}"
                    )
+                    self.fhs_logger.warning(err_msg)
                    raise
        else:
            # If the data object already exists, do not move the file but attempt to verify it
@@ -1537,22 +1309,22 @@ def _move_and_get_checksums(
            except NonMatchingObjSize as nmose:
                # If any exception is thrown during validation, we do not tag.
-                exception_string = (
-                    f"FileHashStore - _move_and_get_checksums: Object already exists for pid: {pid}"
-                    + " , deleting temp file. Reference files will not be created and/or tagged"
-                    + f" due to an issue with the supplied pid object metadata. {str(nmose)}"
+                err_msg = (
+                    f"Object already exists for pid: {pid}, deleting temp file. Reference files "
+                    + "will not be created and/or tagged due to an issue with the supplied pid "
+                    + f"object metadata. {str(nmose)}"
                )
-                logging.debug(exception_string)
-                raise NonMatchingObjSize(exception_string) from nmose
+                self.fhs_logger.debug(err_msg)
+                raise NonMatchingObjSize(err_msg) from nmose
            except NonMatchingChecksum as nmce:
                # If any exception is thrown during validation, we do not tag.
-                exception_string = (
-                    f"FileHashStore - _move_and_get_checksums: Object already exists for pid: {pid}"
-                    + " , deleting temp file. Reference files will not be created and/or tagged"
-                    + f" due to an issue with the supplied pid object metadata. {str(nmce)}"
+                err_msg = (
+                    f"Object already exists for pid: {pid}, deleting temp file. Reference files "
+                    + "will not be created and/or tagged due to an issue with the supplied pid "
+                    + f"object metadata. {str(nmce)}"
                )
-                logging.debug(exception_string)
-                raise NonMatchingChecksum(exception_string) from nmce
            finally:
                # Ensure that the tmp file has been removed, the data object already exists, so it
                # is redundant. No exception is thrown so 'store_object' can proceed to tag object
@@ -1588,10 +1360,8 @@ def _write_to_tmp_file_and_get_hex_digests(
        tmp_root_path = self._get_store_path("objects") / "tmp"
        tmp = self._mktmpfile(tmp_root_path)

-        logging.debug(
-            "FileHashStore - _write_to_tmp_file_and_get_hex_digests: tmp file created:"
-            + " %s, calculating hex digests.",
-            tmp.name,
+        self.fhs_logger.debug(
+            "Tmp file created: %s, calculating hex digests.", tmp.name
        )

        tmp_file_completion_flag = False
@@ -1607,10 +1377,8 @@ def _write_to_tmp_file_and_get_hex_digests(
                    for hash_algorithm in hash_algorithms:
                        hash_algorithm.update(self._cast_to_bytes(data))

-            logging.debug(
-                "FileHashStore - _write_to_tmp_file_and_get_hex_digests: Object stream"
-                + " successfully written to tmp file: %s",
-                tmp.name,
+            self.fhs_logger.debug(
+                "Object stream successfully written to tmp file: %s", tmp.name
            )

            hex_digest_list = [
@@ -1621,25 +1389,17 @@ def _write_to_tmp_file_and_get_hex_digests(
            # Ready for validation and atomic move
            tmp_file_completion_flag = True

-            logging.debug(
-                "FileHashStore - _write_to_tmp_file_and_get_hex_digests: Hex digests calculated."
-            )
+            self.fhs_logger.debug("Hex digests calculated.")
            return hex_digest_dict, tmp.name, tmp_file_size
        # pylint: disable=W0718
        except Exception as err:
-            exception_string = (
-                "FileHashStore - _write_to_tmp_file_and_get_hex_digests:"
-                + f" Unexpected {err=}, {type(err)=}"
-            )
-            logging.error(exception_string)
+            err_msg = f"Unexpected {err=}, {type(err)=}"
+            self.fhs_logger.error(err_msg)
            # pylint: disable=W0707,W0719
-            raise Exception(exception_string)
+            raise Exception(err_msg)
        except KeyboardInterrupt:
-            exception_string = (
-                "FileHashStore - _write_to_tmp_file_and_get_hex_digests:"
-                + " Keyboard interruption by user."
-            )
-            logging.error(exception_string)
+            err_msg = "Keyboard interruption by user."
+            self.fhs_logger.error(err_msg)
            if os.path.isfile(tmp.name):
                os.remove(tmp.name)
        finally:
@@ -1649,12 +1409,11 @@ def _write_to_tmp_file_and_get_hex_digests(
                    os.remove(tmp.name)
                # pylint: disable=W0718
                except Exception as err:
-                    exception_string = (
-                        "FileHashStore - _write_to_tmp_file_and_get_hex_digests:"
-                        + f"Unexpected {err=} while attempting to"
-                        + f" delete tmp file: {tmp.name}, {type(err)=}"
+                    err_msg = (
+                        f"Unexpected {err=} while attempting to delete tmp file: "
+                        + f"{tmp.name}, {type(err)=}"
                    )
-                    logging.error(exception_string)
+                    self.fhs_logger.error(err_msg)

    def _mktmpfile(self, path: Path) -> IO[bytes]:
        """Create a temporary file at the given path ready to be written.
@@ -1710,8 +1469,7 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None:
            # are expected to be and throw an exception to inform the client that everything
            # is in place - and include other issues for context
            err_msg = (
-                f"FileHashStore - store_hashstore_refs_files: Object with cid: {cid}"
-                f" already exists and is tagged with pid: {pid}."
+                f"Object with cid: {cid} exists and is tagged with pid: {pid}."
            )
            try:
                self._verify_hashstore_references(
@@ -1721,11 +1479,11 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None:
                    cid_refs_path,
                    "Refs file already exists, verifying.",
                )
-                logging.error(err_msg)
+                self.fhs_logger.error(err_msg)
                raise HashStoreRefsAlreadyExists(err_msg)
            except Exception as e:
                rev_msg = err_msg + " " + str(e)
-                logging.error(rev_msg)
+                self.fhs_logger.error(rev_msg)
                raise HashStoreRefsAlreadyExists(err_msg)

        elif os.path.isfile(pid_refs_path) and not os.path.isfile(
@@ -1733,21 +1491,18 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None:
        ):
            # If pid refs exists, the pid has already been claimed and cannot be tagged we
            # throw an exception immediately
-            error_msg = (
-                f"FileHashStore - store_hashstore_refs_files: Pid refs file already exists"
-                f" for pid: {pid}."
-            )
-            logging.error(error_msg)
+            error_msg = f"Pid refs file already exists for pid: {pid}."
+            self.fhs_logger.error(error_msg)
            raise PidRefsAlreadyExistsError(error_msg)

        elif not os.path.isfile(pid_refs_path) and os.path.isfile(
            cid_refs_path
        ):
            debug_msg = (
-                f"FileHashStore - store_hashstore_refs_files: pid refs file does not exist"
-                f" for pid {pid} but cid refs file found at: {cid_refs_path} for cid: {cid}"
+                f"Pid reference file does not exist for pid {pid} but cid refs file "
+                + f"found at: {cid_refs_path} for cid: {cid}"
            )
-            logging.debug(debug_msg)
+            self.fhs_logger.debug(debug_msg)
            # Move the pid refs file
            pid_tmp_file_path = self._write_refs_file(tmp_root_path, cid, "pid")
            shutil.move(pid_tmp_file_path, pid_refs_path)
@@ -1761,11 +1516,8 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None:
                cid_refs_path,
                f"Updated existing cid refs file: {cid_refs_path} with pid: {pid}",
            )
-            info_msg = (
-                "FileHashStore - store_hashstore_refs_files: Successfully updated "
-                f"cid: {cid} with pid: {pid}"
-            )
-            logging.info(info_msg)
+            info_msg = f"Successfully updated cid: {cid} with pid: {pid}"
+            self.fhs_logger.info(info_msg)
            return

        # Move both files after checking the existing status of refs files
@@ -1777,11 +1529,8 @@ def _store_hashstore_refs_files(self, pid: str, cid: str) -> None:
            self._verify_hashstore_references(
                pid, cid, pid_refs_path, cid_refs_path, log_msg
            )
-            info_msg = (
-                "FileHashStore - store_hashstore_refs_files: Successfully updated "
-                f"cid: {cid} with pid: {pid}"
-            )
-            logging.info(info_msg)
+            info_msg = f"Successfully updated cid: {cid} with pid: {pid}"
+            self.fhs_logger.info(info_msg)

        except (
            HashStoreRefsAlreadyExists,
            PidRefsAlreadyExistsError,
        ) as expected_exceptions:
            raise expected_exceptions

-        except Exception as unexpected_exception:
+        except Exception as ue:
            # For all other unexpected exceptions, we are to revert the tagging process as
            # much as possible. No exceptions from the reverting process will be thrown.
+            err_msg = f"Unexpected exception: {ue}, reverting tagging process (untag obj)."
@@ -1838,8 +1589,8 @@ def _untag_object(self, pid: str, cid: str) -> None:
             )
             # Remove all files confirmed for deletion
             self._delete_marked_files(untag_obj_delete_list)
-            info_msg = f"_untag_object: Untagged pid: {pid} with cid: {cid}"
-            logging.info(info_msg)
+            info_msg = f"Untagged pid: {pid} with cid: {cid}"
+            self.fhs_logger.info(info_msg)

         except OrphanPidRefsFileFound as oprff:
             # `find_object` throws this exception when the cid refs file doesn't exist,
@@ -1855,11 +1606,10 @@ def _untag_object(self, pid: str, cid: str) -> None:
             self._delete_marked_files(untag_obj_delete_list)

             warn_msg = (
-                f"_untag_object: Cid refs file does not exist for pid: {pid}."
-                + " Deleted orphan pid refs file. Additional info: "
-                + str(oprff)
+                f"Cid refs file does not exist for pid: {pid}. Deleted orphan pid refs file. "
+                f"Additional info: {oprff}"
             )
-            logging.warning(warn_msg)
+            self.fhs_logger.warning(warn_msg)

         except RefsFileExistsButCidObjMissing as rfebcom:
             # `find_object` throws this exception when both pid/cid refs files exist but the
@@ -1881,11 +1631,11 @@ def _untag_object(self, pid: str, cid: str) -> None:
             self._delete_marked_files(untag_obj_delete_list)

             warn_msg = (
-                f"_untag_object: data object for cid: {cid_read}. does not exist, but pid and cid "
-                f"references files found for pid: {pid}, Deleted pid and cid refs files. "
-                f"Additional info: " + str(rfebcom)
+                f"Data object for cid: {cid_read} does not exist, but pid and cid reference "
+                + f"files were found for pid: {pid}. Deleted pid and cid refs files. "
+                + f"Additional info: {rfebcom}"
             )
-            logging.warning(warn_msg)
+            self.fhs_logger.warning(warn_msg)

         except PidNotFoundInCidRefsFile as pnficrf:
             # `find_object` throws this exception when both the pid and cid refs file exists
@@ -1901,11 +1651,10 @@ def _untag_object(self, pid: str, cid: str) -> None:
             self._delete_marked_files(untag_obj_delete_list)

             warn_msg = (
-                f"_untag_object: pid not found in expected cid refs file for pid: {pid}. "
-                + "Deleted orphan pid refs file. Additional info: "
-                + str(pnficrf)
+                f"Pid not found in expected cid refs file for pid: {pid}. Deleted orphan pid refs "
+                f"file. Additional info: {pnficrf}"
             )
-            logging.warning(warn_msg)
+            self.fhs_logger.warning(warn_msg)

         except PidRefsDoesNotExist as prdne:
             # `find_object` throws this exception if the pid refs file is not found
@@ -1921,10 +1670,10 @@ def _untag_object(self, pid: str, cid: str) -> None:
             self._delete_marked_files(untag_obj_delete_list)

             warn_msg = (
-                f"Pid refs file not found, removed pid from cid refs file for cid: {cid}"
-                + str(prdne)
+                "Pid refs file not found, removed pid from cid reference file for cid:"
+                + f" {cid}. Additional info: {prdne}"
             )
-            logging.warning(warn_msg)
+            self.fhs_logger.warning(warn_msg)

     def _put_metadata(
         self, metadata: Union[str, bytes], pid: str, metadata_doc_name: str
@@ -1938,9 +1687,7 @@ def _put_metadata(

         :return: Address of the metadata document.
""" - logging.debug( - "FileHashStore - _put_metadata: Request to put metadata for pid: %s", pid - ) + self.fhs_logger.debug("Request to put metadata for pid: %s", pid) # Create metadata tmp file and write to it metadata_stream = Stream(metadata) with closing(metadata_stream): @@ -1959,31 +1706,23 @@ def _put_metadata( parent.mkdir(parents=True, exist_ok=True) # Metadata will be replaced if it exists shutil.move(metadata_tmp, full_path) - logging.debug( - "FileHashStore - _put_metadata: Successfully put metadata for pid: %s", - pid, - ) + self.fhs_logger.debug("Successfully put metadata for pid: %s", pid) return full_path except Exception as err: - exception_string = ( - f"FileHashStore - _put_metadata: Unexpected {err=}, {type(err)=}" - ) - logging.error(exception_string) + err_msg = f"Unexpected {err=}, {type(err)=}" + self.fhs_logger.error(err_msg) if os.path.isfile(metadata_tmp): # Remove tmp metadata, calling app must re-upload - logging.debug( - "FileHashStore - _put_metadata: Deleting metadata for pid: %s", - pid, - ) + self.fhs_logger.debug("Deleting metadata for pid: %s", pid) self._delete("metadata", metadata_tmp) raise else: - exception_string = ( - f"FileHashStore - _put_metadata: Attempt to move metadata for pid: {pid}" - + f", but metadata temp file not found: {metadata_tmp}" + err_msg = ( + f"Attempted to move metadata for pid: {pid}, but metadata temp file not found:" + + f" {metadata_tmp}" ) - logging.error(exception_string) - raise FileNotFoundError(exception_string) + self.fhs_logger.error(err_msg) + raise FileNotFoundError(err_msg) def _mktmpmetadata(self, stream: "Stream") -> str: """Create a named temporary file with `stream` (metadata). @@ -1997,18 +1736,12 @@ def _mktmpmetadata(self, stream: "Stream") -> str: tmp = self._mktmpfile(tmp_root_path) # tmp is a file-like object that is already opened for writing by default - logging.debug( - "FileHashStore - _mktmpmetadata: Writing stream to tmp metadata file: %s", - tmp.name, - ) + self.fhs_logger.debug("Writing stream to tmp metadata file: %s", tmp.name) with tmp as tmp_file: for data in stream: tmp_file.write(self._cast_to_bytes(data)) - logging.debug( - "FileHashStore - _mktmpmetadata: Successfully written to tmp metadata file: %s", - tmp.name, - ) + self.fhs_logger.debug("Successfully written to tmp metadata file: %s", tmp.name) return tmp.name # FileHashStore Utility & Supporting Methods @@ -2027,7 +1760,7 @@ def _delete_marked_files(delete_list: list[str]) -> None: warn_msg = f"Unable to remove {obj} in given delete_list. " + str(e) logging.warning(warn_msg) else: - raise ValueError("delete_marked_files: list cannot be None") + raise ValueError("list cannot be None") def _mark_pid_refs_file_for_deletion( self, pid: str, delete_list: List[str], pid_refs_path: Path @@ -2043,10 +1776,9 @@ def _mark_pid_refs_file_for_deletion( except Exception as e: err_msg = ( - f"Unable to delete pid refs file: {pid_refs_path} for pid: {pid}. " - + str(e) + f"Unable to delete pid refs file: {pid_refs_path} for pid: {pid}. {e}" ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) def _remove_pid_and_handle_cid_refs_deletion( self, pid: str, delete_list: List[str], cid_refs_path: Path @@ -2070,7 +1802,7 @@ def _remove_pid_and_handle_cid_refs_deletion( f"Unable to delete remove pid from cid refs file: {cid_refs_path} for pid:" f" {pid}. 
" + str(e) ) - logging.error(err_msg) + self.fhs_logger.error(err_msg) def _validate_and_check_cid_lock( self, pid: str, cid: str, cid_to_check: str @@ -2090,6 +1822,7 @@ def _validate_and_check_cid_lock( f"_validate_and_check_cid_lock: cid provided: {cid_to_check} does not " f"match untag request for cid: {cid} and pid: {pid}" ) + self.fhs_logger.error(err_msg) raise ValueError(err_msg) self._check_object_locked_cids(cid) @@ -2105,11 +1838,7 @@ def _write_refs_file(self, path: Path, ref_id: str, ref_type: str) -> str: :return: tmp_file_path - Path to the tmp refs file """ - logging.debug( - "FileHashStore - _write_refs_file: Writing id (%s) into a tmp file in: %s", - ref_id, - path, - ) + self.fhs_logger.debug("Writing id (%s) into a tmp file in: %s", ref_id, path) try: with self._mktmpfile(path) as tmp_file: tmp_file_path = tmp_file.name @@ -2121,11 +1850,11 @@ def _write_refs_file(self, path: Path, ref_id: str, ref_type: str) -> str: return tmp_file_path except Exception as err: - exception_string = ( - "FileHashStore - _write_refs_file: failed to write cid refs file for pid:" - + f" {ref_id} into path: {path}. Unexpected {err=}, {type(err)=}" + err_msg = ( + f"Failed to write cid refs file for pid: {ref_id} into path: {path}. " + + f"Unexpected error: {err=}, {type(err)=}" ) - logging.error(exception_string) + self.fhs_logger.error(err_msg) raise err def _update_refs_file( @@ -2137,18 +1866,15 @@ def _update_refs_file( :param str ref_id: Authority-based or persistent identifier of the object. :param str update_type: 'add' or 'remove' """ - debug_msg = ( - f"FileHashStore - _update_refs_file: Updating ({update_type}) for ref_id: {ref_id}" - + f" at refs file: {refs_file_path}." - ) - logging.debug(debug_msg) + debug_msg = f"Updating ({update_type}) for ref_id: {ref_id} at refs file: {refs_file_path}." + self.fhs_logger.debug(debug_msg) if not os.path.isfile(refs_file_path): - exception_string = ( - f"FileHashStore - _update_refs_file: {refs_file_path} does not exist." - + f" Cannot {update_type} ref_id: {ref_id}" + err_msg = ( + f"Refs file: {refs_file_path} does not exist." + + f"Cannot {update_type} ref_id: {ref_id}" ) - logging.error(exception_string) - raise FileNotFoundError(exception_string) + self.fhs_logger.error(err_msg) + raise FileNotFoundError(err_msg) try: if update_type == "add": pid_found = self._is_string_in_refs_file(ref_id, refs_file_path) @@ -2173,16 +1899,16 @@ def _update_refs_file( ref_file.writelines(new_pid_lines) ref_file.truncate() debug_msg = ( - f"FileHashStore - _update_refs_file: Update ({update_type}) for ref_id: {ref_id}" - + f" completed on refs file: {refs_file_path}." + f"Update ({update_type}) for ref_id: {ref_id} " + + f"completed on refs file: {refs_file_path}." ) - logging.debug(debug_msg) + self.fhs_logger.debug(debug_msg) except Exception as err: - exception_string = ( - f"FileHashStore - _update_refs_file: failed to {update_type} for ref_id: {ref_id}" + err_msg = ( + f"Failed to {update_type} for ref_id: {ref_id}" + f" at refs file: {refs_file_path}. Unexpected {err=}, {type(err)=}" ) - logging.error(exception_string) + self.fhs_logger.error(err_msg) raise err @staticmethod @@ -2227,22 +1953,20 @@ def _verify_object_information( """ if file_size_to_validate is not None and file_size_to_validate > 0: if file_size_to_validate != tmp_file_size: - exception_string = ( - "FileHashStore - _verify_object_information: Object file size calculated: " - + f" {tmp_file_size} does not match with expected size:" - + f" {file_size_to_validate}." 
@@ -2227,22 +1953,20 @@ def _verify_object_information(
         """
         if file_size_to_validate is not None and file_size_to_validate > 0:
             if file_size_to_validate != tmp_file_size:
-                exception_string = (
-                    "FileHashStore - _verify_object_information: Object file size calculated: "
-                    + f" {tmp_file_size} does not match with expected size:"
-                    + f" {file_size_to_validate}."
+                err_msg = (
+                    f"Object file size calculated: {tmp_file_size} does not match the expected "
+                    f"size: {file_size_to_validate}."
                 )
                 if pid is not None:
                     self._delete(entity, tmp_file_name)
-                    exception_string_for_pid = (
-                        exception_string
-                        + f" Tmp file deleted and file not stored for pid: {pid}"
+                    err_msg_for_pid = (
+                        f"{err_msg} Tmp file deleted and file not stored for pid: {pid}"
                     )
-                    logging.debug(exception_string_for_pid)
-                    raise NonMatchingObjSize(exception_string_for_pid)
+                    self.fhs_logger.debug(err_msg_for_pid)
+                    raise NonMatchingObjSize(err_msg_for_pid)
                 else:
-                    logging.debug(exception_string)
-                    raise NonMatchingObjSize(exception_string)
+                    self.fhs_logger.debug(err_msg)
+                    raise NonMatchingObjSize(err_msg)
         if checksum_algorithm is not None and checksum is not None:
             if checksum_algorithm not in hex_digests:
                 # Check to see if it is a supported algorithm
@@ -2261,35 +1985,33 @@ def _verify_object_information(
                     cid_stream, algorithm=checksum_algorithm
                 )
                 if hex_digest_calculated != checksum:
-                    exception_string = (
-                        "FileHashStore - _verify_object_information: checksum_algorithm"
-                        + f" ({checksum_algorithm}) cannot be found in the default hex digests"
-                        + f" dict, but is supported. New checksum calculated: "
-                        f"{hex_digest_calculated}, does not match what has been provided: "
+                    err_msg = (
+                        f"Checksum algorithm ({checksum_algorithm}) cannot be found in the "
+                        + "default hex digests dict, but is supported. New checksum calculated: "
+                        + f"{hex_digest_calculated}, does not match what has been provided: "
                         + checksum
                     )
-                    logging.debug(exception_string)
-                    raise NonMatchingChecksum(exception_string)
+                    self.fhs_logger.debug(err_msg)
+                    raise NonMatchingChecksum(err_msg)
             else:
                 hex_digest_stored = hex_digests[checksum_algorithm]
                 if hex_digest_stored != checksum.lower():
-                    exception_string = (
-                        "FileHashStore - _verify_object_information: Hex digest and checksum"
-                        + f" do not match - file not stored for pid: {pid}. Algorithm:"
-                        + f" {checksum_algorithm}. Checksum provided: {checksum} !="
+                    err_msg = (
+                        f"Hex digest and checksum do not match - file not stored for pid: {pid}. "
+                        + f"Algorithm: {checksum_algorithm}. Checksum provided: {checksum} !="
                         f" HexDigest: {hex_digest_stored}."
                     )
                     if pid is not None:
                         # Delete the tmp file
                         self._delete(entity, tmp_file_name)
-                        exception_string_for_pid = (
-                            exception_string + f" Tmp file ({tmp_file_name}) deleted."
+                        err_msg_for_pid = (
+                            err_msg + f" Tmp file ({tmp_file_name}) deleted."
                         )
-                        logging.debug(exception_string_for_pid)
-                        raise NonMatchingChecksum(exception_string_for_pid)
+                        self.fhs_logger.error(err_msg_for_pid)
+                        raise NonMatchingChecksum(err_msg_for_pid)
                     else:
-                        logging.debug(exception_string)
-                        raise NonMatchingChecksum(exception_string)
+                        self.fhs_logger.error(err_msg)
+                        raise NonMatchingChecksum(err_msg)

     def _verify_hashstore_references(
         self,
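Condensed, the validation rule is: reject the object when a caller-supplied size or checksum disagrees with what was actually written and computed. An illustrative reduction in which plain ValueError stands in for NonMatchingObjSize and NonMatchingChecksum:

def verify_object(tmp_file_size, expected_size, hex_digests,
                  checksum=None, checksum_algorithm=None):
    """Raise if the observed size or digest contradicts the caller's claims."""
    if expected_size is not None and expected_size > 0:
        if expected_size != tmp_file_size:
            raise ValueError(
                f"Size calculated: {tmp_file_size} != expected: {expected_size}"
            )
    if checksum is not None and checksum_algorithm is not None:
        stored = hex_digests.get(checksum_algorithm)
        if stored is None or stored != checksum.lower():
            raise ValueError(
                f"Checksum mismatch for {checksum_algorithm}: "
                f"{checksum} != {stored}"
            )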
@@ -2309,10 +2031,9 @@ def _verify_hashstore_references(
         :param str additional_log_string: String to append to exception statement
         """
         debug_msg = (
-            f"FileHashStore - _verify_hashstore_references: verifying pid ({pid})"
-            + f" and cid ({cid}) refs files. Additional Note: {additional_log_string}"
+            f"Verifying pid ({pid}) and cid ({cid}) refs files. {additional_log_string}"
         )
-        logging.debug(debug_msg)
+        self.fhs_logger.debug(debug_msg)
         if pid_refs_path is None:
             pid_refs_path = self._get_hashstore_pid_refs_path(pid)
         if cid_refs_path is None:
@@ -2320,42 +2041,34 @@ def _verify_hashstore_references(

         # Check that reference files were created
         if not os.path.isfile(pid_refs_path):
-            exception_string = (
-                "FileHashStore - _verify_hashstore_references: Pid refs file missing: "
-                + str(pid_refs_path)
-                + f" . Additional Context: {additional_log_string}"
-            )
-            logging.error(exception_string)
-            raise PidRefsFileNotFound(exception_string)
+            err_msg = f"Pid refs file missing: {pid_refs_path}. Note: {additional_log_string}"
+            self.fhs_logger.error(err_msg)
+            raise PidRefsFileNotFound(err_msg)
         if not os.path.isfile(cid_refs_path):
-            exception_string = (
-                "FileHashStore - _verify_hashstore_references: Cid refs file missing: "
-                + str(cid_refs_path)
-                + f" . Additional Context: {additional_log_string}"
+            err_msg = (
+                f"Cid refs file missing: {cid_refs_path}. Note: {additional_log_string}"
             )
-            logging.error(exception_string)
-            raise CidRefsFileNotFound(exception_string)
+            self.fhs_logger.error(err_msg)
+            raise CidRefsFileNotFound(err_msg)
         # Check the content of the reference files
         # Start with the cid
         retrieved_cid = self._read_small_file_content(pid_refs_path)
         if retrieved_cid != cid:
-            exception_string = (
-                "FileHashStore - _verify_hashstore_references: Pid refs file exists"
-                + f" ({pid_refs_path}) but cid ({cid}) does not match."
-                + f" Additional Context: {additional_log_string}"
+            err_msg = (
+                f"Pid refs file exists ({pid_refs_path}) but cid ({cid}) does not match."
+                + f" Note: {additional_log_string}"
            )
-            logging.error(exception_string)
-            raise PidRefsContentError(exception_string)
+            self.fhs_logger.error(err_msg)
+            raise PidRefsContentError(err_msg)
         # Then the pid
         pid_found = self._is_string_in_refs_file(pid, cid_refs_path)
         if not pid_found:
-            exception_string = (
-                "FileHashStore - _verify_hashstore_references: Cid refs file exists"
-                + f" ({cid_refs_path}) but pid ({pid}) not found."
-                + f" Additional Context: {additional_log_string}"
+            err_msg = (
+                f"Cid refs file exists ({cid_refs_path}) but pid ({pid}) not found."
+                + f" Note: {additional_log_string}"
             )
-            logging.error(exception_string)
-            raise CidRefsContentError(exception_string)
+            self.fhs_logger.error(err_msg)
+            raise CidRefsContentError(err_msg)

     def _delete_object_only(self, cid: str) -> None:
         """Attempt to delete an object based on the given content identifier (cid). If the object
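`_verify_hashstore_references` boils down to four checks: both refs files exist, the pid refs file resolves to the expected cid, and the cid refs file lists the pid. An illustrative reduction with built-in exceptions standing in for the custom refs errors:

import os

def verify_refs(pid, cid, pid_refs_path, cid_refs_path):
    """Check existence and mutual consistency of a pid/cid refs file pair."""
    if not os.path.isfile(pid_refs_path):
        raise FileNotFoundError(f"Pid refs file missing: {pid_refs_path}")
    if not os.path.isfile(cid_refs_path):
        raise FileNotFoundError(f"Cid refs file missing: {cid_refs_path}")
    with open(pid_refs_path, encoding="utf8") as f:
        if f.read().strip() != cid:  # pid refs file holds exactly one cid
            raise ValueError(f"Pid refs file does not match cid: {cid}")
    with open(cid_refs_path, encoding="utf8") as f:
        if pid not in (line.strip() for line in f):  # one pid per line
            raise ValueError(f"Cid refs file does not list pid: {pid}")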
@@ -2363,50 +2076,23 @@ def _delete_object_only(self, cid: str) -> None:

         :param str cid: Content identifier
         """
-        cid_refs_abs_path = self._get_hashstore_cid_refs_path(cid)
-        # If the refs file still exists, do not delete the object
-        if not os.path.isfile(cid_refs_abs_path):
-            sync_begin_debug_msg = (
-                f"FileHashStore - delete_object: Cid ({cid}) to locked list."
-            )
-            sync_wait_msg = (
-                f"FileHashStore - delete_object: Cid ({cid}) is locked. Waiting."
-            )
-            if self.use_multiprocessing:
-                with self.object_cid_condition_mp:
-                    # Wait for the cid to release if it's in use
-                    while cid in self.object_locked_cids_mp:
-                        logging.debug(sync_wait_msg)
-                        self.object_cid_condition_mp.wait()
-                    # Modify reference_locked_cids consecutively
-                    logging.debug(sync_begin_debug_msg)
-                    self.object_locked_cids_mp.append(cid)
-            else:
-                with self.object_cid_condition_th:
-                    while cid in self.object_locked_cids_th:
-                        logging.debug(sync_wait_msg)
-                        self.object_cid_condition_th.wait()
-                    logging.debug(sync_begin_debug_msg)
-                    self.object_locked_cids_th.append(cid)
+        try:
+            cid_refs_abs_path = self._get_hashstore_cid_refs_path(cid)
+            # If the refs file still exists, do not delete the object
+            self._synchronize_object_locked_cids(cid)
+            if os.path.isfile(cid_refs_abs_path):
+                debug_msg = (
+                    f"Cid reference file exists for: {cid}, skipping delete request."
+                )
+                self.fhs_logger.debug(debug_msg)
-            try:
+            else:
                 self._delete("objects", cid)
-            finally:
-                # Release cid
-                end_sync_debug_msg = (
-                    f"FileHashStore - delete_object: Releasing cid ({cid})"
-                    + " from locked list"
-                )
-                if self.use_multiprocessing:
-                    with self.object_cid_condition_mp:
-                        logging.debug(end_sync_debug_msg)
-                        self.object_locked_cids_mp.remove(cid)
-                        self.object_cid_condition_mp.notify()
-                else:
-                    with self.object_cid_condition_th:
-                        logging.debug(end_sync_debug_msg)
-                        self.object_locked_cids_th.remove(cid)
-                        self.object_cid_condition_th.notify()
+                info_msg = f"Deleted object only for cid: {cid}"
+                self.fhs_logger.info(info_msg)
+
+        finally:
+            self._release_object_locked_cids(cid)

     def _check_arg_algorithms_and_checksum(
         self,
@@ -2450,9 +2136,9 @@ def _check_arg_format_id(self, format_id: str, method: str) -> str:
         :return: Valid metadata namespace.
         """
         if format_id and not format_id.strip():
-            exception_string = f"FileHashStore - {method}: Format_id cannot be empty."
-            logging.error(exception_string)
-            raise ValueError(exception_string)
+            err_msg = f"{method}: Format_id cannot be empty."
+            self.fhs_logger.error(err_msg)
+            raise ValueError(err_msg)
         elif format_id is None:
             # Use default value set by hashstore config
             checked_format_id = self.sysmeta_ns
@@ -2475,19 +2161,19 @@ def _refine_algorithm_list(
             self._clean_algorithm(checksum_algorithm)
             if checksum_algorithm in self.other_algo_list:
                 debug_additional_other_algo_str = (
-                    f"FileHashStore - _refine_algorithm_list: checksum algo: {checksum_algorithm}"
-                    + " found in other_algo_lists, adding to list of algorithms to calculate."
+                    f"Checksum algo: {checksum_algorithm} found in other_algo_lists, adding to "
+                    + "list of algorithms to calculate."
                 )
-                logging.debug(debug_additional_other_algo_str)
+                self.fhs_logger.debug(debug_additional_other_algo_str)
                 algorithm_list_to_calculate.append(checksum_algorithm)
         if additional_algorithm is not None:
             self._clean_algorithm(additional_algorithm)
             if additional_algorithm in self.other_algo_list:
                 debug_additional_other_algo_str = (
-                    f"FileHashStore - _refine_algorithm_list: addit algo: {additional_algorithm}"
-                    + " found in other_algo_lists, adding to list of algorithms to calculate."
+                    f"Additional algo: {additional_algorithm} found in other_algo_lists, "
+                    + "adding to list of algorithms to calculate."
                 )
-                logging.debug(debug_additional_other_algo_str)
+                self.fhs_logger.debug(debug_additional_other_algo_str)
                 algorithm_list_to_calculate.append(additional_algorithm)

         # Remove duplicates
@@ -2515,12 +2201,9 @@ def _clean_algorithm(self, algorithm_string: str) -> str:
             cleaned_string not in self.default_algo_list
             and cleaned_string not in self.other_algo_list
         ):
-            exception_string = (
-                "FileHashStore: _clean_algorithm: Algorithm not supported:"
-                + cleaned_string
-            )
-            logging.error(exception_string)
-            raise UnsupportedAlgorithm(exception_string)
+            err_msg = f"Algorithm not supported: {cleaned_string}"
+            self.fhs_logger.error(err_msg)
+            raise UnsupportedAlgorithm(err_msg)
         return cleaned_string

     def _computehash(
@@ -2683,10 +2366,8 @@ def _delete(self, entity: str, file: Union[str, Path]) -> None:
                 os.remove(realpath)

         except Exception as err:
-            exception_string = (
-                f"FileHashStore - delete(): Unexpected {err=}, {type(err)=}"
-            )
-            logging.error(exception_string)
+            err_msg = f"Unexpected {err=}, {type(err)=}"
+            self.fhs_logger.error(err_msg)
             raise err

     def _create_path(self, path: Path) -> None:
@@ -2758,9 +2439,8 @@ def _get_hashstore_data_object_path(self, cid_or_relative_path: str) -> Path:
                 return Path(relpath)
             else:
                 raise FileNotFoundError(
-                    "FileHashStore - _get_hashstore_data_object_path: could not locate a"
-                    + "data object in '/objects' for the supplied cid_or_relative_path: "
-                    + cid_or_relative_path
+                    "Could not locate a data object in '/objects' for the supplied "
+                    + f"cid_or_relative_path: {cid_or_relative_path}"
                 )

     def _get_hashstore_metadata_path(self, metadata_relative_path: str) -> Path:
@@ -2781,9 +2461,8 @@ def _get_hashstore_metadata_path(self, metadata_relative_path: str) -> Path:
             return Path(metadata_relative_path)
         else:
             raise FileNotFoundError(
-                "FileHashStore - _get_hashstore_metadata_path: could not locate a"
-                + "metadata object in '/metadata' for the supplied metadata_relative_path: "
-                + str(metadata_relative_path)
+                "Could not locate a metadata object in '/metadata' for the supplied "
+                + f"metadata_relative_path: {metadata_relative_path}"
             )

     def _get_hashstore_pid_refs_path(self, pid: str) -> Path:
@@ -2815,6 +2494,28 @@ def _get_hashstore_cid_refs_path(self, cid: str) -> Path:

     # Synchronization Methods

+    def _synchronize_object_locked_pids(self, pid: str) -> None:
+        """Threads must work with 'pid's one identifier at a time to ensure thread safety when
+        handling requests to store, delete or tag pids.
+
+        :param str pid: Persistent or authority-based identifier
+        """
+        if self.use_multiprocessing:
+            with self.object_pid_condition_mp:
+                # Wait for the pid to release if it's in use
+                while pid in self.object_locked_pids_mp:
+                    self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.")
+                    self.object_pid_condition_mp.wait()
+                self.object_locked_pids_mp.append(pid)
+                self.fhs_logger.debug(f"Synchronizing object_locked_pids_mp for pid: {pid}")
+        else:
+            with self.object_pid_condition_th:
+                while pid in self.object_locked_pids_th:
+                    self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.")
+                    self.object_pid_condition_th.wait()
+                self.object_locked_pids_th.append(pid)
+                self.fhs_logger.debug(f"Synchronizing object_locked_pids_th for pid: {pid}")
+
     def _release_object_locked_pids(self, pid: str) -> None:
         """Remove the given persistent identifier from 'object_locked_pids' and notify other
         waiting threads or processes.
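The locking discipline behind `_synchronize_object_locked_pids` and its siblings is a shared list of in-use identifiers guarded by a condition variable: wait until the identifier is free, claim it, do the work, and always release and notify in a `finally` block. A threading-only sketch of that core (the patch also supports a multiprocessing variant):

import threading

locked_ids = []
condition = threading.Condition()

def with_id_lock(identifier, work):
    """Run `work` while holding an exclusive claim on `identifier`."""
    with condition:
        while identifier in locked_ids:
            condition.wait()           # sleep until a release notifies us
        locked_ids.append(identifier)  # claim the identifier
    try:
        work()
    finally:
        with condition:
            locked_ids.remove(identifier)
            condition.notify()         # wake one waiter to re-check the list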
@@ -2825,11 +2526,13 @@ def _release_object_locked_pids(self, pid: str) -> None:
             with self.object_pid_condition_mp:
                 self.object_locked_pids_mp.remove(pid)
                 self.object_pid_condition_mp.notify()
+                self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_mp.")
         else:
             # Release pid
             with self.object_pid_condition_th:
                 self.object_locked_pids_th.remove(pid)
                 self.object_pid_condition_th.notify()
+                self.fhs_logger.debug(f"Releasing pid ({pid}) from object_locked_pids_th.")

     def _synchronize_object_locked_cids(self, cid: str) -> None:
         """Multiple threads may access a data object via its 'cid' or the respective 'cid
@@ -2842,28 +2545,18 @@ def _synchronize_object_locked_cids(self, cid: str) -> None:
             with self.object_cid_condition_mp:
                 # Wait for the cid to release if it's being tagged
                 while cid in self.object_locked_cids_mp:
-                    logging.debug(
-                        f"synchronize_referenced_locked_cids: Cid ({cid}) is locked. Waiting."
-                    )
+                    self.fhs_logger.debug(f"Cid ({cid}) is locked. Waiting.")
                     self.object_cid_condition_mp.wait()
                 # Modify reference_locked_cids consecutively
                 self.object_locked_cids_mp.append(cid)
-                logging.debug(
-                    f"synchronize_referenced_locked_cids: Synchronizing object_locked_cids_mp for"
-                    + f" cid: {cid}"
-                )
+                self.fhs_logger.debug(f"Synchronizing object_locked_cids_mp for cid: {cid}")
         else:
             with self.object_cid_condition_th:
                 while cid in self.object_locked_cids_th:
-                    logging.debug(
-                        f"synchronize_referenced_locked_cids: Cid ({cid}) is locked. Waiting."
-                    )
+                    self.fhs_logger.debug(f"Cid ({cid}) is locked. Waiting.")
                     self.object_cid_condition_th.wait()
                 self.object_locked_cids_th.append(cid)
-                logging.debug(
-                    f"synchronize_referenced_locked_cids: Synchronizing object_locked_cids_th for"
-                    + f" cid: {cid}"
-                )
+                self.fhs_logger.debug(f"Synchronizing object_locked_cids_th for cid: {cid}")

     def _check_object_locked_cids(self, cid: str) -> None:
         """Check that a given content identifier is currently locked (found in the
@@ -2873,13 +2566,13 @@ def _check_object_locked_cids(self, cid: str) -> None:
         """
         if self.use_multiprocessing:
             if cid not in self.object_locked_cids_mp:
-                err_msg = f"_check_object_locked_cids: cid {cid} is not locked."
-                logging.error(err_msg)
+                err_msg = f"Cid {cid} is not locked."
+                self.fhs_logger.error(err_msg)
                 raise IdentifierNotLocked(err_msg)
         else:
             if cid not in self.object_locked_cids_th:
-                err_msg = f"_check_object_locked_cids: cid {cid} is not locked."
-                logging.error(err_msg)
+                err_msg = f"Cid {cid} is not locked."
+                self.fhs_logger.error(err_msg)
                 raise IdentifierNotLocked(err_msg)

     def _release_object_locked_cids(self, cid: str) -> None:
@@ -2892,20 +2585,16 @@ def _release_object_locked_cids(self, cid: str) -> None:
             with self.object_cid_condition_mp:
                 self.object_locked_cids_mp.remove(cid)
                 self.object_cid_condition_mp.notify()
-                end_sync_debug_msg = (
-                    f"FileHashStore - _release_object_locked_cids: Releasing cid ({cid}) from"
-                    + " object_cid_condition_mp."
-                )
-                logging.debug(end_sync_debug_msg)
+                self.fhs_logger.debug(
+                    f"Releasing cid ({cid}) from object_cid_condition_mp."
+                )
         else:
             with self.object_cid_condition_th:
                 self.object_locked_cids_th.remove(cid)
                 self.object_cid_condition_th.notify()
-                end_sync_debug_msg = (
-                    f"FileHashStore - _release_object_locked_cids: Releasing cid ({cid}) from"
-                    + " object_cid_condition_th."
-                )
-                logging.debug(end_sync_debug_msg)
+                self.fhs_logger.debug(
+                    f"Releasing cid ({cid}) from object_cid_condition_th."
+                )

     def _synchronize_referenced_locked_pids(self, pid: str) -> None:
         """Multiple threads may interact with a pid (to tag, untag, delete) and these actions
@@ -2917,28 +2606,22 @@ def _synchronize_referenced_locked_pids(self, pid: str) -> None:
             with self.reference_pid_condition_mp:
                 # Wait for the pid to release if it's in use
                 while pid in self.reference_locked_pids_mp:
-                    logging.debug(
-                        f"_synchronize_referenced_locked_pids: Pid ({pid}) is locked. Waiting."
-                    )
+                    self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.")
                     self.reference_pid_condition_mp.wait()
                 # Modify reference_locked_pids consecutively
                 self.reference_locked_pids_mp.append(pid)
-                logging.debug(
-                    f"_synchronize_referenced_locked_pids: Synchronizing reference_locked_pids_mp"
-                    + f" for pid: {pid}"
-                )
+                self.fhs_logger.debug(
+                    f"Synchronizing reference_locked_pids_mp for pid: {pid}"
+                )
         else:
             with self.reference_pid_condition_th:
                 while pid in self.reference_locked_pids_th:
-                    logging.debug(
-                        f"_synchronize_referenced_locked_pids: Pid ({pid}) is locked. Waiting."
-                    )
+                    self.fhs_logger.debug(f"Pid ({pid}) is locked. Waiting.")
                     self.reference_pid_condition_th.wait()
                 self.reference_locked_pids_th.append(pid)
-                logging.debug(
-                    f"_synchronize_referenced_locked_pids: Synchronizing reference_locked_pids_th"
-                    + f" for pid: {pid}"
-                )
+                self.fhs_logger.debug(
+                    f"Synchronizing reference_locked_pids_th for pid: {pid}"
+                )

     def _check_reference_locked_pids(self, pid: str) -> None:
         """Check that a given persistent identifier is currently locked (found in the
@@ -2948,13 +2631,13 @@ def _check_reference_locked_pids(self, pid: str) -> None:
         """
         if self.use_multiprocessing:
             if pid not in self.reference_locked_pids_mp:
-                err_msg = f"_check_reference_locked_pids: pid {pid} is not locked."
-                logging.error(err_msg)
+                err_msg = f"Pid {pid} is not locked."
+                self.fhs_logger.error(err_msg)
                 raise IdentifierNotLocked(err_msg)
         else:
             if pid not in self.reference_locked_pids_th:
-                err_msg = f"_check_reference_locked_pids: pid {pid} is not locked."
-                logging.error(err_msg)
+                err_msg = f"Pid {pid} is not locked."
+                self.fhs_logger.error(err_msg)
                 raise IdentifierNotLocked(err_msg)

     def _release_reference_locked_pids(self, pid: str) -> None:
@@ -2967,21 +2650,17 @@ def _release_reference_locked_pids(self, pid: str) -> None:
             with self.reference_pid_condition_mp:
                 self.reference_locked_pids_mp.remove(pid)
                 self.reference_pid_condition_mp.notify()
-                end_sync_debug_msg = (
-                    f"FileHashStore - _release_reference_locked_pids: Releasing pid ({pid}) from"
-                    + " reference_locked_pids_mp."
-                )
-                logging.debug(end_sync_debug_msg)
+                self.fhs_logger.debug(
+                    f"Releasing pid ({pid}) from reference_locked_pids_mp."
+                )
         else:
             # Release pid
             with self.reference_pid_condition_th:
                 self.reference_locked_pids_th.remove(pid)
                 self.reference_pid_condition_th.notify()
-                end_sync_debug_msg = (
-                    f"FileHashStore - _release_reference_locked_pids: Releasing pid ({pid}) from"
-                    + " reference_locked_pids_th."
-                )
-                logging.debug(end_sync_debug_msg)
+                self.fhs_logger.debug(
+                    f"Releasing pid ({pid}) from reference_locked_pids_th."
+                )

     # Other Static Methods

     @staticmethod
@@ -3046,19 +2725,19 @@ def _check_arg_data(data: Union[str, os.PathLike, io.BufferedReader]) -> bool:
             and not isinstance(data, Path)
             and not isinstance(data, io.BufferedIOBase)
         ):
-            exception_string = (
+            err_msg = (
                 "FileHashStore - _validate_arg_data: Data must be a path, string or buffered"
                 + f" stream type. Data type supplied: {type(data)}"
             )
-            logging.error(exception_string)
-            raise TypeError(exception_string)
+            logging.error(err_msg)
+            raise TypeError(err_msg)
         if isinstance(data, str):
             if data.strip() == "":
-                exception_string = (
+                err_msg = (
                     "FileHashStore - _validate_arg_data: Data string cannot be empty."
                 )
-                logging.error(exception_string)
-                raise TypeError(exception_string)
+                logging.error(err_msg)
+                raise TypeError(err_msg)
         return True

     @staticmethod
@@ -3070,18 +2749,16 @@ def _check_integer(file_size: int) -> None:
         """
         if file_size is not None:
             if not isinstance(file_size, int):
-                exception_string = (
+                err_msg = (
                     "FileHashStore - _check_integer: size given must be an integer."
                     + f" File size: {file_size}. Arg Type: {type(file_size)}."
                 )
-                logging.error(exception_string)
-                raise TypeError(exception_string)
+                logging.error(err_msg)
+                raise TypeError(err_msg)
             if file_size < 1:
-                exception_string = (
-                    "FileHashStore - _check_integer: size given must be > 0"
-                )
-                logging.error(exception_string)
-                raise ValueError(exception_string)
+                err_msg = "FileHashStore - _check_integer: size given must be > 0"
+                logging.error(err_msg)
+                raise ValueError(err_msg)

     @staticmethod
     def _check_string(string: str, arg: str) -> None:
@@ -3093,12 +2770,12 @@ def _check_string(string: str, arg: str) -> None:
         """
         if string is None or string.strip() == "" or any(ch.isspace() for ch in string):
             method = inspect.stack()[1].function
-            exception_string = (
+            err_msg = (
                 f"FileHashStore - {method}: {arg} cannot be None"
                 + f" or empty, {arg}: {string}."
             )
-            logging.error(exception_string)
-            raise ValueError(exception_string)
+            logging.error(err_msg)
+            raise ValueError(err_msg)

     @staticmethod
     def _cast_to_bytes(text: any) -> bytes: