Skip to content

Commit

Permalink
Added: working document_id overrides for TinyDB (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
signebedi committed Mar 22, 2024
1 parent 8d29c66 commit e9794ca
Showing 1 changed file with 135 additions and 4 deletions.
139 changes: 135 additions & 4 deletions libreforms_fastapi/utils/document_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,141 @@
from bson import ObjectId
from datetime import datetime
from zoneinfo import ZoneInfo
from tinydb import TinyDB, Query
from tinydb import (
TinyDB,
Query,
Storage
)
from tinydb.table import (
Table as TinyTable,
Document
)

from typing import (
Mapping,
Union,
Iterable,
List,
)
from abc import ABC, abstractmethod

from libreforms_fastapi.utils.logging import set_logger

# We want to modify TinyDB use use string representations of bson
# ObjectIDs. As such, we will need to modify some underlying behavior,
# see https://github.com/signebedi/libreforms-fastapi/issues/15.
class CustomTable(TinyTable):
document_id_class = str # Use string IDs instead of integers

def _get_next_id(self, document_id=str(ObjectId())):
"""
Generate a new BSON ObjectID string to use as the TinyDB document ID.
"""
return document_id


def insert(self, document: Mapping, document_id:Union[str, bool]=False) -> int:
"""
Insert a new document into the table.
:param document: the document to insert
:returns: the inserted document's ID
"""

if not document_id:
document_id = str(ObjectId())

# Make sure the document implements the ``Mapping`` interface
if not isinstance(document, Mapping):
raise ValueError('Document is not a Mapping')

# First, we get the document ID for the new document
if isinstance(document, Document):
# For a `Document` object we use the specified ID
doc_id = document.doc_id

# We also reset the stored next ID so the next insert won't
# re-use document IDs by accident when storing an old value
self._next_id = None
else:
# In all other cases we use the next free ID
doc_id = self._get_next_id(document_id=document_id)

# Now, we update the table and add the document
def updater(table: dict):
if doc_id in table:
raise ValueError(f'Document with ID {str(doc_id)} '
f'already exists')

# By calling ``dict(document)`` we convert the data we got to a
# ``dict`` instance even if it was a different class that
# implemented the ``Mapping`` interface
table[doc_id] = dict(document)

# See below for details on ``Table._update``
self._update_table(updater)

return doc_id

def insert_multiple(self, documents: Iterable[Mapping], document_ids:Union[List, bool]=False) -> List[int]:
"""
Insert multiple documents into the table.
:param documents: an Iterable of documents to insert
:returns: a list containing the inserted documents' IDs
"""
doc_ids = []

if document_ids and len(document_ids) != len(documents):
raise Exception("When inserting multiple and passing your own document_ids," \
"the list must be the same length as the document list")

def updater(table: dict):
# for document in documents:
for i, document in enumerate(documents):

# Make sure the document implements the ``Mapping`` interface
if not isinstance(document, Mapping):
raise ValueError('Document is not a Mapping')

if isinstance(document, Document):
# Check if document does not override an existing document
if document.doc_id in table:
raise ValueError(
f'Document with ID {str(document.doc_id)} '
f'already exists'
)

# Store the doc_id, so we can return all document IDs
# later. Then save the document with its doc_id and
# skip the rest of the current loop
doc_id = document.doc_id
doc_ids.append(doc_id)
table[doc_id] = dict(document)
continue

# Generate new document ID for this document
# Store the doc_id, so we can return all document IDs
# later, then save the document with the new doc_id
if not document_ids:
document_id = str(ObjectId())
else:
document_id = document_ids[i]
doc_id = self._get_next_id()
doc_ids.append(doc_id)
table[doc_id] = dict(document)

# See below for details on ``Table._update``
self._update_table(updater)

return doc_ids

# Subclass TinyDB and override the table_class attribute with our new logic
class CustomTinyDB(TinyDB):
table_class = CustomTable



class CollectionDoesNotExist(Exception):
"""Exception raised when attempting to access a collection that does not exist."""
def __init__(self, form_name):
Expand Down Expand Up @@ -133,7 +263,8 @@ def _initialize_database_collections(self):
# Initialize databases
self.databases = {}
for form_name in self.config.keys():
self.databases[form_name] = TinyDB(self._get_db_path(form_name))
# self.databases[form_name] = TinyDB(self._get_db_path(form_name))
self.databases[form_name] = CustomTinyDB(self._get_db_path(form_name))

def _get_db_path(self, form_name:str):
"""Constructs a file path for the given form's database."""
Expand All @@ -160,7 +291,7 @@ def create_document(self, form_name:str, json_data, metadata={}):
data_dict = {
"data": convert_data_to_dict,
"metadata": {
self.document_id_field: document_id,
# self.document_id_field: document_id,
self.is_deleted_field: metadata.get(self.is_deleted_field, False),
self.timezone_field: metadata.get(self.timezone_field, self.timezone.key),
self.created_at_field: metadata.get(self.created_at_field, current_timestamp.isoformat()),
Expand All @@ -176,7 +307,7 @@ def create_document(self, form_name:str, json_data, metadata={}):
}

# document_id = self.databases[form_name].insert(data_dict)
_ = self.databases[form_name].insert(data_dict)
_ = self.databases[form_name].insert(data_dict, document_id=document_id)

if self.use_logger:
self.logger.info(f"Inserted document for {form_name} with document_id {document_id}")
Expand Down

0 comments on commit e9794ca

Please sign in to comment.