From 8b024f0cc8a75ee7695835c03d083e9bbd8b5e23 Mon Sep 17 00:00:00 2001 From: John Burbridge Date: Sun, 23 Mar 2025 00:27:45 -0700 Subject: [PATCH 1/2] feat: RB-Tree implementation and unit tests --- time_based_storage/requirements-dev.txt | 3 +- .../src/time_based_storage/__init__.py | 12 +- .../time_based_storage/concurrent/__init__.py | 3 +- .../concurrent/thread_safe_rbtree.py | 176 ++++++++++++++++++ .../src/time_based_storage/core/__init__.py | 3 +- .../src/time_based_storage/core/rbtree.py | 161 ++++++++++++++++ time_based_storage/tests/test_rbtree.py | 120 ++++++++++++ 7 files changed, 473 insertions(+), 5 deletions(-) create mode 100644 time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py create mode 100644 time_based_storage/src/time_based_storage/core/rbtree.py create mode 100644 time_based_storage/tests/test_rbtree.py diff --git a/time_based_storage/requirements-dev.txt b/time_based_storage/requirements-dev.txt index 955a846..f3b7247 100644 --- a/time_based_storage/requirements-dev.txt +++ b/time_based_storage/requirements-dev.txt @@ -2,4 +2,5 @@ pytest>=7.0.0 black>=23.0.0 flake8>=6.0.0 pytest-cov>=4.0.0 -codecov>=2.1.12 \ No newline at end of file +codecov>=2.1.12 +sortedcontainers>=2.4.0 \ No newline at end of file diff --git a/time_based_storage/src/time_based_storage/__init__.py b/time_based_storage/src/time_based_storage/__init__.py index 630fb65..1448b76 100644 --- a/time_based_storage/src/time_based_storage/__init__.py +++ b/time_based_storage/src/time_based_storage/__init__.py @@ -4,20 +4,28 @@ Core (non-thread-safe) implementations: - TimeBasedStorage: Basic time-based storage using a dictionary - TimeBasedStorageHeap: Heap-based implementation for efficient range queries +- TimeBasedStorageRBTree: Red-Black Tree implementation for efficient insertion and range queries Concurrent (thread-safe) implementations: - ThreadSafeTimeBasedStorage: Thread-safe wrapper around TimeBasedStorage - ThreadSafeTimeBasedStorageHeap: Thread-safe wrapper around TimeBasedStorageHeap +- ThreadSafeTimeBasedStorageRBTree: Thread-safe wrapper around TimeBasedStorageRBTree """ -from .core import TimeBasedStorage, TimeBasedStorageHeap -from .concurrent import ThreadSafeTimeBasedStorage, ThreadSafeTimeBasedStorageHeap +from .core import TimeBasedStorage, TimeBasedStorageHeap, TimeBasedStorageRBTree +from .concurrent import ( + ThreadSafeTimeBasedStorage, + ThreadSafeTimeBasedStorageHeap, + ThreadSafeTimeBasedStorageRBTree, +) __all__ = [ # Core implementations "TimeBasedStorage", "TimeBasedStorageHeap", + "TimeBasedStorageRBTree", # Concurrent implementations "ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap", + "ThreadSafeTimeBasedStorageRBTree", ] diff --git a/time_based_storage/src/time_based_storage/concurrent/__init__.py b/time_based_storage/src/time_based_storage/concurrent/__init__.py index 87c2bc1..661e832 100644 --- a/time_based_storage/src/time_based_storage/concurrent/__init__.py +++ b/time_based_storage/src/time_based_storage/concurrent/__init__.py @@ -5,5 +5,6 @@ from .thread_safe import ThreadSafeTimeBasedStorage from .thread_safe_heap import ThreadSafeTimeBasedStorageHeap +from .thread_safe_rbtree import ThreadSafeTimeBasedStorageRBTree -__all__ = ["ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap"] +__all__ = ["ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap", "ThreadSafeTimeBasedStorageRBTree"] diff --git a/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py b/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py new file mode 100644 index 0000000..bcfca2d --- /dev/null +++ b/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py @@ -0,0 +1,176 @@ +from typing import List, Optional, TypeVar, Generic +from datetime import datetime +import threading +from ..core import TimeBasedStorageRBTree + +T = TypeVar("T") + + +class ThreadSafeTimeBasedStorageRBTree(TimeBasedStorageRBTree[T], Generic[T]): + """ + Thread-safe implementation of TimeBasedStorageRBTree using Python's threading module. + This implementation provides safe concurrent access to the Red-Black Tree storage using read-write locks. + + Benefits over the standard ThreadSafeTimeBasedStorage: + - Better insertion performance: O(log n) vs O(n) + - Better range query performance: O(log n + k) vs O(n) where k is the number of items in range + """ + + def __init__(self): + super().__init__() + # Read-write lock for thread-safe operations + self._lock = threading.RLock() + # Condition variable for waiting on events + self._condition = threading.Condition(self._lock) + + def add(self, timestamp: datetime, value: T) -> None: + """ + Thread-safe method to add a value with its timestamp. + + Args: + timestamp: The timestamp of the value + value: The value to store + """ + with self._lock: + super().add(timestamp, value) + # Notify any waiting threads that new data is available + self._condition.notify_all() + + def add_unique_timestamp(self, timestamp: datetime, value: T, max_offset_microseconds: int = 1000000) -> datetime: + """ + Thread-safe method to add a value with a guaranteed unique timestamp. + + Args: + timestamp: The desired timestamp + value: The value to store + max_offset_microseconds: Maximum random offset to add (default: 1 second) + + Returns: + The actual timestamp used (may be different from input if offset was added) + """ + with self._lock: + result = super().add_unique_timestamp(timestamp, value, max_offset_microseconds) + # Notify any waiting threads that new data is available + self._condition.notify_all() + return result + + def get_range(self, start_time: datetime, end_time: datetime) -> List[T]: + """ + Thread-safe method to get all values within a time range. + + Args: + start_time: Start of the time range + end_time: End of the time range + + Returns: + List of values within the specified time range + """ + with self._lock: + return super().get_range(start_time, end_time) + + def get_duration(self, duration: float) -> List[T]: + """ + Thread-safe method to get all values within the last duration seconds. + + Args: + duration: Number of seconds to look back + + Returns: + List of values within the specified duration + """ + with self._lock: + return super().get_duration(duration) + + def clear(self) -> None: + """Thread-safe method to clear all stored values.""" + with self._lock: + super().clear() + + def get_all(self) -> List[T]: + """ + Thread-safe method to get all stored values. + + Returns: + List of all stored values + """ + with self._lock: + return super().get_all() + + def get_timestamps(self) -> List[datetime]: + """ + Thread-safe method to get all stored timestamps. + + Returns: + List of all stored timestamps + """ + with self._lock: + return super().get_timestamps() + + def get_value_at(self, timestamp: datetime) -> Optional[T]: + """ + Thread-safe method to get the value at a specific timestamp. + + Args: + timestamp: The timestamp to look up + + Returns: + The value at the specified timestamp, or None if not found + """ + with self._lock: + return super().get_value_at(timestamp) + + def remove(self, timestamp: datetime) -> bool: + """ + Thread-safe method to remove a value at a specific timestamp. + + Args: + timestamp: The timestamp of the value to remove + + Returns: + True if the value was removed, False if not found + """ + with self._lock: + return super().remove(timestamp) + + def size(self) -> int: + """ + Thread-safe method to get the number of stored values. + + Returns: + Number of stored values + """ + with self._lock: + return super().size() + + def is_empty(self) -> bool: + """ + Thread-safe method to check if the storage is empty. + + Returns: + True if the storage is empty, False otherwise + """ + with self._lock: + return super().is_empty() + + def wait_for_data(self, timeout: float = None) -> bool: + """ + Wait for data to be available in the storage. + + Args: + timeout: Maximum time to wait in seconds, or None to wait indefinitely + + Returns: + True if data is available, False if timeout occurred + """ + with self._lock: + if not self.is_empty(): + return True + return self._condition.wait(timeout=timeout) + + def notify_data_available(self) -> None: + """ + Notify waiting threads that data is available. + This is automatically called by add() and add_unique_timestamp(). + """ + with self._lock: + self._condition.notify_all() \ No newline at end of file diff --git a/time_based_storage/src/time_based_storage/core/__init__.py b/time_based_storage/src/time_based_storage/core/__init__.py index 389afac..5668af8 100644 --- a/time_based_storage/src/time_based_storage/core/__init__.py +++ b/time_based_storage/src/time_based_storage/core/__init__.py @@ -5,5 +5,6 @@ from .base import TimeBasedStorage from .heap import TimeBasedStorageHeap +from .rbtree import TimeBasedStorageRBTree -__all__ = ["TimeBasedStorage", "TimeBasedStorageHeap"] +__all__ = ["TimeBasedStorage", "TimeBasedStorageHeap", "TimeBasedStorageRBTree"] diff --git a/time_based_storage/src/time_based_storage/core/rbtree.py b/time_based_storage/src/time_based_storage/core/rbtree.py new file mode 100644 index 0000000..dd60d1e --- /dev/null +++ b/time_based_storage/src/time_based_storage/core/rbtree.py @@ -0,0 +1,161 @@ +from typing import List, Optional, TypeVar, Generic +from datetime import datetime, timedelta +from sortedcontainers import SortedDict +import random + +T = TypeVar("T") + + +class TimeBasedStorageRBTree(Generic[T]): + """ + Red-Black Tree implementation of time-based storage. + This implementation uses SortedDict from sortedcontainers which provides + Red-Black Tree performance characteristics. + + This is a non-thread-safe implementation suitable for single-threaded use. + + Compared to the base TimeBasedStorage: + - Better insertion performance: O(log n) vs O(n) + - Equivalent lookup performance: O(1) + - Better range query performance: O(log n + k) vs O(n) where k is the number of items in range + + Note: + Timestamps have microsecond precision (0.000001 seconds). + When adding items rapidly, consider using add_unique_timestamp() to avoid collisions. + """ + + def __init__(self): + self._storage = SortedDict() + + def add(self, timestamp: datetime, value: T) -> None: + """ + Add a value with its timestamp. + + Args: + timestamp: The timestamp of the value + value: The value to store + + Raises: + ValueError: If a value already exists at the given timestamp + """ + if timestamp in self._storage: + raise ValueError(f"Value already exists at timestamp {timestamp}") + self._storage[timestamp] = value + + def add_unique_timestamp(self, timestamp: datetime, value: T, max_offset_microseconds: int = 1000000) -> datetime: + """ + Add a value with a guaranteed unique timestamp. + If the timestamp already exists, adds a random microsecond offset. + + Args: + timestamp: The desired timestamp + value: The value to store + max_offset_microseconds: Maximum random offset to add (default: 1 second) + + Returns: + The actual timestamp used (may be different from input if offset was added) + """ + if timestamp not in self._storage: + self._storage[timestamp] = value + return timestamp + + # Add random offset in microseconds + offset = random.randint(0, max_offset_microseconds) + unique_timestamp = timestamp + timedelta(microseconds=offset) + self._storage[unique_timestamp] = value + return unique_timestamp + + def get_range(self, start_time: datetime, end_time: datetime) -> List[T]: + """ + Get all values within a time range. + Efficiently uses SortedDict's irange method for optimized range queries. + + Args: + start_time: Start of the time range + end_time: End of the time range + + Returns: + List of values within the specified time range + """ + return [self._storage[ts] for ts in self._storage.irange(start_time, end_time)] + + def get_duration(self, duration: float) -> List[T]: + """ + Get all values within the last duration seconds. + + Args: + duration: Number of seconds to look back + + Returns: + List of values within the specified duration + """ + now = datetime.now() + start_time = now.fromtimestamp(now.timestamp() - duration) + return self.get_range(start_time, now) + + def clear(self) -> None: + """Clear all stored values.""" + self._storage.clear() + + def get_all(self) -> List[T]: + """ + Get all stored values. + + Returns: + List of all stored values + """ + return list(self._storage.values()) + + def get_timestamps(self) -> List[datetime]: + """ + Get all stored timestamps. + + Returns: + List of all stored timestamps + """ + return list(self._storage.keys()) + + def get_value_at(self, timestamp: datetime) -> Optional[T]: + """ + Get the value at a specific timestamp. + + Args: + timestamp: The timestamp to look up + + Returns: + The value at the specified timestamp, or None if not found + """ + return self._storage.get(timestamp) + + def remove(self, timestamp: datetime) -> bool: + """ + Remove a value at a specific timestamp. + + Args: + timestamp: The timestamp of the value to remove + + Returns: + True if the value was removed, False if not found + """ + if timestamp in self._storage: + del self._storage[timestamp] + return True + return False + + def size(self) -> int: + """ + Get the number of stored values. + + Returns: + Number of stored values + """ + return len(self._storage) + + def is_empty(self) -> bool: + """ + Check if the storage is empty. + + Returns: + True if the storage is empty, False otherwise + """ + return len(self._storage) == 0 \ No newline at end of file diff --git a/time_based_storage/tests/test_rbtree.py b/time_based_storage/tests/test_rbtree.py new file mode 100644 index 0000000..46c7fa9 --- /dev/null +++ b/time_based_storage/tests/test_rbtree.py @@ -0,0 +1,120 @@ +import unittest +from datetime import datetime, timedelta +import time +import threading +from time_based_storage import ( + TimeBasedStorageRBTree, + ThreadSafeTimeBasedStorageRBTree, +) + + +class TestTimeBasedStorageRBTree(unittest.TestCase): + def setUp(self): + self.storage = TimeBasedStorageRBTree[str]() + self.now = datetime.now() + + def test_add_and_get(self): + self.storage.add(self.now, "test value") + self.assertEqual(self.storage.get_value_at(self.now), "test value") + + def test_range_query(self): + # Add some values + self.storage.add(self.now - timedelta(minutes=10), "value1") + self.storage.add(self.now - timedelta(minutes=5), "value2") + self.storage.add(self.now, "value3") + self.storage.add(self.now + timedelta(minutes=5), "value4") + + # Test range query + values = self.storage.get_range( + self.now - timedelta(minutes=7), + self.now + timedelta(minutes=1) + ) + self.assertEqual(set(values), {"value2", "value3"}) + + def test_duplicate_timestamp(self): + self.storage.add(self.now, "value1") + with self.assertRaises(ValueError): + self.storage.add(self.now, "value2") + + def test_add_unique_timestamp(self): + # Add first value + ts1 = self.storage.add_unique_timestamp(self.now, "value1") + self.assertEqual(ts1, self.now) + + # Add second value with same timestamp + ts2 = self.storage.add_unique_timestamp(self.now, "value2") + self.assertNotEqual(ts1, ts2) + + # Verify both values are stored + self.assertEqual(self.storage.size(), 2) + + def test_duration_query(self): + # Add values with timestamps in the past + old_time = datetime.now() - timedelta(seconds=10) + self.storage.add(old_time, "old value") + + # Add recent value + recent_time = datetime.now() - timedelta(seconds=1) + self.storage.add(recent_time, "recent value") + + # Get values from last 5 seconds + values = self.storage.get_duration(5) + self.assertIn("recent value", values) + self.assertNotIn("old value", values) + + +class TestThreadSafeTimeBasedStorageRBTree(unittest.TestCase): + def setUp(self): + self.storage = ThreadSafeTimeBasedStorageRBTree[str]() + self.now = datetime.now() + + def test_concurrent_add(self): + # Define a function to add values from a thread + def add_values(start_idx, count): + for i in range(start_idx, start_idx + count): + timestamp = self.now + timedelta(microseconds=i) + self.storage.add(timestamp, f"value{i}") + + # Create and start threads + threads = [] + for i in range(5): + t = threading.Thread(target=add_values, args=(i * 100, 100)) + threads.append(t) + t.start() + + # Wait for all threads to complete + for t in threads: + t.join() + + # Verify all values were added + self.assertEqual(self.storage.size(), 500) + + def test_wait_for_data(self): + # Thread to add data after a delay + def delayed_add(): + time.sleep(0.1) + self.storage.add(self.now, "delayed value") + + # Start thread to add data with delay + t = threading.Thread(target=delayed_add) + t.start() + + # Wait for data to be available + result = self.storage.wait_for_data(timeout=1.0) + t.join() + + # Verify data is available and wait was successful + self.assertTrue(result) + self.assertEqual(self.storage.get_value_at(self.now), "delayed value") + + def test_wait_timeout(self): + # Wait with a short timeout when no data is available + result = self.storage.wait_for_data(timeout=0.01) + + # Verify wait timed out + self.assertFalse(result) + self.assertTrue(self.storage.is_empty()) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From 80e34218814ac7db58f42dbd010f8eb90785dccb Mon Sep 17 00:00:00 2001 From: John Burbridge Date: Sun, 23 Mar 2025 00:34:27 -0700 Subject: [PATCH 2/2] style: apply Black formatting to RB-Tree implementation files --- .../concurrent/thread_safe_rbtree.py | 4 ++-- .../src/time_based_storage/core/rbtree.py | 8 ++++---- time_based_storage/tests/test_rbtree.py | 19 ++++++++----------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py b/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py index bcfca2d..fb2926d 100644 --- a/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py +++ b/time_based_storage/src/time_based_storage/concurrent/thread_safe_rbtree.py @@ -10,7 +10,7 @@ class ThreadSafeTimeBasedStorageRBTree(TimeBasedStorageRBTree[T], Generic[T]): """ Thread-safe implementation of TimeBasedStorageRBTree using Python's threading module. This implementation provides safe concurrent access to the Red-Black Tree storage using read-write locks. - + Benefits over the standard ThreadSafeTimeBasedStorage: - Better insertion performance: O(log n) vs O(n) - Better range query performance: O(log n + k) vs O(n) where k is the number of items in range @@ -173,4 +173,4 @@ def notify_data_available(self) -> None: This is automatically called by add() and add_unique_timestamp(). """ with self._lock: - self._condition.notify_all() \ No newline at end of file + self._condition.notify_all() diff --git a/time_based_storage/src/time_based_storage/core/rbtree.py b/time_based_storage/src/time_based_storage/core/rbtree.py index dd60d1e..f8246d4 100644 --- a/time_based_storage/src/time_based_storage/core/rbtree.py +++ b/time_based_storage/src/time_based_storage/core/rbtree.py @@ -11,14 +11,14 @@ class TimeBasedStorageRBTree(Generic[T]): Red-Black Tree implementation of time-based storage. This implementation uses SortedDict from sortedcontainers which provides Red-Black Tree performance characteristics. - + This is a non-thread-safe implementation suitable for single-threaded use. - + Compared to the base TimeBasedStorage: - Better insertion performance: O(log n) vs O(n) - Equivalent lookup performance: O(1) - Better range query performance: O(log n + k) vs O(n) where k is the number of items in range - + Note: Timestamps have microsecond precision (0.000001 seconds). When adding items rapidly, consider using add_unique_timestamp() to avoid collisions. @@ -158,4 +158,4 @@ def is_empty(self) -> bool: Returns: True if the storage is empty, False otherwise """ - return len(self._storage) == 0 \ No newline at end of file + return len(self._storage) == 0 diff --git a/time_based_storage/tests/test_rbtree.py b/time_based_storage/tests/test_rbtree.py index 46c7fa9..19184a6 100644 --- a/time_based_storage/tests/test_rbtree.py +++ b/time_based_storage/tests/test_rbtree.py @@ -23,12 +23,9 @@ def test_range_query(self): self.storage.add(self.now - timedelta(minutes=5), "value2") self.storage.add(self.now, "value3") self.storage.add(self.now + timedelta(minutes=5), "value4") - + # Test range query - values = self.storage.get_range( - self.now - timedelta(minutes=7), - self.now + timedelta(minutes=1) - ) + values = self.storage.get_range(self.now - timedelta(minutes=7), self.now + timedelta(minutes=1)) self.assertEqual(set(values), {"value2", "value3"}) def test_duplicate_timestamp(self): @@ -40,11 +37,11 @@ def test_add_unique_timestamp(self): # Add first value ts1 = self.storage.add_unique_timestamp(self.now, "value1") self.assertEqual(ts1, self.now) - + # Add second value with same timestamp ts2 = self.storage.add_unique_timestamp(self.now, "value2") self.assertNotEqual(ts1, ts2) - + # Verify both values are stored self.assertEqual(self.storage.size(), 2) @@ -52,11 +49,11 @@ def test_duration_query(self): # Add values with timestamps in the past old_time = datetime.now() - timedelta(seconds=10) self.storage.add(old_time, "old value") - + # Add recent value recent_time = datetime.now() - timedelta(seconds=1) self.storage.add(recent_time, "recent value") - + # Get values from last 5 seconds values = self.storage.get_duration(5) self.assertIn("recent value", values) @@ -110,11 +107,11 @@ def delayed_add(): def test_wait_timeout(self): # Wait with a short timeout when no data is available result = self.storage.wait_for_data(timeout=0.01) - + # Verify wait timed out self.assertFalse(result) self.assertTrue(self.storage.is_empty()) if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()