Skip to content

Commit

Permalink
Simplify code remove duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
evilaliv3 committed Dec 27, 2024
1 parent a18384f commit 92661e8
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 101 deletions.
190 changes: 96 additions & 94 deletions src/globaleaks_eph_fs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
import errno
import os
import re
import stat
import uuid
import threading
Expand All @@ -11,26 +12,50 @@

CHUNK_SIZE = 4096

UUID4_PATTERN = re.compile(r'^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$', re.IGNORECASE)


def is_valid_uuid4(filename):
"""
Validates if the given filename follows the UUIDv4 format.
:param filename: The name of the file.
:return: True if the filename is a valid UUIDv4, otherwise False.
"""
return bool(UUID4_PATTERN.match(filename))


class EphemeralFile:
def __init__(self, filesdir):
def __init__(self, filesdir, filename=None):
"""
Initializes an ephemeral file with ChaCha20 encryption.
Creates a new random file path and generates a unique encryption key and nonce.
:param filesdir: The directory where the ephemeral file will be stored.
:param filenames: Optional filename. If not provided, a UUID4 is used.
"""
self.fd = None
self.filename = str(uuid.uuid4()) # UUID as a string for the file name
self.filepath = os.path.join(filesdir, self.filename)
self.nonce = os.urandom(16) # 128-bit nonce
self.key = os.urandom(32) # 256-bit key
self.cipher = Cipher(ChaCha20(self.key, self.nonce), mode=None)
self.position = 0
filename = filename or str(uuid.uuid4()) # If filenames is None, generate a random UUID as a string
self.filepath = os.path.join(filesdir, filename)
self.cipher = Cipher(ChaCha20(os.urandom(32), uuid.UUID(filename).bytes[:16]), mode=None)
self.enc = self.cipher.encryptor()
self.dec = self.cipher.decryptor()
self.mutex = threading.Lock()

self.fd = None

def __getattribute__(self, name):
"""
Intercepts attribute access for the `EphemeralFile` class.
If the attribute being accessed is 'size', it returns the size of the file
by checking the file's attributes using os.stat. For all other attributes,
it defers to the default behavior of `__getattribute__`, allowing normal
attribute access.
:param name: The name of the attribute being accessed.
:return: The value of the requested attribute. If 'size' is requested,
the size of the file is returned. Otherwise, the default
behavior for attribute access is used.
"""
if name == "size":
return os.stat(self.filepath).st_size

Expand All @@ -44,9 +69,8 @@ def open(self, flags, mode=0o660):
:param mode: 'w' for writing, 'r' for reading.
:return: The file object.
"""
with self.mutex:
self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND, mode)
os.chmod(self.filepath, mode)
self.fd = os.open(self.filepath, os.O_RDWR | os.O_CREAT | os.O_APPEND, mode)
os.chmod(self.filepath, mode)
return self

def write(self, data):
Expand All @@ -55,11 +79,9 @@ def write(self, data):
:param data: Data to write to the file, can be a string or bytes.
"""
with self.mutex:
if isinstance(data, str):
data = data.encode('utf-8')
os.write(self.fd, self.enc.update(data))
self.position += len(data)
if isinstance(data, str):
data = data.encode('utf-8')
os.write(self.fd, self.enc.update(data))

def read(self, size=None):
"""
Expand All @@ -68,25 +90,22 @@ def read(self, size=None):
:param size: The number of bytes to read. If None, reads until the end of the file.
:return: The decrypted data read from the file.
"""
with self.mutex:
data = b""
bytes_read = 0

while True:
# Determine how much to read in this chunk
chunk_size = min(CHUNK_SIZE, size - bytes_read) if size is not None else CHUNK_SIZE
data = b""
bytes_read = 0

chunk = os.read(self.fd, chunk_size)
if not chunk: # End of file
break
while True:
# Determine how much to read in this chunk
chunk_size = min(CHUNK_SIZE, size - bytes_read) if size is not None else CHUNK_SIZE

