diff --git a/time_based_storage/requirements-dev.txt b/time_based_storage/requirements-dev.txt index 955a846..f3b7247 100644 --- a/time_based_storage/requirements-dev.txt +++ b/time_based_storage/requirements-dev.txt @@ -2,4 +2,5 @@ pytest>=7.0.0 black>=23.0.0 flake8>=6.0.0 pytest-cov>=4.0.0 -codecov>=2.1.12 \ No newline at end of file +codecov>=2.1.12 +sortedcontainers>=2.4.0 \ No newline at end of file diff --git a/time_based_storage/src/time_based_storage/__init__.py b/time_based_storage/src/time_based_storage/__init__.py index 630fb65..1448b76 100644 --- a/time_based_storage/src/time_based_storage/__init__.py +++ b/time_based_storage/src/time_based_storage/__init__.py @@ -4,20 +4,28 @@ Core (non-thread-safe) implementations: - TimeBasedStorage: Basic time-based storage using a dictionary - TimeBasedStorageHeap: Heap-based implementation for efficient range queries +- TimeBasedStorageRBTree: Red-Black Tree implementation for efficient insertion and range queries Concurrent (thread-safe) implementations: - ThreadSafeTimeBasedStorage: Thread-safe wrapper around TimeBasedStorage - ThreadSafeTimeBasedStorageHeap: Thread-safe wrapper around TimeBasedStorageHeap +- ThreadSafeTimeBasedStorageRBTree: Thread-safe wrapper around TimeBasedStorageRBTree """ -from .core import TimeBasedStorage, TimeBasedStorageHeap -from .concurrent import ThreadSafeTimeBasedStorage, ThreadSafeTimeBasedStorageHeap +from .core import TimeBasedStorage, TimeBasedStorageHeap, TimeBasedStorageRBTree +from .concurrent import ( + ThreadSafeTimeBasedStorage, + ThreadSafeTimeBasedStorageHeap, + ThreadSafeTimeBasedStorageRBTree, +) __all__ = [ # Core implementations "TimeBasedStorage", "TimeBasedStorageHeap", + "TimeBasedStorageRBTree", # Concurrent implementations "ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap", + "ThreadSafeTimeBasedStorageRBTree", ] diff --git a/time_based_storage/src/time_based_storage/concurrent/__init__.py b/time_based_storage/src/time_based_storage/concurrent/__init__.py index 87c2bc1..661e832 100644 --- a/time_based_storage/src/time_based_storage/concurrent/__init__.py +++ b/time_based_storage/src/time_based_storage/concurrent/__init__.py @@ -5,5 +5,6 @@ from .thread_safe import ThreadSafeTimeBasedStorage from .thread_safe_heap import ThreadSafeTimeBasedStorageHeap +from .thread_safe_rbtree import ThreadSafeTimeBasedStorageRBTree -__all__ = ["ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap"] +__all__ = ["ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap", "ThreadSafeTimeBasedStorageRBTree"] diff --git a/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py b/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py new file mode 100644 index 0000000..fb2926d --- /dev/null +++ b/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py @@ -0,0 +1,176 @@ +from typing import List, Optional, TypeVar, Generic +from datetime import datetime +import threading +from ..core import TimeBasedStorageRBTree + +T = TypeVar("T") + + +class ThreadSafeTimeBasedStorageRBTree(TimeBasedStorageRBTree[T], Generic[T]): + """ + Thread-safe implementation of TimeBasedStorageRBTree using Python's threading module. + This implementation provides safe concurrent access to the Red-Black Tree storage using read-write locks. + + Benefits over the standard ThreadSafeTimeBasedStorage: + - Better insertion performance: O(log n) vs O(n) + - Better range query performance: O(log n + k) vs O(n) where k is the number of items in range + """ + + def __init__(self): + super().__init__() + # Read-write lock for thread-safe operations + self._lock = threading.RLock() + # Condition variable for waiting on events + self._condition = threading.Condition(self._lock) + + def add(self, timestamp: datetime, value: T) -> None: + """ + Thread-safe method to add a value with its timestamp. + + Args: + timestamp: The timestamp of the value + value: The value to store + """ + with self._lock: + super().add(timestamp, value) + # Notify any waiting threads that new data is available + self._condition.notify_all() + + def add_unique_timestamp(self, timestamp: datetime, value: T, max_offset_microseconds: int = 1000000) -> datetime: + """ + Thread-safe method to add a value with a guaranteed unique timestamp. + + Args: + timestamp: The desired timestamp + value: The value to store + max_offset_microseconds: Maximum random offset to add (default: 1 second) + + Returns: + The actual timestamp used (may be different from input if offset was added) + """ + with self._lock: + result = super().add_unique_timestamp(timestamp, value, max_offset_microseconds) + # Notify any waiting threads that new data is available + self._condition.notify_all() + return result + + def get_range(self, start_time: datetime, end_time: datetime) -> List[T]: + """ + Thread-safe method to get all values within a time range. + + Args: + start_time: Start of the time range + end_time: End of the time range + + Returns: + List of values within the specified time range + """ + with self._lock: + return super().get_range(start_time, end_time) + + def get_duration(self, duration: float) -> List[T]: + """ + Thread-safe method to get all values within the last duration seconds. + + Args: + duration: Number of seconds to look back + + Returns: + List of values within the specified duration + """ + with self._lock: + return super().get_duration(duration) + + def clear(self) -> None: + """Thread-safe method to clear all stored values.""" + with self._lock: + super().clear() + + def get_all(self) -> List[T]: + """ + Thread-safe method to get all stored values. + + Returns: + List of all stored values + """ + with self._lock: + return super().get_all() + + def get_timestamps(self) -> List[datetime]: + """ + Thread-safe method to get all stored timestamps. + + Returns: + List of all stored timestamps + """ + with self._lock: + return super().get_timestamps() + + def get_value_at(self, timestamp: datetime) -> Optional[T]: + """ + Thread-safe method to get the value at a specific timestamp. + + Args: + timestamp: The timestamp to look up + + Returns: + The value at the specified timestamp, or None if not found + """ + with self._lock: + return super().get_value_at(timestamp) + + def remove(self, timestamp: datetime) -> bool: + """ + Thread-safe method to remove a value at a specific timestamp. + + Args: + timestamp: The timestamp of the value to remove + + Returns: + True if the value was removed, False if not found + """ + with self._lock: + return super().remove(timestamp) + + def size(self) -> int: + """ + Thread-safe method to get the number of stored values. + + Returns: + Number of stored values + """ + with self._lock: + return super().size() + + def is_empty(self) -> bool: + """ + Thread-safe method to check if the storage is empty. + + Returns: + True if the storage is empty, False otherwise + """ + with self._lock: + return super().is_empty() + + def wait_for_data(self, timeout: float = None) -> bool: + """ + Wait for data to be available in the storage. + + Args: + timeout: Maximum time to wait in seconds, or None to wait indefinitely + + Returns: + True if data is available, False if timeout occurred + """ + with self._lock: + if not self.is_empty(): + return True + return self._condition.wait(timeout=timeout) + + def notify_data_available(self) -> None: + """ + Notify waiting threads that data is available. + This is automatically called by add() and add_unique_timestamp(). + """ + with self._lock: + self._condition.notify_all() diff --git a/time_based_storage/src/time_based_storage/core/__init__.py b/time_based_storage/src/time_based_storage/core/__init__.py index 389afac..5668af8 100644 --- a/time_based_storage/src/time_based_storage/core/__init__.py +++ b/time_based_storage/src/time_based_storage/core/__init__.py @@ -5,5 +5,6 @@ from .base import TimeBasedStorage from .heap import TimeBasedStorageHeap +from .rbtree import TimeBasedStorageRBTree -__all__ = ["TimeBasedStorage", "TimeBasedStorageHeap"] +__all__ = ["TimeBasedStorage", "TimeBasedStorageHeap", "TimeBasedStorageRBTree"] diff --git a/time_based_storage/src/time_based_storage/core/rbtree.py b/time_based_storage/src/time_based_storage/core/rbtree.py new file mode 100644 index 0000000..f8246d4 --- /dev/null +++ b/time_based_storage/src/time_based_storage/core/rbtree.py @@ -0,0 +1,161 @@ +from typing import List, Optional, TypeVar, Generic +from datetime import datetime, timedelta +from sortedcontainers import SortedDict +import random + +T = TypeVar("T") + + +class TimeBasedStorageRBTree(Generic[T]): + """ + Red-Black Tree implementation of time-based storage. + This implementation uses SortedDict from sortedcontainers which provides + Red-Black Tree performance characteristics. + + This is a non-thread-safe implementation suitable for single-threaded use. + + Compared to the base TimeBasedStorage: + - Better insertion performance: O(log n) vs O(n) + - Equivalent lookup performance: O(1) + - Better range query performance: O(log n + k) vs O(n) where k is the number of items in range + + Note: + Timestamps have microsecond precision (0.000001 seconds). + When adding items rapidly, consider using add_unique_timestamp() to avoid collisions. + """ + + def __init__(self): + self._storage = SortedDict() + + def add(self, timestamp: datetime, value: T) -> None: + """ + Add a value with its timestamp. + + Args: + timestamp: The timestamp of the value + value: The value to store + + Raises: + ValueError: If a value already exists at the given timestamp + """ + if timestamp in self._storage: + raise ValueError(f"Value already exists at timestamp {timestamp}") + self._storage[timestamp] = value + + def add_unique_timestamp(self, timestamp: datetime, value: T, max_offset_microseconds: int = 1000000) -> datetime: + """ + Add a value with a guaranteed unique timestamp. + If the timestamp already exists, adds a random microsecond offset. + + Args: + timestamp: The desired timestamp + value: The value to store + max_offset_microseconds: Maximum random offset to add (default: 1 second) + + Returns: + The actual timestamp used (may be different from input if offset was added) + """ + if timestamp not in self._storage: + self._storage[timestamp] = value + return timestamp + + # Add random offset in microseconds + offset = random.randint(0, max_offset_microseconds) + unique_timestamp = timestamp + timedelta(microseconds=offset) + self._storage[unique_timestamp] = value + return unique_timestamp + + def get_range(self, start_time: datetime, end_time: datetime) -> List[T]: + """ + Get all values within a time range. + Efficiently uses SortedDict's irange method for optimized range queries. + + Args: + start_time: Start of the time range + end_time: End of the time range + + Returns: + List of values within the specified time range + """ + return [self._storage[ts] for ts in self._storage.irange(start_time, end_time)] + + def get_duration(self, duration: float) -> List[T]: + """ + Get all values within the last duration seconds. + + Args: + duration: Number of seconds to look back + + Returns: + List of values within the specified duration + """ + now = datetime.now() + start_time = now.fromtimestamp(now.timestamp() - duration) + return self.get_range(start_time, now) + + def clear(self) -> None: + """Clear all stored values.""" + self._storage.clear() + + def get_all(self) -> List[T]: + """ + Get all stored values. + + Returns: + List of all stored values + """ + return list(self._storage.values()) + + def get_timestamps(self) -> List[datetime]: + """ + Get all stored timestamps. + + Returns: + List of all stored timestamps + """ + return list(self._storage.keys()) + + def get_value_at(self, timestamp: datetime) -> Optional[T]: + """ + Get the value at a specific timestamp. + + Args: + timestamp: The timestamp to look up + + Returns: + The value at the specified timestamp, or None if not found + """ + return self._storage.get(timestamp) + + def remove(self, timestamp: datetime) -> bool: + """ + Remove a value at a specific timestamp. + + Args: + timestamp: The timestamp of the value to remove + + Returns: + True if the value was removed, False if not found + """ + if timestamp in self._storage: + del self._storage[timestamp] + return True + return False + + def size(self) -> int: + """ + Get the number of stored values. + + Returns: + Number of stored values + """ + return len(self._storage) + + def is_empty(self) -> bool: + """ + Check if the storage is empty. + + Returns: + True if the storage is empty, False otherwise + """ + return len(self._storage) == 0 diff --git a/time_based_storage/tests/test_rbtree.py b/time_based_storage/tests/test_rbtree.py new file mode 100644 index 0000000..19184a6 --- /dev/null +++ b/time_based_storage/tests/test_rbtree.py @@ -0,0 +1,117 @@ +import unittest +from datetime import datetime, timedelta +import time +import threading +from time_based_storage import ( + TimeBasedStorageRBTree, + ThreadSafeTimeBasedStorageRBTree, +) + + +class TestTimeBasedStorageRBTree(unittest.TestCase): + def setUp(self): + self.storage = TimeBasedStorageRBTree[str]() + self.now = datetime.now() + + def test_add_and_get(self): + self.storage.add(self.now, "test value") + self.assertEqual(self.storage.get_value_at(self.now), "test value") + + def test_range_query(self): + # Add some values + self.storage.add(self.now - timedelta(minutes=10), "value1") + self.storage.add(self.now - timedelta(minutes=5), "value2") + self.storage.add(self.now, "value3") + self.storage.add(self.now + timedelta(minutes=5), "value4") + + # Test range query + values = self.storage.get_range(self.now - timedelta(minutes=7), self.now + timedelta(minutes=1)) + self.assertEqual(set(values), {"value2", "value3"}) + + def test_duplicate_timestamp(self): + self.storage.add(self.now, "value1") + with self.assertRaises(ValueError): + self.storage.add(self.now, "value2") + + def test_add_unique_timestamp(self): + # Add first value + ts1 = self.storage.add_unique_timestamp(self.now, "value1") + self.assertEqual(ts1, self.now) + + # Add second value with same timestamp + ts2 = self.storage.add_unique_timestamp(self.now, "value2") + self.assertNotEqual(ts1, ts2) + + # Verify both values are stored + self.assertEqual(self.storage.size(), 2) + + def test_duration_query(self): + # Add values with timestamps in the past + old_time = datetime.now() - timedelta(seconds=10) + self.storage.add(old_time, "old value") + + # Add recent value + recent_time = datetime.now() - timedelta(seconds=1) + self.storage.add(recent_time, "recent value") + + # Get values from last 5 seconds + values = self.storage.get_duration(5) + self.assertIn("recent value", values) + self.assertNotIn("old value", values) + + +class TestThreadSafeTimeBasedStorageRBTree(unittest.TestCase): + def setUp(self): + self.storage = ThreadSafeTimeBasedStorageRBTree[str]() + self.now = datetime.now() + + def test_concurrent_add(self): + # Define a function to add values from a thread + def add_values(start_idx, count): + for i in range(start_idx, start_idx + count): + timestamp = self.now + timedelta(microseconds=i) + self.storage.add(timestamp, f"value{i}") + + # Create and start threads + threads = [] + for i in range(5): + t = threading.Thread(target=add_values, args=(i * 100, 100)) + threads.append(t) + t.start() + + # Wait for all threads to complete + for t in threads: + t.join() + + # Verify all values were added + self.assertEqual(self.storage.size(), 500) + + def test_wait_for_data(self): + # Thread to add data after a delay + def delayed_add(): + time.sleep(0.1) + self.storage.add(self.now, "delayed value") + + # Start thread to add data with delay + t = threading.Thread(target=delayed_add) + t.start() + + # Wait for data to be available + result = self.storage.wait_for_data(timeout=1.0) + t.join() + + # Verify data is available and wait was successful + self.assertTrue(result) + self.assertEqual(self.storage.get_value_at(self.now), "delayed value") + + def test_wait_timeout(self): + # Wait with a short timeout when no data is available + result = self.storage.wait_for_data(timeout=0.01) + + # Verify wait timed out + self.assertFalse(result) + self.assertTrue(self.storage.is_empty()) + + +if __name__ == "__main__": + unittest.main()