diff --git a/tests/main.py b/tests/main.py index 6d58a33..1d8d5a0 100644 --- a/tests/main.py +++ b/tests/main.py @@ -34,7 +34,11 @@ def method_with_two_params(param1, param2=None): # Define three calls call1 = Call('method_without_params') call2 = Call('method_with_one_param', args=['value1']) -call3 = Call('method_with_two_params', kwargs={'param1': 'value1', 'param2': 'value2'}) +call3 = Call('method_with_two_params', + kwargs={ + 'param1': 'value1', + 'param2': 'value2' + }) # Add the three calls to the txlog atomically and check their indexes txlog.begin() diff --git a/txlog/__init__.py b/txlog/__init__.py index eb44f82..0973634 100644 --- a/txlog/__init__.py +++ b/txlog/__init__.py @@ -4,8 +4,8 @@ from .txlog import TxLog, Call import logging -__version__ = '0.0.5a' +__version__ = '2.214.0' logging.getLogger().setLevel(logging.INFO) -logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') -logging.info(f'TxLog v{__version__}') +logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') diff --git a/txlog/txlog.py b/txlog/txlog.py index 2d5016f..389e683 100644 --- a/txlog/txlog.py +++ b/txlog/txlog.py @@ -1,14 +1,13 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +from .utils import get_timestamp_ms from easyrocks import RocksDB, WriteBatch, utils -import time import builtins from typing import Generator - -def get_timestamp_ms(): - return int(round(time.time() * 1000)) +UINT_BYTES = 5 +MAX_UINT = 2**(UINT_BYTES * 8) - 1 class Call: @@ -82,7 +81,16 @@ def set_index(self, index): class TxLog: - def __init__(self, path='./txlog_data', max_committed_items=0, committed_ttl_seconds=None): + + CALL_PREFIX = b'\x00' + OFFSET_PREFIX = b'\x01' + INDEX_PREFIX = b'\x02' + META_PREFIX = b'\x03' + + def __init__(self, + path='./txlog-data', + max_committed_items=0, + committed_ttl_seconds=None): self._batch_index = None self._write_batch = None self._committed_ttl_seconds = committed_ttl_seconds @@ -101,22 +109,21 @@ def commit(self): def rollback(self): self._write_batch = None - @staticmethod - def _get_call_key(index: int): - return f'txlog_{utils.get_padded_int(index)}' - def commit_call(self, call: Call): - new_write_batch = self._write_batch is None - self.begin() + is_new_write_batch = self._write_batch is None + if is_new_write_batch: + self.begin() + call._committed = True call._commitment_timestamp = get_timestamp_ms() self._update_call(call.index, call) self._increment_offset() - if new_write_batch: + + if is_new_write_batch: self.commit() def get(self, index: int) -> Call: - call_key = TxLog._get_call_key(index) + call_key = self._get_call_key(index) return self._db.get(call_key) def exec_uncommitted_calls(self, container_object=None): @@ -127,7 +134,11 @@ def exec_uncommitted_calls(self, container_object=None): def add(self, call: Call): if not isinstance(call, Call): raise TypeError + index = self._get_next_index() + if index > MAX_UINT: + raise ValueError(index) + call.set_index(index) self._put_call(index, call) return index @@ -141,26 +152,27 @@ def print_uncommitted_calls(self): print(call._method_name, call._args, call._kwargs) def get_calls(self) -> Generator[Call, None, None]: - for _, call in self._db.scan(prefix='txlog_'): + for _, call in self._db.scan(prefix=self.CALL_PREFIX): yield call def get_first_uncommitted_call(self) -> Call: next_offset = self._get_next_offset() if next_offset > self._get_index(): return - call_key = TxLog._get_call_key(next_offset) + call_key = self._get_call_key(next_offset) return self._db.get(call_key) def get_uncommitted_calls(self) -> Generator[Call, None, None]: first_uncommitted_call = self.get_first_uncommitted_call() if first_uncommitted_call is not None: - for index in range(first_uncommitted_call.index, self._get_next_index()): + for index in range(first_uncommitted_call.index, + self._get_next_index()): yield self.get(index) def truncate(self): if self._max_committed_items is not None: committed_calls_no = self.count_committed_calls() - for key, call in self._db.scan(prefix='txlog_'): + for key, call in self._db.scan(prefix=self.CALL_PREFIX): if not call.committed: break @@ -171,47 +183,50 @@ def truncate(self): committed_calls_no -= 1 if self._committed_ttl_seconds is not None: - for key, call in self._db.scan(prefix='txlog_'): + for key, call in self._db.scan(prefix=self.CALL_PREFIX): if not call.committed: break - min_timestamp = get_timestamp_ms() - self._committed_ttl_seconds * 1000 + min_timestamp = get_timestamp_ms() \ + - self._committed_ttl_seconds * 1000 if call._creation_timestamp <= min_timestamp: self._db.delete(key) def count_committed_calls(self) -> int: counter = 0 - for _, call in self._db.scan(prefix='txlog_'): + for _, call in self._db.scan(prefix=self.CALL_PREFIX): if call.committed: counter += 1 return counter def count_calls(self) -> int: counter = 0 - for _, _ in self._db.scan(prefix='txlog_'): + for _, _ in self._db.scan(prefix=self.CALL_PREFIX): counter += 1 return counter + def _get_call_key(self, index: int): + return self.CALL_PREFIX + utils.int_to_padded_bytes(index, UINT_BYTES) + def _update_call(self, index: int, call: Call): if not isinstance(call, Call): raise TypeError - call_key = TxLog._get_call_key(index) + call_key = self._get_call_key(index) self._db.put(call_key, call, write_batch=self._write_batch) def _put_call(self, index: int, call: Call): if not isinstance(call, Call): raise TypeError - is_batch_new = False - if self._write_batch is None: - is_batch_new = True + is_new_write_batch = self._write_batch is None + if is_new_write_batch: self.begin() - call_key = TxLog._get_call_key(index) + call_key = self._get_call_key(index) self._db.put(call_key, call, write_batch=self._write_batch) self._increment_index() - if is_batch_new: + if is_new_write_batch: self.commit() def _increment_offset(self): @@ -236,16 +251,22 @@ def _get_next_index(self) -> int: return self._batch_index + 1 return self._get_int_attribute('index') + 1 + def _get_meta_key(self, attribute: str): + return self.META_PREFIX + bytes(attribute, 'utf-8') + def _increment_int_attribute(self, attribute: str): if attribute == 'index' and self._write_batch is not None: self._batch_index += 1 value = self._batch_index else: value = self._get_int_attribute(attribute) + 1 - self._db.put(f'meta_{attribute}', value, write_batch=self._write_batch) + + key = self._get_meta_key(attribute) + self._db.put(key, value, write_batch=self._write_batch) def _get_int_attribute(self, attribute: str) -> int: - value = self._db.get(f'meta_{attribute}') + key = self._get_meta_key(attribute) + value = self._db.get(key) if value is None: return -1 - return value \ No newline at end of file + return value diff --git a/txlog/utils.py b/txlog/utils.py new file mode 100644 index 0000000..4abb768 --- /dev/null +++ b/txlog/utils.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import time + + +def get_timestamp_ms(): + return int(round(time.time() * 1000))