From ab19155cdf0442be4aceea720141cef7b7057889 Mon Sep 17 00:00:00 2001 From: Giovanni Pellerano Date: Thu, 19 Dec 2024 22:18:45 +0100 Subject: [PATCH] Perform optimization rewrite and extend testing --- README.md | 25 +-- src/globaleaks_eph_fs/__init__.py | 356 +++++++++++++++++------------- tests/test.py | 261 +++++++++++++++++----- 3 files changed, 412 insertions(+), 230 deletions(-) diff --git a/README.md b/README.md index 63da8d3..588e0e2 100644 --- a/README.md +++ b/README.md @@ -21,43 +21,30 @@ pip install globaleaks-eph-fs To mount the filesystem from the command line: ```bash -globaleaks-eph-fs [--storage_directory ] +globaleaks-eph-fs [--storage_directory ] ``` +- `--storage_directory STORAGE_DIRECTORY` (optional): The directory used for storage. If not provided, a temporary directory will be used. - ``: The path where the filesystem will be mounted. -- `--storage_directory` (optional): The directory used for storage. If not provided, a temporary directory will be used. ### Python API You can also use `globaleaks-eph-fs` within your Python code. Here's an example: ```python -import argparse -from globaleaks_eph_fs import EphemeralFS +from globaleaks_eph_fs import mount_globaleaks_eph_fs -def main(): - parser = argparse.ArgumentParser(description="GLOBALEAKS EPHEMERAL FS") - parser.add_argument('mount_point', help="Path to mount the filesystem") - parser.add_argument('--storage_directory', '-s', help="Optional storage directory. Defaults to a temporary directory.") - args = parser.parse_args() +eph_fs_thread = mount_globaleaks_eph_fs("/mnt/globaleaks-eph-fs") - EphemeralFS(args.mount_point, args.storage_directory, nothreads=True, foreground=True) - - -if __name__ == '__main__': - main() +eph_fs_thread.join() ``` -### Arguments - -- `mount_point` (required): The directory where the encrypted filesystem will be mounted. -- `--storage_directory` (optional): Specify a custom storage directory for the filesystem. If not provided, a temporary directory will be used. - ## Features - **ChaCha20 Encryption**: All data stored in the filesystem is encrypted with ChaCha20. - **FUSE Integration**: Mount the filesystem as a virtual disk using FUSE. - **Temporary Storage**: The filesystem is ephemeral and can use a temporary directory for storage. +- **Metadata Free**: The filesystem preserve only files content enforcing random uuid4 files' names. ## Requirements diff --git a/src/globaleaks_eph_fs/__init__.py b/src/globaleaks_eph_fs/__init__.py index b6b69e6..4158009 100644 --- a/src/globaleaks_eph_fs/__init__.py +++ b/src/globaleaks_eph_fs/__init__.py @@ -1,7 +1,11 @@ import argparse +import atexit import errno import os +import re import stat +import sys +import subprocess import uuid import threading from fuse import FUSE, FuseOSError, Operations @@ -9,44 +13,89 @@ from cryptography.hazmat.primitives.ciphers import Cipher from cryptography.hazmat.primitives.ciphers.algorithms import ChaCha20 +CHUNK_SIZE = 4096 + +UUID4_PATTERN = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', re.IGNORECASE) + +def is_valid_uuid4(filename): + """ + Validates if the given filename follows the UUIDv4 format. + + :param filename: The name of the file. + :return: True if the filename is a valid UUIDv4, otherwise False. + """ + return bool(UUID4_PATTERN.match(filename)) + +def is_mount_point(path): + """ + Checks if the given path is a mount point. + + A mount point is a directory where a filesystem is attached. This function checks + if the provided path is currently being used as a mount point by querying the + system's mount information. + + :param path: The directory path to check if it is a mount point. + :return: True if the given path is a mount point, otherwise False. + :raises Exception: If there is an error while running the 'mount' command or parsing the result. + """ + result = subprocess.run(['mount'], capture_output=True, text=True) + return any(os.path.abspath(path) in line for line in result.stdout.splitlines()) + +def unmount_if_mounted(path): + """ + Checks if the given path is a mount point and attempts to unmount it. + + :param path: The path to check and unmount if it is a mount point. + """ + if is_mount_point(path): + subprocess.run(['fusermount', '-u', path]) + class EphemeralFile: - def __init__(self, filesdir): + def __init__(self, filesdir, filename=None): """ Initializes an ephemeral file with ChaCha20 encryption. Creates a new random file path and generates a unique encryption key and nonce. :param filesdir: The directory where the ephemeral file will be stored. + :param filenames: Optional filename. If not provided, a UUID4 is used. """ - self.fd = None - self.filename = str(uuid.uuid4()) # UUID as a string for the file name - self.filepath = os.path.join(filesdir, self.filename) - self.nonce = os.urandom(16) # 128-bit nonce - self.key = os.urandom(32) # 256-bit key - self.cipher = Cipher(ChaCha20(self.key, self.nonce), mode=None) - self.size = 0 - self.position = 0 + filename = filename or str(uuid.uuid4()) # If filenames is None, generate a random UUID as a string + self.filepath = os.path.join(filesdir, filename) + self.cipher = Cipher(ChaCha20(os.urandom(32), uuid.UUID(filename).bytes[:16]), mode=None) self.enc = self.cipher.encryptor() - self.dec = None - self.creation_time = os.path.getctime(self.filepath) if os.path.exists(self.filepath) else None - self.last_access_time = None - self.last_mod_time = None - self.mutex = threading.Lock() + self.dec = self.cipher.decryptor() + + self.fd = None + + def __getattribute__(self, name): + """ + Intercepts attribute access for the `EphemeralFile` class. + + If the attribute being accessed is 'size', it returns the size of the file + by checking the file's attributes using os.stat. For all other attributes, + it defers to the default behavior of `__getattribute__`, allowing normal + attribute access. + + :param name: The name of the attribute being accessed. + :return: The value of the requested attribute. If 'size' is requested, + the size of the file is returned. Otherwise, the default + behavior for attribute access is used. + """ + if name == "size": + return os.stat(self.filepath).st_size - def open(self, mode): + # For everything else, defer to the default behavior + return super().__getattribute__(name) + + def open(self, flags, mode=0o660): """ Opens the ephemeral file for reading or writing. :param mode: 'w' for writing, 'r' for reading. :return: The file object. """ - with self.mutex: - if mode == 'w': - self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND) - self.dec = None - else: - self.fd = os.open(self.filepath, os.O_RDONLY) - self.dec = self.cipher.decryptor() - self.last_access_time = os.path.getatime(self.filepath) + self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND, mode) + os.chmod(self.filepath, mode) return self def write(self, data): @@ -55,14 +104,9 @@ def write(self, data): :param data: Data to write to the file, can be a string or bytes. """ - with self.mutex: - if isinstance(data, str): - data = data.encode('utf-8') - encrypted_data = self.enc.update(data) - os.write(self.fd, encrypted_data) - self.size += len(data) - self.position += len(data) - self.last_mod_time = os.path.getmtime(self.filepath) + #if isinstance(data, str): + # data = data.encode('utf-8') + os.write(self.fd, self.enc.update(data)) def read(self, size=None): """ @@ -71,27 +115,22 @@ def read(self, size=None): :param size: The number of bytes to read. If None, reads until the end of the file. :return: The decrypted data read from the file. """ - with self.mutex: - data = b"" - bytes_read = 0 + data = b"" + bytes_read = 0 - while True: - # Determine how much to read in this chunk - chunk_size = min(4096, size - bytes_read) if size is not None else 4096 + while True: + # Determine how much to read in this chunk + chunk_size = min(CHUNK_SIZE, size - bytes_read) if size is not None else CHUNK_SIZE - chunk = os.read(self.fd, chunk_size) - if not chunk: # End of file - break + chunk = os.read(self.fd, chunk_size) + if not chunk: # End of file + break - data += self.dec.update(chunk) - bytes_read += len(chunk) + data += self.dec.update(chunk) + bytes_read += len(chunk) - if size is not None and bytes_read >= size: - break - - # Update the last access time and position - self.last_access_time = os.path.getatime(self.filepath) - self.position += bytes_read # Update position after read + if size is not None and bytes_read >= size: + break return data @@ -101,34 +140,32 @@ def seek(self, offset): :param offset: The offset to seek to. """ - with self.mutex: - if self.position != offset: - if offset < self.position: - self.dec = self.cipher.decryptor() # Reuse the existing cipher instance - os.lseek(self.fd, 0, os.SEEK_SET) - self.position = 0 - - discard_size = offset - self.position - chunk_size = 4096 - while discard_size > 0: - to_read = min(discard_size, chunk_size) - self.dec.update(os.read(self.fd, to_read)) # Discard the data - discard_size -= to_read - - self.position = offset # Update position after seek + position = 0 + self.dec = self.cipher.decryptor() + self.enc = self.cipher.encryptor() + os.lseek(self.fd, 0, os.SEEK_SET) + discard_size = offset - position + while discard_size > 0: + to_read = min(CHUNK_SIZE, discard_size) + data = self.dec.update(os.read(self.fd, to_read)) + data = self.enc.update(data) + discard_size -= to_read def tell(self): - with self.mutex: - return self.position + """ + Returns the current position in the file. + + :return: The current position in the file. + """ + return os.lseek(self.fd, 0, os.SEEK_CUR) def close(self): """ Closes the file descriptor. """ - with self.mutex: - if self.fd is not None: - os.close(self.fd) - self.fd = None + if self.fd is not None: + os.close(self.fd) + self.fd = None def __enter__(self): """ @@ -148,26 +185,28 @@ def __del__(self): """ self.close() try: - os.remove(self.filepath) + os.unlink(self.filepath) except FileNotFoundError: pass class EphemeralOperations(Operations): + use_ns = True def __init__(self, storage_directory=None): """ Initializes the operations for the ephemeral filesystem. :param storage_directory: The directory to store the files. Defaults to a temporary directory. """ - if storage_directory is None: - storage_directory = mkdtemp() - self.storage_directory = storage_directory + self.storage_directory = storage_directory if storage_directory is not None else mkdtemp() self.files = {} # Track open files and their secure temporary file handlers - self.default_permissions = 0o660 # Default permissions for files (user read/write) - self.uid = os.getuid() # Current user's UID - self.gid = os.getgid() # Current user's GID self.mutex = threading.Lock() + def get_file(self, path): + file = self.files.get(path) + if file is None: + raise FuseOSError(errno.ENOENT) + return file + def getattr(self, path, fh=None): """ Retrieves file or directory attributes. @@ -178,21 +217,18 @@ def getattr(self, path, fh=None): """ with self.mutex: if path == '/': - st = {'st_mode': (stat.S_IFDIR | 0o750), 'st_nlink': 2} - else: - file = self.files.get(path) - if file is None or not os.path.exists(file.filepath): - raise OSError(errno.ENOENT, "No such file or directory", path) - st = { - 'st_mode': (stat.S_IFREG | 0o660), - 'st_size': file.size, - 'st_nlink': 1, - 'st_uid': self.uid, - 'st_gid': self.gid, - 'st_atime': file.last_access_time if file.last_access_time else os.path.getatime(file.filepath), - 'st_mtime': file.last_mod_time if file.last_mod_time else os.path.getmtime(file.filepath), - 'st_ctime': file.creation_time if file.creation_time else os.path.getctime(file.filepath), - } + return {'st_mode': (stat.S_IFDIR | 0o750), 'st_nlink': 2} + + file = self.get_file(path) + + file_stat = os.stat(file.filepath) + + st = {key: getattr(file_stat, key) for key in dir(file_stat) if not key.startswith('__')} + + st['st_mode'] |= stat.S_IFDIR if stat.S_ISDIR(file_stat.st_mode) else 0 + st['st_mode'] |= stat.S_IFREG if stat.S_ISREG(file_stat.st_mode) else 0 + st['st_mode'] |= stat.S_IFLNK if stat.S_ISLNK(file_stat.st_mode) else 0 + return st def readdir(self, path, fh=None): @@ -214,11 +250,13 @@ def create(self, path, mode): :param mode: The mode in which the file will be opened. :return: The file descriptor. """ + filename = os.path.basename(path) + if not is_valid_uuid4(filename): + raise FuseOSError(errno.ENOENT) + with self.mutex: - file = EphemeralFile(self.storage_directory) - file.open('w') - os.chmod(file.filepath, self.default_permissions) - os.chown(file.filepath, self.uid, self.gid) + file = EphemeralFile(self.storage_directory, filename) + file.open('w', mode) self.files[path] = file return file.fd @@ -231,11 +269,8 @@ def open(self, path, flags): :return: The file descriptor. """ with self.mutex: - file = self.files.get(path) - if path not in self.files: - raise FuseOSError(errno.ENOENT) - mode = 'w' if (flags & os.O_RDWR or flags & os.O_WRONLY) else 'r' - file.open(mode) + file = self.get_file(path) + file.open('w' if (flags & os.O_RDWR or flags & os.O_WRONLY) else 'r') return file.fd def read(self, path, size, offset, fh=None): @@ -249,9 +284,7 @@ def read(self, path, size, offset, fh=None): :return: The data read from the file. """ with self.mutex: - file = self.files.get(path) - if file is None: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) file.seek(offset) return file.read(size) @@ -266,7 +299,7 @@ def write(self, path, data, offset, fh=None): :return: The number of bytes written. """ with self.mutex: - file = self.files.get(path) + file = self.get_file(path) file.write(data) return len(data) @@ -280,7 +313,7 @@ def unlink(self, path): file = self.files.pop(path, None) if file: file.close() - os.remove(file.filepath) + os.unlink(file.filepath) def release(self, path, fh=None): """ @@ -290,9 +323,7 @@ def release(self, path, fh=None): :param fh: File handle (not used here). """ with self.mutex: - file = self.files.get(path) - if file: - file.close() + self.get_file(path).close() def truncate(self, path, length, fh=None): """ @@ -305,70 +336,68 @@ def truncate(self, path, length, fh=None): :param length: The new size of the file. """ with self.mutex: - original_file = self.files.get(path) - if original_file is None: - raise FuseOSError(errno.ENOENT) - - if length < original_file.size: - original_file.open('r') + file = self.get_file(path) - truncated_file = EphemeralFile(self.storage_directory) - truncated_file.open('w') - os.chmod(truncated_file.filepath, self.default_permissions) - os.chown(truncated_file.filepath, self.uid, self.gid) + if length < file.size: + os.truncate(file.filepath, length) - truncated_file.open('w') + file.seek(length) + if length > file.size: + length = length - file.size bytes_written = 0 - chunk_size = 4096 while bytes_written < length: - remaining = length - bytes_written - to_read = min(chunk_size, remaining) - truncated_file.write(original_file.read(to_read)) - bytes_written += to_read - - del original_file - self.files[path] = truncated_file - elif length > original_file.size: - length = length - original_file.size - original_file.open('w') - bytes_written = 0 - chunk_size = 4096 - while bytes_written < length: - remaining = length - bytes_written - to_write = min(chunk_size, remaining) - original_file.write(b'\0' * to_write) + to_write = min(CHUNK_SIZE, length - bytes_written) + file.write(b'\0' * to_write) bytes_written += to_write -class EphemeralFS(FUSE): - """ - A class that mounts an ephemeral filesystem at the given mount point. - Inherits from FUSE to provide the filesystem mounting functionality. + def chmod(self, path, mode): + """ + Changes the permissions of the file at the specified path. - Args: - mount_point (str): The path where the filesystem will be mounted. - storage_directory (str, optional): The directory used for storage. If None, uses a temporary directory. - **fuse_args: Additional arguments to pass to the FUSE constructor. - """ - def __init__(self, mount_point, storage_directory=None, **fuse_args): + :param path: The file path whose permissions will be changed. + :param mode: The new permissions mode (e.g., 0o777 for full permissions). + :raises FuseOSError: If the file does not exist. + """ + file = self.get_file(path) + return os.chmod(file.filepath, mode) + + def chown(self, path, uid, gid): """ - Initializes and mounts the ephemeral filesystem. + Changes the ownership of the file at the specified path. - :param mount_point: The path where the filesystem will be mounted. - :param storage_directory: The directory to store the files (optional). + :param path: The file path whose ownership will be changed. + :param uid: The user ID (uid) to set as the new owner. + :param gid: The group ID (gid) to set as the new group owner. + :raises FuseOSError: If the file does not exist. """ - self.mount_point = mount_point - self.storage_directory = storage_directory + file = self.get_file(path) + return os.chown(file.filepath, uid, gid) + +def mount_globaleaks_eph_fs(mount_point, storage_directory=None, foreground=False): + """ + Initializes and mounts the ephemeral filesystem. + :param mount_point: The path where the filesystem will be mounted. + :param storage_directory: The directory to store the files (optional). + :return: A `FUSE` object that represents the mounted filesystem. + """ + def _mount_globaleaks_eph_fs(mount_point, storage_directory=None, foreground=False): # Create the mount point directory if it does not exist - os.makedirs(self.mount_point, exist_ok=True) + os.makedirs(mount_point, exist_ok=True) # If a storage directory is specified, create it as well - if self.storage_directory: - os.makedirs(self.storage_directory, exist_ok=True) + if storage_directory: + os.makedirs(storage_directory, exist_ok=True) + + return FUSE(EphemeralOperations(storage_directory), mount_point, foreground=foreground) + + thread = threading.Thread(target=_mount_globaleaks_eph_fs, args=(mount_point, storage_directory, foreground)) + thread.start() - # Initialize the FUSE mount with the EphemeralFS - super().__init__(EphemeralOperations(self.storage_directory), self.mount_point, **fuse_args) + atexit.register(unmount_if_mounted, mount_point) + + return thread def main(): """ @@ -378,7 +407,16 @@ def main(): parser.add_argument('mount_point', help="Path to mount the filesystem") parser.add_argument('--storage_directory', '-s', help="Optional storage directory. Defaults to a temporary directory.") args = parser.parse_args() - EphemeralFS(args.mount_point, args.storage_directory, foreground=True) -if __name__ == '__main__': - main() + unmount_if_mounted(args.mount_point) + + try: + print(f"Mounting GLOBALEAKS EPH FS at {args.mount_point}") + mount_globaleaks_eph_fs(args.mount_point, args.storage_directory, True).join() + + except KeyboardInterrupt: + sys.exit(0) + except: + sys.exit(1) + +if __name__ == '__main__': main() diff --git a/tests/test.py b/tests/test.py index 4b92574..bd22480 100644 --- a/tests/test.py +++ b/tests/test.py @@ -2,14 +2,22 @@ import os import shutil import stat +import tempfile import unittest -from tempfile import mkdtemp +import uuid + from fuse import FuseOSError -from globaleaks_eph_fs import EphemeralFile, EphemeralOperations +from tempfile import mkdtemp +from unittest.mock import patch, MagicMock -TEST_PATH = 'TESTFILE.TXT' +from globaleaks_eph_fs import EphemeralFile, EphemeralOperations, mount_globaleaks_eph_fs, main, unmount_if_mounted + +TEST_PATH = str(uuid.uuid4()) TEST_DATA = b"Hello, world! This is a test data for writing, seeking and reading operations." +ORIGINAL_SIZE = len(TEST_DATA) +EXTENDED_SIZE = ORIGINAL_SIZE*2 +REDUCED_SIZE = ORIGINAL_SIZE//2 class TestEphemeralFile(unittest.TestCase): def setUp(self): @@ -46,116 +54,265 @@ def test_encryption_and_decryption(self): self.assertEqual(read_data, expected) # Verify the data matches the expected value def test_file_cleanup(self): - TEST_PATH = self.ephemeral_file.filepath + path_copy = self.ephemeral_file.filepath del self.ephemeral_file - self.assertFalse(os.path.exists(TEST_PATH)) + self.assertFalse(os.path.exists(path_copy)) class TestEphemeralOperations(unittest.TestCase): def setUp(self): self.storage_dir = mkdtemp() - self.operations = EphemeralOperations(self.storage_dir) + self.fs = EphemeralOperations(self.storage_dir) + + # Get current user's UID and GID + self.current_uid = os.getuid() + self.current_gid = os.getgid() def tearDown(self): - for file in self.operations.files.values(): + for file in self.fs.files.values(): os.remove(file.filepath) os.rmdir(self.storage_dir) def test_create_file(self): - self.operations.create(TEST_PATH, 0o660) - self.assertIn(TEST_PATH, self.operations.files) + self.fs.create(TEST_PATH, 0o660) + self.assertIn(TEST_PATH, self.fs.files) + + def test_create_file_with_arbitrary_name(self): + with self.assertRaises(FuseOSError) as context: + self.fs.create('/arbitraryname', os.O_RDONLY) + self.assertEqual(context.exception.errno, errno.ENOENT) def test_open_existing_file(self): - self.operations.create(TEST_PATH, 0o660) - self.operations.open(TEST_PATH, os.O_RDONLY) + self.fs.create(TEST_PATH, 0o660) + self.fs.open(TEST_PATH, os.O_RDONLY) def test_write_and_read_file(self): - self.operations.create(TEST_PATH, 0o660) + self.fs.create(TEST_PATH, 0o660) - self.operations.open(TEST_PATH, os.O_RDWR) - self.operations.write(TEST_PATH, TEST_DATA, 0, None) + self.fs.open(TEST_PATH, os.O_RDWR) + self.fs.write(TEST_PATH, TEST_DATA, 0, None) - self.operations.release(TEST_PATH, None) + self.fs.release(TEST_PATH, None) - self.operations.open(TEST_PATH, os.O_RDONLY) + self.fs.open(TEST_PATH, os.O_RDONLY) - read_data = self.operations.read(TEST_PATH, len(TEST_DATA), 0, None) + read_data = self.fs.read(TEST_PATH, len(TEST_DATA), 0, None) self.assertEqual(read_data, TEST_DATA) - self.operations.release(TEST_PATH, None) + self.fs.release(TEST_PATH, None) def test_unlink_file(self): - self.operations.create(TEST_PATH, 0o660) - self.assertIn(TEST_PATH, self.operations.files) + self.fs.create(TEST_PATH, 0o660) + self.assertIn(TEST_PATH, self.fs.files) - self.operations.unlink(TEST_PATH) - self.assertNotIn(TEST_PATH, self.operations.files) + self.fs.unlink(TEST_PATH) + self.assertNotIn(TEST_PATH, self.fs.files) def test_file_not_found(self): with self.assertRaises(FuseOSError) as context: - self.operations.open('/nonexistentfile', os.O_RDONLY) + self.fs.open('/nonexistentfile', os.O_RDONLY) self.assertEqual(context.exception.errno, errno.ENOENT) def test_getattr_root(self): - attr = self.operations.getattr('/') - self.assertEqual(attr['st_mode'], stat.S_IFDIR | 0o750) + attr = self.fs.getattr('/') + self.assertEqual(stat.S_IFMT(attr['st_mode']), stat.S_IFDIR) + self.assertEqual(attr['st_mode'] & 0o777, 0o750) self.assertEqual(attr['st_nlink'], 2) def test_getattr_file(self): - self.operations.create(TEST_PATH, mode=0o660) + self.fs.create(TEST_PATH, mode=0o660) - attr = self.operations.getattr(TEST_PATH) + attr = self.fs.getattr(TEST_PATH) - self.assertEqual(attr['st_mode'], stat.S_IFREG | 0o660) + self.assertEqual(stat.S_IFMT(attr['st_mode']), stat.S_IFREG) + self.assertEqual(attr['st_mode'] & 0o777, 0o660) self.assertEqual(attr['st_size'], 0) self.assertEqual(attr['st_nlink'], 1) self.assertEqual(attr['st_uid'], os.getuid()) self.assertEqual(attr['st_gid'], os.getgid()) - self.assertIn('st_atime', attr) self.assertIn('st_mtime', attr) self.assertIn('st_ctime', attr) def test_getattr_nonexistent(self): with self.assertRaises(OSError) as _: - self.operations.getattr('/nonexistent') + self.fs.getattr('/nonexistent') def test_truncate(self): - ORIGINAL_SIZE =len(TEST_DATA) - REDUCED_SIZE = len(TEST_DATA)//2 + self.fs.create(TEST_PATH, 0o660) + self.fs.write(TEST_PATH, TEST_DATA, 0, None) - self.operations.create(TEST_PATH, 0o660) - self.operations.write(TEST_PATH, TEST_DATA, 0, None) - - self.operations.truncate(TEST_PATH, REDUCED_SIZE, None) - file_content = self.operations.read(TEST_PATH, ORIGINAL_SIZE, 0, None) + self.fs.truncate(TEST_PATH, REDUCED_SIZE, None) + file_content = self.fs.read(TEST_PATH, ORIGINAL_SIZE, 0, None) self.assertEqual(len(file_content), REDUCED_SIZE) self.assertEqual(file_content, TEST_DATA[:REDUCED_SIZE]) def test_extend(self): - ORIGINAL_SIZE =len(TEST_DATA) - EXTENDED_SIZE = len(TEST_DATA)*2 - - self.operations.create(TEST_PATH, 0o660) - self.operations.write(TEST_PATH, TEST_DATA, 0, None) + self.fs.create(TEST_PATH, 0o660) + self.fs.write(TEST_PATH, TEST_DATA, 0, None) - self.operations.truncate(TEST_PATH, EXTENDED_SIZE, None) - file_content = self.operations.read(TEST_PATH, EXTENDED_SIZE * 2, 0, None) + self.fs.truncate(TEST_PATH, EXTENDED_SIZE, None) + file_content = self.fs.read(TEST_PATH, EXTENDED_SIZE * 2, 0, None) self.assertEqual(file_content[:ORIGINAL_SIZE], TEST_DATA) self.assertEqual(len(file_content), EXTENDED_SIZE) self.assertTrue(all(byte == 0 for byte in file_content[ORIGINAL_SIZE:])) def test_readdir(self): - file_names = ['/file1', '/file2', '/file3'] - for file_name in file_names: - self.operations.create(file_name, 0o660) + file_names = [] + for _ in range(3): + file_names.append(str(uuid.uuid4())) + self.fs.create(file_names[-1], 0o660) + + directory_contents = self.fs.readdir('/', None) + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[1], file_names[2]}) + + self.fs.unlink(file_names[1]) + directory_contents = self.fs.readdir('/', None) + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[2]}) + + @patch("os.chmod") + def test_chmod_success(self, mock_chmod): + self.fs.create(TEST_PATH, 0o660) + mock_chmod.assert_called_with(self.fs.files[TEST_PATH].filepath, 0o660) + self.fs.chmod(TEST_PATH, 0o640) + mock_chmod.assert_called_with(self.fs.files[TEST_PATH].filepath, 0o640) + + def test_chmod_file_not_found(self): + with self.assertRaises(FuseOSError) as context: + self.fs.chmod("/nonexistent", 0o644) + self.assertEqual(context.exception.errno, errno.ENOENT) + + @patch("os.chown") + def test_chown_success(self, mock_chown): + self.fs.create(TEST_PATH, 0o660) + self.fs.chown(TEST_PATH, self.current_uid, self.current_gid) + mock_chown.assert_called_once_with(self.fs.files[TEST_PATH].filepath, self.current_uid, self.current_gid) + + def test_chown_file_not_found(self): + with self.assertRaises(FuseOSError) as context: + self.fs.chown("/nonexistent", self.current_uid, self.current_gid) + self.assertEqual(context.exception.errno, errno.ENOENT) + + @patch("os.chown", side_effect=PermissionError) + def test_chown_permission_error(self, mock_chown): + self.fs.create(TEST_PATH, 0o660) + with self.assertRaises(PermissionError): + self.fs.chown(TEST_PATH, self.current_uid, self.current_gid) + + @patch('atexit.register') + @patch('argparse.ArgumentParser.parse_args') + @patch('globaleaks_eph_fs.subprocess.run') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.FUSE') + @patch('builtins.print') + def test_main_mount(self, mock_print, mock_FUSE, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): + with tempfile.TemporaryDirectory() as mount_point: + mock_parse_args.return_value = MagicMock( + mount_point=mount_point, + storage_directory=None, + unmount=False + ) + + original_mount_function = mount_globaleaks_eph_fs + + def side_effect_func(mount_point, storage_directory, flag): + return original_mount_function(mount_point, storage_directory, False) + + mock_mount.side_effect = side_effect_func + + main() + + mock_mount.assert_called_once_with(mount_point, None, True) + + mock_atexit_register.assert_called_once_with(unmount_if_mounted, mount_point) + + mock_subprocess.assert_called_once_with(['mount'], capture_output=True, text=True) + + @patch('atexit.register') + @patch('argparse.ArgumentParser.parse_args') + @patch('globaleaks_eph_fs.subprocess.run') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('globaleaks_eph_fs.FUSE') + @patch('builtins.print') + def test_main_with_mount_point_check(self, mock_print, mock_FUSE, mock_is_mount_point, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): + with tempfile.TemporaryDirectory() as mount_point: + mock_parse_args.return_value = MagicMock( + mount_point=mount_point, + storage_directory=None, + unmount=False + ) + + mock_is_mount_point.return_value = True + + original_mount_function = mount_globaleaks_eph_fs + + def side_effect_func(mount_point, storage_directory, flag): + return original_mount_function(mount_point, storage_directory, False) + + mock_mount.side_effect = side_effect_func + + main() + + mock_mount.assert_called_once_with(mount_point, None, True) + + mock_atexit_register.assert_called_once_with(unmount_if_mounted, mount_point) + + mock_subprocess.assert_called_once_with(['fusermount', '-u', mount_point]) + + @patch('atexit.register') + @patch('argparse.ArgumentParser.parse_args') + @patch('globaleaks_eph_fs.subprocess.run') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('globaleaks_eph_fs.FUSE') + @patch('builtins.print') + def test_main_keyboard_interrupt(self, mock_print, mock_FUSE, mock_is_mount_point, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): + with tempfile.TemporaryDirectory() as mount_point: + mock_parse_args.return_value = MagicMock( + mount_point=mount_point, + storage_directory=None, + unmount=False + ) + + mock_is_mount_point.return_value = False + + mock_mount.side_effect = KeyboardInterrupt + + with self.assertRaises(SystemExit): + main() + + mock_mount.assert_called_once_with(mount_point, None, True) + + mock_subprocess.assert_not_called() + + @patch('atexit.register') + @patch('argparse.ArgumentParser.parse_args') + @patch('globaleaks_eph_fs.subprocess.run') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('globaleaks_eph_fs.FUSE') + @patch('builtins.print') + def test_main_other_exception(self, mock_print, mock_FUSE, mock_is_mount_point, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): + with tempfile.TemporaryDirectory() as mount_point: + mock_parse_args.return_value = MagicMock( + mount_point=mount_point, + storage_directory=None, + unmount=False + ) + + mock_is_mount_point.return_value = False + + mock_mount.side_effect = Exception("Some unexpected error") + + with self.assertRaises(SystemExit): + main() - directory_contents = self.operations.readdir('/', None) - self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file2', 'file3'}) + mock_mount.assert_called_once_with(mount_point, None, True) + + mock_atexit_register.assert_not_called() - self.operations.unlink('/file2') - directory_contents = self.operations.readdir('/', None) - self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file3'}) + mock_subprocess.assert_not_called() if __name__ == '__main__': unittest.main()