From 1d00f4a3d26a831175a54534eae7c8ad960bd683 Mon Sep 17 00:00:00 2001 From: Giovanni Pellerano Date: Thu, 19 Dec 2024 22:18:45 +0100 Subject: [PATCH] Perform optimization rewrite and extend testing --- README.md | 21 +- src/globaleaks_eph_fs/__init__.py | 351 ++++++++++++++++-------------- tests/test.py | 317 ++++++++++++++++++++++----- 3 files changed, 461 insertions(+), 228 deletions(-) diff --git a/README.md b/README.md index 63da8d3..a80d233 100644 --- a/README.md +++ b/README.md @@ -26,33 +26,20 @@ globaleaks-eph-fs [--storage_directory ] - ``: The path where the filesystem will be mounted. - `--storage_directory` (optional): The directory used for storage. If not provided, a temporary directory will be used. +- `--unmount`, `-u` (optional): If this flag is specified, the program will attempt to unmount the filesystem from the specified ``. ### Python API You can also use `globaleaks-eph-fs` within your Python code. Here's an example: ```python -import argparse -from globaleaks_eph_fs import EphemeralFS +from globaleaks_eph_fs import mount_globaleaks_eph_fs -def main(): - parser = argparse.ArgumentParser(description="GLOBALEAKS EPHEMERAL FS") - parser.add_argument('mount_point', help="Path to mount the filesystem") - parser.add_argument('--storage_directory', '-s', help="Optional storage directory. Defaults to a temporary directory.") - args = parser.parse_args() +eph_fs_process = mount_globaleaks_eph_fs(/mnt/globaleaks-eph-fs) - EphemeralFS(args.mount_point, args.storage_directory, nothreads=True, foreground=True) - - -if __name__ == '__main__': - main() +eph_fs_process.wait() ``` -### Arguments - -- `mount_point` (required): The directory where the encrypted filesystem will be mounted. -- `--storage_directory` (optional): Specify a custom storage directory for the filesystem. If not provided, a temporary directory will be used. - ## Features - **ChaCha20 Encryption**: All data stored in the filesystem is encrypted with ChaCha20. diff --git a/src/globaleaks_eph_fs/__init__.py b/src/globaleaks_eph_fs/__init__.py index b6b69e6..3e2fd62 100644 --- a/src/globaleaks_eph_fs/__init__.py +++ b/src/globaleaks_eph_fs/__init__.py @@ -1,7 +1,10 @@ import argparse import errno import os +import re +import sys import stat +import subprocess import uuid import threading from fuse import FUSE, FuseOSError, Operations @@ -9,44 +12,84 @@ from cryptography.hazmat.primitives.ciphers import Cipher from cryptography.hazmat.primitives.ciphers.algorithms import ChaCha20 +CHUNK_SIZE = 4096 + +UUID4_PATTERN = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', re.IGNORECASE) + +def is_valid_uuid4(filename): + """ + Validates if the given filename follows the UUIDv4 format. + + :param filename: The name of the file. + :return: True if the filename is a valid UUIDv4, otherwise False. + """ + return bool(UUID4_PATTERN.match(filename)) + +def is_mount_point(path): + """ + Checks if the given path is a mount point. + + A mount point is a directory where a filesystem is attached. This function checks + if the provided path is currently being used as a mount point by querying the + system's mount information. + + :param path: The directory path to check if it is a mount point. + :return: True if the given path is a mount point, otherwise False. + :raises Exception: If there is an error while running the 'mount' command or parsing the result. + """ + try: + abs_path = os.path.abspath(path) + result = subprocess.run(['mount'], capture_output=True, text=True) + return any(abs_path in line for line in result.stdout.splitlines()) + except: + return False + class EphemeralFile: - def __init__(self, filesdir): + def __init__(self, filesdir, filename=None): """ Initializes an ephemeral file with ChaCha20 encryption. Creates a new random file path and generates a unique encryption key and nonce. :param filesdir: The directory where the ephemeral file will be stored. + :param filenames: Optional filename. If not provided, a UUID4 is used. """ - self.fd = None - self.filename = str(uuid.uuid4()) # UUID as a string for the file name - self.filepath = os.path.join(filesdir, self.filename) - self.nonce = os.urandom(16) # 128-bit nonce - self.key = os.urandom(32) # 256-bit key - self.cipher = Cipher(ChaCha20(self.key, self.nonce), mode=None) - self.size = 0 - self.position = 0 + filename = filename or str(uuid.uuid4()) # If filenames is None, generate a random UUID as a string + self.filepath = os.path.join(filesdir, filename) + self.cipher = Cipher(ChaCha20(os.urandom(32), uuid.UUID(filename).bytes[:16]), mode=None) self.enc = self.cipher.encryptor() - self.dec = None - self.creation_time = os.path.getctime(self.filepath) if os.path.exists(self.filepath) else None - self.last_access_time = None - self.last_mod_time = None - self.mutex = threading.Lock() + self.dec = self.cipher.decryptor() + + self.fd = None - def open(self, mode): + def __getattribute__(self, name): + """ + Intercepts attribute access for the `EphemeralFile` class. + + If the attribute being accessed is 'size', it returns the size of the file + by checking the file's attributes using os.stat. For all other attributes, + it defers to the default behavior of `__getattribute__`, allowing normal + attribute access. + + :param name: The name of the attribute being accessed. + :return: The value of the requested attribute. If 'size' is requested, + the size of the file is returned. Otherwise, the default + behavior for attribute access is used. + """ + if name == "size": + return os.stat(self.filepath).st_size + + # For everything else, defer to the default behavior + return super().__getattribute__(name) + + def open(self, flags, mode=0o660): """ Opens the ephemeral file for reading or writing. :param mode: 'w' for writing, 'r' for reading. :return: The file object. """ - with self.mutex: - if mode == 'w': - self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND) - self.dec = None - else: - self.fd = os.open(self.filepath, os.O_RDONLY) - self.dec = self.cipher.decryptor() - self.last_access_time = os.path.getatime(self.filepath) + self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND, mode) + os.chmod(self.filepath, mode) return self def write(self, data): @@ -55,14 +98,9 @@ def write(self, data): :param data: Data to write to the file, can be a string or bytes. """ - with self.mutex: - if isinstance(data, str): - data = data.encode('utf-8') - encrypted_data = self.enc.update(data) - os.write(self.fd, encrypted_data) - self.size += len(data) - self.position += len(data) - self.last_mod_time = os.path.getmtime(self.filepath) + if isinstance(data, str): + data = data.encode('utf-8') + os.write(self.fd, self.enc.update(data)) def read(self, size=None): """ @@ -71,27 +109,22 @@ def read(self, size=None): :param size: The number of bytes to read. If None, reads until the end of the file. :return: The decrypted data read from the file. """ - with self.mutex: - data = b"" - bytes_read = 0 - - while True: - # Determine how much to read in this chunk - chunk_size = min(4096, size - bytes_read) if size is not None else 4096 + data = b"" + bytes_read = 0 - chunk = os.read(self.fd, chunk_size) - if not chunk: # End of file - break + while True: + # Determine how much to read in this chunk + chunk_size = min(CHUNK_SIZE, size - bytes_read) if size is not None else CHUNK_SIZE - data += self.dec.update(chunk) - bytes_read += len(chunk) + chunk = os.read(self.fd, chunk_size) + if not chunk: # End of file + break - if size is not None and bytes_read >= size: - break + data += self.dec.update(chunk) + bytes_read += len(chunk) - # Update the last access time and position - self.last_access_time = os.path.getatime(self.filepath) - self.position += bytes_read # Update position after read + if size is not None and bytes_read >= size: + break return data @@ -101,34 +134,32 @@ def seek(self, offset): :param offset: The offset to seek to. """ - with self.mutex: - if self.position != offset: - if offset < self.position: - self.dec = self.cipher.decryptor() # Reuse the existing cipher instance - os.lseek(self.fd, 0, os.SEEK_SET) - self.position = 0 - - discard_size = offset - self.position - chunk_size = 4096 - while discard_size > 0: - to_read = min(discard_size, chunk_size) - self.dec.update(os.read(self.fd, to_read)) # Discard the data - discard_size -= to_read - - self.position = offset # Update position after seek + position = 0 + self.dec = self.cipher.decryptor() + self.enc = self.cipher.encryptor() + os.lseek(self.fd, 0, os.SEEK_SET) + discard_size = offset - position + while discard_size > 0: + to_read = min(CHUNK_SIZE, discard_size) + data = self.dec.update(os.read(self.fd, to_read)) + data = self.enc.update(data) + discard_size -= to_read def tell(self): - with self.mutex: - return self.position + """ + Returns the current position in the file. + + :return: The current position in the file. + """ + return os.lseek(self.fd, 0, os.SEEK_CUR) def close(self): """ Closes the file descriptor. """ - with self.mutex: - if self.fd is not None: - os.close(self.fd) - self.fd = None + if self.fd is not None: + os.close(self.fd) + self.fd = None def __enter__(self): """ @@ -148,7 +179,7 @@ def __del__(self): """ self.close() try: - os.remove(self.filepath) + os.unlink(self.filepath) except FileNotFoundError: pass @@ -159,15 +190,16 @@ def __init__(self, storage_directory=None): :param storage_directory: The directory to store the files. Defaults to a temporary directory. """ - if storage_directory is None: - storage_directory = mkdtemp() - self.storage_directory = storage_directory + self.storage_directory = storage_directory if storage_directory is not None else mkdtemp() self.files = {} # Track open files and their secure temporary file handlers - self.default_permissions = 0o660 # Default permissions for files (user read/write) - self.uid = os.getuid() # Current user's UID - self.gid = os.getgid() # Current user's GID self.mutex = threading.Lock() + def get_file(self, path): + file = self.files.get(path) + if file is None: + raise FuseOSError(errno.ENOENT) + return file + def getattr(self, path, fh=None): """ Retrieves file or directory attributes. @@ -178,21 +210,18 @@ def getattr(self, path, fh=None): """ with self.mutex: if path == '/': - st = {'st_mode': (stat.S_IFDIR | 0o750), 'st_nlink': 2} - else: - file = self.files.get(path) - if file is None or not os.path.exists(file.filepath): - raise OSError(errno.ENOENT, "No such file or directory", path) - st = { - 'st_mode': (stat.S_IFREG | 0o660), - 'st_size': file.size, - 'st_nlink': 1, - 'st_uid': self.uid, - 'st_gid': self.gid, - 'st_atime': file.last_access_time if file.last_access_time else os.path.getatime(file.filepath), - 'st_mtime': file.last_mod_time if file.last_mod_time else os.path.getmtime(file.filepath), - 'st_ctime': file.creation_time if file.creation_time else os.path.getctime(file.filepath), - } + return {'st_mode': (stat.S_IFDIR | 0o750), 'st_nlink': 2} + + file = self.get_file(path) + + file_stat = os.stat(file.filepath) + + st = {key: getattr(file_stat, key) for key in dir(file_stat) if not key.startswith('__')} + + st['st_mode'] |= stat.S_IFDIR if stat.S_ISDIR(file_stat.st_mode) else 0 + st['st_mode'] |= stat.S_IFREG if stat.S_ISREG(file_stat.st_mode) else 0 + st['st_mode'] |= stat.S_IFLNK if stat.S_ISLNK(file_stat.st_mode) else 0 + return st def readdir(self, path, fh=None): @@ -214,11 +243,13 @@ def create(self, path, mode): :param mode: The mode in which the file will be opened. :return: The file descriptor. """ + filename = os.path.basename(path) + if not is_valid_uuid4(filename): + raise FuseOSError(errno.ENOENT) + with self.mutex: - file = EphemeralFile(self.storage_directory) - file.open('w') - os.chmod(file.filepath, self.default_permissions) - os.chown(file.filepath, self.uid, self.gid) + file = EphemeralFile(self.storage_directory, filename) + file.open('w', mode) self.files[path] = file return file.fd @@ -231,11 +262,8 @@ def open(self, path, flags): :return: The file descriptor. """ with self.mutex: - file = self.files.get(path) - if path not in self.files: - raise FuseOSError(errno.ENOENT) - mode = 'w' if (flags & os.O_RDWR or flags & os.O_WRONLY) else 'r' - file.open(mode) + file = self.get_file(path) + file.open('w' if (flags & os.O_RDWR or flags & os.O_WRONLY) else 'r') return file.fd def read(self, path, size, offset, fh=None): @@ -249,9 +277,7 @@ def read(self, path, size, offset, fh=None): :return: The data read from the file. """ with self.mutex: - file = self.files.get(path) - if file is None: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) file.seek(offset) return file.read(size) @@ -266,7 +292,7 @@ def write(self, path, data, offset, fh=None): :return: The number of bytes written. """ with self.mutex: - file = self.files.get(path) + file = self.get_file(path) file.write(data) return len(data) @@ -280,7 +306,7 @@ def unlink(self, path): file = self.files.pop(path, None) if file: file.close() - os.remove(file.filepath) + os.unlink(file.filepath) def release(self, path, fh=None): """ @@ -290,9 +316,7 @@ def release(self, path, fh=None): :param fh: File handle (not used here). """ with self.mutex: - file = self.files.get(path) - if file: - file.close() + self.get_file(path).close() def truncate(self, path, length, fh=None): """ @@ -305,70 +329,64 @@ def truncate(self, path, length, fh=None): :param length: The new size of the file. """ with self.mutex: - original_file = self.files.get(path) - if original_file is None: - raise FuseOSError(errno.ENOENT) - - if length < original_file.size: - original_file.open('r') + file = self.get_file(path) - truncated_file = EphemeralFile(self.storage_directory) - truncated_file.open('w') - os.chmod(truncated_file.filepath, self.default_permissions) - os.chown(truncated_file.filepath, self.uid, self.gid) + if length < file.size: + os.truncate(file.filepath, length) - truncated_file.open('w') + file.seek(length) + if length > file.size: + length = length - file.size bytes_written = 0 - chunk_size = 4096 while bytes_written < length: - remaining = length - bytes_written - to_read = min(chunk_size, remaining) - truncated_file.write(original_file.read(to_read)) - bytes_written += to_read - - del original_file - self.files[path] = truncated_file - elif length > original_file.size: - length = length - original_file.size - original_file.open('w') - bytes_written = 0 - chunk_size = 4096 - while bytes_written < length: - remaining = length - bytes_written - to_write = min(chunk_size, remaining) - original_file.write(b'\0' * to_write) + to_write = min(CHUNK_SIZE, length - bytes_written) + file.write(b'\0' * to_write) bytes_written += to_write -class EphemeralFS(FUSE): - """ - A class that mounts an ephemeral filesystem at the given mount point. - Inherits from FUSE to provide the filesystem mounting functionality. + def chmod(self, path, mode): + """ + Changes the permissions of the file at the specified path. - Args: - mount_point (str): The path where the filesystem will be mounted. - storage_directory (str, optional): The directory used for storage. If None, uses a temporary directory. - **fuse_args: Additional arguments to pass to the FUSE constructor. - """ - def __init__(self, mount_point, storage_directory=None, **fuse_args): + :param path: The file path whose permissions will be changed. + :param mode: The new permissions mode (e.g., 0o777 for full permissions). + :raises FuseOSError: If the file does not exist. """ - Initializes and mounts the ephemeral filesystem. + file = self.get_file(path) + return os.chmod(file.filepath, mode) - :param mount_point: The path where the filesystem will be mounted. - :param storage_directory: The directory to store the files (optional). + def chown(self, path, uid, gid): """ - self.mount_point = mount_point - self.storage_directory = storage_directory + Changes the ownership of the file at the specified path. + + :param path: The file path whose ownership will be changed. + :param uid: The user ID (uid) to set as the new owner. + :param gid: The group ID (gid) to set as the new group owner. + :raises FuseOSError: If the file does not exist. + """ + file = self.get_file(path) + return os.chown(file.filepath, uid, gid) + +def mount_globaleaks_eph_fs(mount_point, storage_directory=None, foreground=False): + """ + Initializes and mounts the ephemeral filesystem. + + :param mount_point: The path where the filesystem will be mounted. + :param storage_directory: The directory to store the files (optional). + :return: A `FUSE` object that represents the mounted filesystem. + """ + # Create the mount point directory if it does not exist + os.makedirs(mount_point) - # Create the mount point directory if it does not exist - os.makedirs(self.mount_point, exist_ok=True) + # If a storage directory is specified, create it as well + if storage_directory: + os.makedirs(storage_directory) - # If a storage directory is specified, create it as well - if self.storage_directory: - os.makedirs(self.storage_directory, exist_ok=True) + code = f"""from fuse import FUSE\n""" + \ + f"""from globaleaks_eph_fs import EphemeralOperations\n""" + \ + f"""FUSE(EphemeralOperations('{storage_directory}'), '{mount_point}', foreground={foreground})""" - # Initialize the FUSE mount with the EphemeralFS - super().__init__(EphemeralOperations(self.storage_directory), self.mount_point, **fuse_args) + return subprocess.Popen([sys.executable, '-c', code]) def main(): """ @@ -377,8 +395,23 @@ def main(): parser = argparse.ArgumentParser(description="GLOBALEAKS EPH FS") parser.add_argument('mount_point', help="Path to mount the filesystem") parser.add_argument('--storage_directory', '-s', help="Optional storage directory. Defaults to a temporary directory.") + parser.add_argument('--unmount', '-u', action='store_true', help="Unmount the filesystem if it's mounted.") args = parser.parse_args() - EphemeralFS(args.mount_point, args.storage_directory, foreground=True) + + if is_mount_point(args.mount_point): + os.system(f"fusermount -u {args.mount_point}") + + if args.unmount: + return + + try: + print(f"Mounting GLOBALEAKS EPH FS at {args.mount_point}") + mount_globaleaks_eph_fs(args.mount_point, args.storage_directory, True).wait() + + except KeyboardInterrupt: + sys.exit(0) + except: + sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/test.py b/tests/test.py index 4b92574..6c34838 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,15 +1,26 @@ import errno import os import shutil +import sys import stat +import tempfile import unittest +import uuid + +from contextlib import redirect_stdout, redirect_stderr +from fuse import FUSE, FuseOSError from tempfile import mkdtemp -from fuse import FuseOSError -from globaleaks_eph_fs import EphemeralFile, EphemeralOperations +from unittest.mock import patch, MagicMock + +from globaleaks_eph_fs import EphemeralFile, EphemeralOperations, mount_globaleaks_eph_fs, is_mount_point, main -TEST_PATH = 'TESTFILE.TXT' + +TEST_PATH = str(uuid.uuid4()) TEST_DATA = b"Hello, world! This is a test data for writing, seeking and reading operations." +ORIGINAL_SIZE = len(TEST_DATA) +EXTENDED_SIZE = ORIGINAL_SIZE*2 +REDUCED_SIZE = ORIGINAL_SIZE//2 class TestEphemeralFile(unittest.TestCase): def setUp(self): @@ -46,116 +57,318 @@ def test_encryption_and_decryption(self): self.assertEqual(read_data, expected) # Verify the data matches the expected value def test_file_cleanup(self): - TEST_PATH = self.ephemeral_file.filepath + path_copy = self.ephemeral_file.filepath del self.ephemeral_file - self.assertFalse(os.path.exists(TEST_PATH)) + self.assertFalse(os.path.exists(path_copy)) class TestEphemeralOperations(unittest.TestCase): def setUp(self): self.storage_dir = mkdtemp() - self.operations = EphemeralOperations(self.storage_dir) + self.fs = EphemeralOperations(self.storage_dir) + + # Get current user's UID and GID + self.current_uid = os.getuid() + self.current_gid = os.getgid() def tearDown(self): - for file in self.operations.files.values(): + for file in self.fs.files.values(): os.remove(file.filepath) os.rmdir(self.storage_dir) def test_create_file(self): - self.operations.create(TEST_PATH, 0o660) - self.assertIn(TEST_PATH, self.operations.files) + self.fs.create(TEST_PATH, 0o660) + self.assertIn(TEST_PATH, self.fs.files) + + def test_create_file_with_arbitrary_name(self): + with self.assertRaises(FuseOSError) as context: + self.fs.create('/arbitraryname', os.O_RDONLY) + self.assertEqual(context.exception.errno, errno.ENOENT) def test_open_existing_file(self): - self.operations.create(TEST_PATH, 0o660) - self.operations.open(TEST_PATH, os.O_RDONLY) + self.fs.create(TEST_PATH, 0o660) + self.fs.open(TEST_PATH, os.O_RDONLY) def test_write_and_read_file(self): - self.operations.create(TEST_PATH, 0o660) + self.fs.create(TEST_PATH, 0o660) - self.operations.open(TEST_PATH, os.O_RDWR) - self.operations.write(TEST_PATH, TEST_DATA, 0, None) + self.fs.open(TEST_PATH, os.O_RDWR) + self.fs.write(TEST_PATH, TEST_DATA, 0, None) - self.operations.release(TEST_PATH, None) + self.fs.release(TEST_PATH, None) - self.operations.open(TEST_PATH, os.O_RDONLY) + self.fs.open(TEST_PATH, os.O_RDONLY) - read_data = self.operations.read(TEST_PATH, len(TEST_DATA), 0, None) + read_data = self.fs.read(TEST_PATH, len(TEST_DATA), 0, None) self.assertEqual(read_data, TEST_DATA) - self.operations.release(TEST_PATH, None) + self.fs.release(TEST_PATH, None) def test_unlink_file(self): - self.operations.create(TEST_PATH, 0o660) - self.assertIn(TEST_PATH, self.operations.files) + self.fs.create(TEST_PATH, 0o660) + self.assertIn(TEST_PATH, self.fs.files) - self.operations.unlink(TEST_PATH) - self.assertNotIn(TEST_PATH, self.operations.files) + self.fs.unlink(TEST_PATH) + self.assertNotIn(TEST_PATH, self.fs.files) def test_file_not_found(self): with self.assertRaises(FuseOSError) as context: - self.operations.open('/nonexistentfile', os.O_RDONLY) + self.fs.open('/nonexistentfile', os.O_RDONLY) self.assertEqual(context.exception.errno, errno.ENOENT) def test_getattr_root(self): - attr = self.operations.getattr('/') - self.assertEqual(attr['st_mode'], stat.S_IFDIR | 0o750) + attr = self.fs.getattr('/') + self.assertEqual(stat.S_IFMT(attr['st_mode']), stat.S_IFDIR) + self.assertEqual(attr['st_mode'] & 0o777, 0o750) self.assertEqual(attr['st_nlink'], 2) def test_getattr_file(self): - self.operations.create(TEST_PATH, mode=0o660) + self.fs.create(TEST_PATH, mode=0o660) - attr = self.operations.getattr(TEST_PATH) + attr = self.fs.getattr(TEST_PATH) - self.assertEqual(attr['st_mode'], stat.S_IFREG | 0o660) + self.assertEqual(stat.S_IFMT(attr['st_mode']), stat.S_IFREG) + self.assertEqual(attr['st_mode'] & 0o777, 0o660) self.assertEqual(attr['st_size'], 0) self.assertEqual(attr['st_nlink'], 1) self.assertEqual(attr['st_uid'], os.getuid()) self.assertEqual(attr['st_gid'], os.getgid()) - self.assertIn('st_atime', attr) self.assertIn('st_mtime', attr) self.assertIn('st_ctime', attr) def test_getattr_nonexistent(self): with self.assertRaises(OSError) as _: - self.operations.getattr('/nonexistent') + self.fs.getattr('/nonexistent') def test_truncate(self): - ORIGINAL_SIZE =len(TEST_DATA) - REDUCED_SIZE = len(TEST_DATA)//2 + self.fs.create(TEST_PATH, 0o660) + self.fs.write(TEST_PATH, TEST_DATA, 0, None) - self.operations.create(TEST_PATH, 0o660) - self.operations.write(TEST_PATH, TEST_DATA, 0, None) - - self.operations.truncate(TEST_PATH, REDUCED_SIZE, None) - file_content = self.operations.read(TEST_PATH, ORIGINAL_SIZE, 0, None) + self.fs.truncate(TEST_PATH, REDUCED_SIZE, None) + file_content = self.fs.read(TEST_PATH, ORIGINAL_SIZE, 0, None) self.assertEqual(len(file_content), REDUCED_SIZE) self.assertEqual(file_content, TEST_DATA[:REDUCED_SIZE]) def test_extend(self): - ORIGINAL_SIZE =len(TEST_DATA) - EXTENDED_SIZE = len(TEST_DATA)*2 - - self.operations.create(TEST_PATH, 0o660) - self.operations.write(TEST_PATH, TEST_DATA, 0, None) + self.fs.create(TEST_PATH, 0o660) + self.fs.write(TEST_PATH, TEST_DATA, 0, None) - self.operations.truncate(TEST_PATH, EXTENDED_SIZE, None) - file_content = self.operations.read(TEST_PATH, EXTENDED_SIZE * 2, 0, None) + self.fs.truncate(TEST_PATH, EXTENDED_SIZE, None) + file_content = self.fs.read(TEST_PATH, EXTENDED_SIZE * 2, 0, None) self.assertEqual(file_content[:ORIGINAL_SIZE], TEST_DATA) self.assertEqual(len(file_content), EXTENDED_SIZE) self.assertTrue(all(byte == 0 for byte in file_content[ORIGINAL_SIZE:])) def test_readdir(self): - file_names = ['/file1', '/file2', '/file3'] - for file_name in file_names: - self.operations.create(file_name, 0o660) + file_names = [] + for _ in range(3): + file_names.append(str(uuid.uuid4())) + self.fs.create(file_names[-1], 0o660) + + directory_contents = self.fs.readdir('/', None) + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[1], file_names[2]}) + + self.fs.unlink(file_names[1]) + directory_contents = self.fs.readdir('/', None) + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[2]}) + + @patch("os.chmod") + def test_chmod_success(self, mock_chmod): + self.fs.create(TEST_PATH, 0o660) + mock_chmod.assert_called_with(self.fs.files[TEST_PATH].filepath, 0o660) + self.fs.chmod(TEST_PATH, 0o640) + mock_chmod.assert_called_with(self.fs.files[TEST_PATH].filepath, 0o640) + + def test_chmod_file_not_found(self): + with self.assertRaises(FuseOSError) as context: + self.fs.chmod("/nonexistent", 0o644) + self.assertEqual(context.exception.errno, errno.ENOENT) - directory_contents = self.operations.readdir('/', None) - self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file2', 'file3'}) + @patch("os.chown") + def test_chown_success(self, mock_chown): + self.fs.create(TEST_PATH, 0o660) + self.fs.chown(TEST_PATH, self.current_uid, self.current_gid) + mock_chown.assert_called_once_with(self.fs.files[TEST_PATH].filepath, self.current_uid, self.current_gid) + + def test_chown_file_not_found(self): + with self.assertRaises(FuseOSError) as context: + self.fs.chown("/nonexistent", self.current_uid, self.current_gid) + self.assertEqual(context.exception.errno, errno.ENOENT) - self.operations.unlink('/file2') - directory_contents = self.operations.readdir('/', None) - self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file3'}) + @patch("os.chown", side_effect=PermissionError) + def test_chown_permission_error(self, mock_chown): + self.fs.create(TEST_PATH, 0o660) + with self.assertRaises(PermissionError): + self.fs.chown(TEST_PATH, self.current_uid, self.current_gid) + + @patch('os.makedirs') + @patch('subprocess.Popen') + def test_mount_globaleaks_eph_fs_with_storage_directory(self, mock_popen, mock_makedirs): + mount_point = '/mnt/test' + storage_directory = '/tmp/storage' + foreground = True + + # Call the function + mount_globaleaks_eph_fs(mount_point, storage_directory, foreground) + + # Assert that os.makedirs was called to create the mount point + mock_makedirs.assert_any_call(mount_point) + + # Assert that os.makedirs was called to create the storage directory + mock_makedirs.assert_any_call(storage_directory) + + # Assert that subprocess.Popen was called to run the FUSE command + mock_popen.assert_called_once_with([ + sys.executable, + '-c', + f"from fuse import FUSE\n" + f"from globaleaks_eph_fs import EphemeralOperations\n" + f"FUSE(EphemeralOperations('{storage_directory}'), '{mount_point}', foreground={foreground})" + ]) + + @patch('os.makedirs') + @patch('subprocess.Popen') + def test_mount_globaleaks_eph_fs_without_storage_directory(self, mock_popen, mock_makedirs): + # Define test parameters + mount_point = '/mnt/test' + storage_directory = None + foreground = False + + # Call the function + mount_globaleaks_eph_fs(mount_point, storage_directory, foreground) + + # Assert that os.makedirs was called to create the mount point + mock_makedirs.assert_called_with(mount_point) + + # Assert that subprocess.Popen was called with the correct arguments + mock_popen.assert_called_once_with([ + sys.executable, + '-c', + f"from fuse import FUSE\n" + f"from globaleaks_eph_fs import EphemeralOperations\n" + f"FUSE(EphemeralOperations('None'), '{mount_point}', foreground={foreground})" + ]) + +class TestMainFunction(unittest.TestCase): + @patch('argparse.ArgumentParser.parse_args') + @patch('os.system') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('builtins.print') + def test_main_mount(self, mock_print, mock_is_mount_point, mock_mount, mock_system, mock_parse_args): + # Set up the mock return values + mock_parse_args.return_value = MagicMock( + mount_point='/mnt/test', + storage_directory=None, + unmount=False + ) + mock_is_mount_point.return_value = False # Simulate that the mount point is not already mounted + + # Call the main function + main() + + # Check that the mount command was called with the correct parameters + mock_mount.assert_called_once_with('/mnt/test', None, True) + mock_mount.return_value.wait.assert_called_once() # Ensure .wait() was called + + # Check that os.system was NOT called (since we are not unmounting) + mock_system.assert_not_called() + + @patch('argparse.ArgumentParser.parse_args') + @patch('os.system') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('builtins.print') + def test_main_unmount(self, mock_print, mock_is_mount_point, mock_mount, mock_system, mock_parse_args): + # Set up the mock return values for unmounting + mock_parse_args.return_value = MagicMock( + mount_point='/mnt/test', + storage_directory=None, + unmount=True + ) + mock_is_mount_point.return_value = True + + # Call the main function + main() + + # Check that os.system was called to unmount + mock_system.assert_called_once_with("fusermount -u /mnt/test") + + # Ensure the mount function was not called since we're unmounting + mock_mount.assert_not_called() + + @patch('argparse.ArgumentParser.parse_args') + @patch('os.system') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('builtins.print') + def test_main_with_mount_point_check(self, mock_print, mock_is_mount_point, mock_mount, mock_system, mock_parse_args): + # Set up the mock return values for when the mount point is already mounted + mock_parse_args.return_value = MagicMock( + mount_point='/mnt/test', + storage_directory=None, + unmount=False + ) + mock_is_mount_point.return_value = True # Simulate that the mount point is already mounted + + # Call the main function + main() + + # Ensure fusermount is called to unmount the filesystem before mounting + mock_system.assert_called_once_with("fusermount -u /mnt/test") + + # Check that the mount command was called + mock_mount.assert_called_once_with('/mnt/test', None, True) + mock_mount.return_value.wait.assert_called_once() # Ensure .wait() was called + + @patch('argparse.ArgumentParser.parse_args') + @patch('os.system') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('builtins.print') + def test_main_keyboard_interrupt(self, mock_print, mock_is_mount_point, mock_mount, mock_system, mock_parse_args): + # Set up the mock return values for mounting + mock_parse_args.return_value = MagicMock( + mount_point='/mnt/test', + storage_directory=None, + unmount=False + ) + mock_is_mount_point.return_value = False + + # Simulate a KeyboardInterrupt during the mounting process + mock_mount.side_effect = KeyboardInterrupt + + # Use self.assertRaises to test for the expected exit + with self.assertRaises(SystemExit): + main() + + # Check that sys.exit(0) was called due to the keyboard interrupt + mock_mount.assert_called_once_with('/mnt/test', None, True) + + @patch('argparse.ArgumentParser.parse_args') + @patch('os.system') + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') + @patch('globaleaks_eph_fs.is_mount_point') + @patch('builtins.print') + def test_main_other_exception(self, mock_print, mock_is_mount_point, mock_mount, mock_system, mock_parse_args): + # Set up the mock return values for when an exception occurs + mock_parse_args.return_value = MagicMock( + mount_point='/mnt/test', + storage_directory=None, + unmount=False + ) + mock_is_mount_point.return_value = False + + # Simulate an unexpected exception during the mounting process + mock_mount.side_effect = Exception("Some unexpected error") + + # Use self.assertRaises to test for the expected exit + with self.assertRaises(SystemExit): + main() + + # Ensure that the error caused sys.exit(1) + mock_mount.assert_called_once_with('/mnt/test', None, True) if __name__ == '__main__': unittest.main()