data += self.dec.update(chunk)
bytes_read += len(chunk)
chunk = os.read(self.fd, chunk_size)
if not chunk: # End of file
break

if size is not None and bytes_read >= size:
break
data += self.dec.update(chunk)
bytes_read += len(chunk)

self.position += bytes_read # Update position after read
if size is not None and bytes_read >= size:
break

return data

Expand All @@ -96,32 +115,32 @@ def seek(self, offset):
:param offset: The offset to seek to.
"""
with self.mutex:
if self.position != offset:
if offset < self.position:
self.dec = self.cipher.decryptor() # Reuse the existing cipher instance
os.lseek(self.fd, 0, os.SEEK_SET)
self.position = 0

discard_size = offset - self.position
while discard_size > 0:
to_read = min(CHUNK_SIZE, discard_size)
self.dec.update(os.read(self.fd, to_read)) # Discard the data
discard_size -= to_read

self.position = offset # Update position after seek
position = 0
self.dec = self.cipher.decryptor()
self.enc = self.cipher.encryptor()
os.lseek(self.fd, 0, os.SEEK_SET)
discard_size = offset - position
while discard_size > 0:
to_read = min(CHUNK_SIZE, discard_size)
data = self.dec.update(os.read(self.fd, to_read))
data = self.enc.update(data)
discard_size -= to_read

def tell(self):
return self.position
"""
Returns the current position in the file.
:return: The current position in the file.
"""
return os.lseek(self.fd, 0, os.SEEK_CUR)

def close(self):
"""
Closes the file descriptor.
"""
with self.mutex:
if self.fd is not None:
os.close(self.fd)
self.fd = None
if self.fd is not None:
os.close(self.fd)
self.fd = None

def __enter__(self):
"""
Expand Down Expand Up @@ -152,12 +171,16 @@ def __init__(self, storage_directory=None):
:param storage_directory: The directory to store the files. Defaults to a temporary directory.
"""
if storage_directory is None:
storage_directory = mkdtemp()
self.storage_directory = storage_directory
self.storage_directory = storage_directory if storage_directory is not None else mkdtemp()
self.files = {} # Track open files and their secure temporary file handlers
self.mutex = threading.Lock()

def get_file(self, path):
file = self.files.get(path)
if file is None:
raise FuseOSError(errno.ENOENT)
return file

def getattr(self, path, fh=None):
"""
Retrieves file or directory attributes.
Expand All @@ -170,9 +193,7 @@ def getattr(self, path, fh=None):
if path == '/':
return {'st_mode': (stat.S_IFDIR | 0o750), 'st_nlink': 2}

file = self.files.get(path)
if file is None or not os.path.exists(file.filepath):
raise OSError(errno.ENOENT, "No such file or directory", path)
file = self.get_file(path)

file_stat = os.stat(file.filepath)

Expand Down Expand Up @@ -206,6 +227,10 @@ def create(self, path, mode):
:param mode: The mode in which the file will be opened.
:return: The file descriptor.
"""
filename = os.path.basename(path)
if not is_valid_uuid4(filename):
raise FuseOSError(errno.ENOENT)

with self.mutex:
file = EphemeralFile(self.storage_directory)
file.open('w', mode)
Expand All @@ -221,9 +246,7 @@ def open(self, path, flags):
:return: The file descriptor.
"""
with self.mutex:
file = self.files.get(path)
if path not in self.files:
raise FuseOSError(errno.ENOENT)
file = self.get_file(path)
file.open('w' if (flags & os.O_RDWR or flags & os.O_WRONLY) else 'r')
return file.fd

Expand All @@ -238,9 +261,7 @@ def read(self, path, size, offset, fh=None):
:return: The data read from the file.
"""
with self.mutex:
file = self.files.get(path)
if file is None:
raise FuseOSError(errno.ENOENT)
file = self.get_file(path)
file.seek(offset)
return file.read(size)

Expand All @@ -255,7 +276,7 @@ def write(self, path, data, offset, fh=None):
:return: The number of bytes written.
"""
with self.mutex:
file = self.files.get(path)
file = self.get_file(path)
file.write(data)
return len(data)

