diff --git a/src/globaleaks_eph_fs/__init__.py b/src/globaleaks_eph_fs/__init__.py index 769c884..82c1708 100644 --- a/src/globaleaks_eph_fs/__init__.py +++ b/src/globaleaks_eph_fs/__init__.py @@ -1,6 +1,7 @@ import argparse import errno import os +import re import stat import uuid import threading @@ -11,26 +12,50 @@ CHUNK_SIZE = 4096 +UUID4_PATTERN = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', re.IGNORECASE) + + +def is_valid_uuid4(filename): + """ + Validates if the given filename follows the UUIDv4 format. + + :param filename: The name of the file. + :return: True if the filename is a valid UUIDv4, otherwise False. + """ + return bool(UUID4_PATTERN.match(filename)) + + class EphemeralFile: - def __init__(self, filesdir): + def __init__(self, filesdir, filename=None): """ Initializes an ephemeral file with ChaCha20 encryption. Creates a new random file path and generates a unique encryption key and nonce. :param filesdir: The directory where the ephemeral file will be stored. + :param filenames: Optional filename. If not provided, a UUID4 is used. """ - self.fd = None - self.filename = str(uuid.uuid4()) # UUID as a string for the file name - self.filepath = os.path.join(filesdir, self.filename) - self.nonce = os.urandom(16) # 128-bit nonce - self.key = os.urandom(32) # 256-bit key - self.cipher = Cipher(ChaCha20(self.key, self.nonce), mode=None) - self.position = 0 + filename = filename or str(uuid.uuid4()) # If filenames is None, generate a random UUID as a string + self.filepath = os.path.join(filesdir, filename) + self.cipher = Cipher(ChaCha20(os.urandom(32), uuid.UUID(filename).bytes[:16]), mode=None) self.enc = self.cipher.encryptor() self.dec = self.cipher.decryptor() - self.mutex = threading.Lock() + + self.fd = None def __getattribute__(self, name): + """ + Intercepts attribute access for the `EphemeralFile` class. + + If the attribute being accessed is 'size', it returns the size of the file + by checking the file's attributes using os.stat. For all other attributes, + it defers to the default behavior of `__getattribute__`, allowing normal + attribute access. + + :param name: The name of the attribute being accessed. + :return: The value of the requested attribute. If 'size' is requested, + the size of the file is returned. Otherwise, the default + behavior for attribute access is used. + """ if name == "size": return os.stat(self.filepath).st_size @@ -44,9 +69,8 @@ def open(self, flags, mode=0o660): :param mode: 'w' for writing, 'r' for reading. :return: The file object. """ - with self.mutex: - self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND, mode) - os.chmod(self.filepath, mode) + self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND, mode) + os.chmod(self.filepath, mode) return self def write(self, data): @@ -55,11 +79,9 @@ def write(self, data): :param data: Data to write to the file, can be a string or bytes. """ - with self.mutex: - if isinstance(data, str): - data = data.encode('utf-8') - os.write(self.fd, self.enc.update(data)) - self.position += len(data) + if isinstance(data, str): + data = data.encode('utf-8') + os.write(self.fd, self.enc.update(data)) def read(self, size=None): """ @@ -68,25 +90,22 @@ def read(self, size=None): :param size: The number of bytes to read. If None, reads until the end of the file. :return: The decrypted data read from the file. """ - with self.mutex: - data = b"" - bytes_read = 0 - - while True: - # Determine how much to read in this chunk - chunk_size = min(CHUNK_SIZE, size - bytes_read) if size is not None else CHUNK_SIZE + data = b"" + bytes_read = 0 - chunk = os.read(self.fd, chunk_size) - if not chunk: # End of file - break + while True: + # Determine how much to read in this chunk + chunk_size = min(CHUNK_SIZE, size - bytes_read) if size is not None else CHUNK_SIZE - data += self.dec.update(chunk) - bytes_read += len(chunk) + chunk = os.read(self.fd, chunk_size) + if not chunk: # End of file + break - if size is not None and bytes_read >= size: - break + data += self.dec.update(chunk) + bytes_read += len(chunk) - self.position += bytes_read # Update position after read + if size is not None and bytes_read >= size: + break return data @@ -96,32 +115,32 @@ def seek(self, offset): :param offset: The offset to seek to. """ - with self.mutex: - if self.position != offset: - if offset < self.position: - self.dec = self.cipher.decryptor() # Reuse the existing cipher instance - os.lseek(self.fd, 0, os.SEEK_SET) - self.position = 0 - - discard_size = offset - self.position - while discard_size > 0: - to_read = min(CHUNK_SIZE, discard_size) - self.dec.update(os.read(self.fd, to_read)) # Discard the data - discard_size -= to_read - - self.position = offset # Update position after seek + position = 0 + self.dec = self.cipher.decryptor() + self.enc = self.cipher.encryptor() + os.lseek(self.fd, 0, os.SEEK_SET) + discard_size = offset - position + while discard_size > 0: + to_read = min(CHUNK_SIZE, discard_size) + data = self.dec.update(os.read(self.fd, to_read)) + data = self.enc.update(data) + discard_size -= to_read def tell(self): - return self.position + """ + Returns the current position in the file. + + :return: The current position in the file. + """ + return os.lseek(self.fd, 0, os.SEEK_CUR) def close(self): """ Closes the file descriptor. """ - with self.mutex: - if self.fd is not None: - os.close(self.fd) - self.fd = None + if self.fd is not None: + os.close(self.fd) + self.fd = None def __enter__(self): """ @@ -152,12 +171,16 @@ def __init__(self, storage_directory=None): :param storage_directory: The directory to store the files. Defaults to a temporary directory. """ - if storage_directory is None: - storage_directory = mkdtemp() - self.storage_directory = storage_directory + self.storage_directory = storage_directory if storage_directory is not None else mkdtemp() self.files = {} # Track open files and their secure temporary file handlers self.mutex = threading.Lock() + def get_file(self, path): + file = self.files.get(path) + if file is None: + raise FuseOSError(errno.ENOENT) + return file + def getattr(self, path, fh=None): """ Retrieves file or directory attributes. @@ -170,9 +193,7 @@ def getattr(self, path, fh=None): if path == '/': return {'st_mode': (stat.S_IFDIR | 0o750), 'st_nlink': 2} - file = self.files.get(path) - if file is None or not os.path.exists(file.filepath): - raise OSError(errno.ENOENT, "No such file or directory", path) + file = self.get_file(path) file_stat = os.stat(file.filepath) @@ -206,6 +227,10 @@ def create(self, path, mode): :param mode: The mode in which the file will be opened. :return: The file descriptor. """ + filename = os.path.basename(path) + if not is_valid_uuid4(filename): + raise FuseOSError(errno.ENOENT) + with self.mutex: file = EphemeralFile(self.storage_directory) file.open('w', mode) @@ -221,9 +246,7 @@ def open(self, path, flags): :return: The file descriptor. """ with self.mutex: - file = self.files.get(path) - if path not in self.files: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) file.open('w' if (flags & os.O_RDWR or flags & os.O_WRONLY) else 'r') return file.fd @@ -238,9 +261,7 @@ def read(self, path, size, offset, fh=None): :return: The data read from the file. """ with self.mutex: - file = self.files.get(path) - if file is None: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) file.seek(offset) return file.read(size) @@ -255,7 +276,7 @@ def write(self, path, data, offset, fh=None): :return: The number of bytes written. """ with self.mutex: - file = self.files.get(path) + file = self.get_file(path) file.write(data) return len(data) @@ -279,9 +300,7 @@ def release(self, path, fh=None): :param fh: File handle (not used here). """ with self.mutex: - file = self.files.get(path) - if file: - file.close() + self.get_file(path).close() def truncate(self, path, length, fh=None): """ @@ -294,32 +313,19 @@ def truncate(self, path, length, fh=None): :param length: The new size of the file. """ with self.mutex: - original_file = self.files.get(path) - if original_file is None: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) - if length < original_file.size: - original_file.open('r') + if length < file.size: + os.truncate(file.filepath, length) - truncated_file = EphemeralFile(self.storage_directory) - truncated_file.open('w') + file.seek(length) + if length > file.size: + length = length - file.size bytes_written = 0 while bytes_written < length: - remaining = length - bytes_written - to_read = min(CHUNK_SIZE, remaining) - truncated_file.write(original_file.read(to_read)) - bytes_written += to_read - - del original_file - self.files[path] = truncated_file - elif length > original_file.size: - length = length - original_file.size - bytes_written = 0 - while bytes_written < length: - remaining = length - bytes_written - to_write = min(CHUNK_SIZE, remaining) - original_file.write(b'\0' * to_write) + to_write = min(CHUNK_SIZE, length - bytes_written) + file.write(b'\0' * to_write) bytes_written += to_write def chmod(self, path, mode): @@ -330,9 +336,7 @@ def chmod(self, path, mode): :param mode: The new permissions mode (e.g., 0o777 for full permissions). :raises FuseOSError: If the file does not exist. """ - file = self.files.get(path) - if not file: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) return os.chmod(file.filepath, mode) def chown(self, path, uid, gid): @@ -344,9 +348,7 @@ def chown(self, path, uid, gid): :param gid: The group ID (gid) to set as the new group owner. :raises FuseOSError: If the file does not exist. """ - file = self.files.get(path) - if not file: - raise FuseOSError(errno.ENOENT) + file = self.get_file(path) return os.chown(file.filepath, uid, gid) class EphemeralFS(FUSE): diff --git a/tests/test.py b/tests/test.py index 11358c2..22ec891 100644 --- a/tests/test.py +++ b/tests/test.py @@ -4,6 +4,7 @@ import stat import tempfile import unittest +import uuid from contextlib import redirect_stdout, redirect_stderr from fuse import FUSE, FuseOSError @@ -12,7 +13,7 @@ from globaleaks_eph_fs import EphemeralFS, EphemeralFile, EphemeralOperations -TEST_PATH = 'TESTFILE.TXT' +TEST_PATH = str(uuid.uuid4()) TEST_DATA = b"Hello, world! This is a test data for writing, seeking and reading operations." ORIGINAL_SIZE = len(TEST_DATA) @@ -153,16 +154,17 @@ def test_extend(self): self.assertTrue(all(byte == 0 for byte in file_content[ORIGINAL_SIZE:])) def test_readdir(self): - file_names = ['/file1', '/file2', '/file3'] - for file_name in file_names: - self.fs.create(file_name, 0o660) + file_names = [] + for _ in range(3): + file_names.append(str(uuid.uuid4())) + self.fs.create(file_names[-1], 0o660) directory_contents = self.fs.readdir('/', None) - self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file2', 'file3'}) + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[1], file_names[2]}) - self.fs.unlink('/file2') + self.fs.unlink(file_names[1]) directory_contents = self.fs.readdir('/', None) - self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file3'}) + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[2]}) @patch("os.chmod") def test_chmod_success(self, mock_chmod):