Expand All @@ -279,9 +300,7 @@ def release(self, path, fh=None):
:param fh: File handle (not used here).
"""
with self.mutex:
file = self.files.get(path)
if file:
file.close()
self.get_file(path).close()

def truncate(self, path, length, fh=None):
"""
Expand All @@ -294,32 +313,19 @@ def truncate(self, path, length, fh=None):
:param length: The new size of the file.
"""
with self.mutex:
original_file = self.files.get(path)
if original_file is None:
raise FuseOSError(errno.ENOENT)
file = self.get_file(path)

if length < original_file.size:
original_file.open('r')
if length < file.size:
os.truncate(file.filepath, length)

truncated_file = EphemeralFile(self.storage_directory)
truncated_file.open('w')
file.seek(length)

if length > file.size:
length = length - file.size
bytes_written = 0
while bytes_written < length:
remaining = length - bytes_written
to_read = min(CHUNK_SIZE, remaining)
truncated_file.write(original_file.read(to_read))
bytes_written += to_read

del original_file
self.files[path] = truncated_file
elif length > original_file.size:
length = length - original_file.size
bytes_written = 0
while bytes_written < length:
remaining = length - bytes_written
to_write = min(CHUNK_SIZE, remaining)
original_file.write(b'\0' * to_write)
to_write = min(CHUNK_SIZE, length - bytes_written)
file.write(b'\0' * to_write)
bytes_written += to_write

def chmod(self, path, mode):
Expand All @@ -330,9 +336,7 @@ def chmod(self, path, mode):
:param mode: The new permissions mode (e.g., 0o777 for full permissions).
:raises FuseOSError: If the file does not exist.
"""
file = self.files.get(path)
if not file:
raise FuseOSError(errno.ENOENT)
file = self.get_file(path)
return os.chmod(file.filepath, mode)

def chown(self, path, uid, gid):
Expand All @@ -344,9 +348,7 @@ def chown(self, path, uid, gid):
:param gid: The group ID (gid) to set as the new group owner.
:raises FuseOSError: If the file does not exist.
"""
file = self.files.get(path)
if not file:
raise FuseOSError(errno.ENOENT)
file = self.get_file(path)
return os.chown(file.filepath, uid, gid)

class EphemeralFS(FUSE):
Expand Down
16 changes: 9 additions & 7 deletions tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import stat
import tempfile
import unittest
import uuid

from contextlib import redirect_stdout, redirect_stderr
from fuse import FUSE, FuseOSError
Expand All @@ -12,7 +13,7 @@

from globaleaks_eph_fs import EphemeralFS, EphemeralFile, EphemeralOperations

TEST_PATH = 'TESTFILE.TXT'
TEST_PATH = str(uuid.uuid4())
TEST_DATA = b"Hello, world! This is a test data for writing, seeking and reading operations."

ORIGINAL_SIZE = len(TEST_DATA)
Expand Down Expand Up @@ -153,16 +154,17 @@ def test_extend(self):
self.assertTrue(all(byte == 0 for byte in file_content[ORIGINAL_SIZE:]))

def test_readdir(self):
file_names = ['/file1', '/file2', '/file3']
for file_name in file_names:
self.fs.create(file_name, 0o660)
file_names = []
for _ in range(3):
file_names.append(str(uuid.uuid4()))
self.fs.create(file_names[-1], 0o660)

directory_contents = self.fs.readdir('/', None)
self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file2', 'file3'})
self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[1], file_names[2]})

self.fs.unlink('/file2')
self.fs.unlink(file_names[1])
directory_contents = self.fs.readdir('/', None)
self.assertEqual(set(directory_contents), {'.', '..', 'file1', 'file3'})
self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[2]})

@patch("os.chmod")
def test_chmod_success(self, mock_chmod):
Expand Down

0 comments on commit 92661e8

Please sign in to comment.