Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion time_based_storage/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ pytest>=7.0.0
black>=23.0.0
flake8>=6.0.0
pytest-cov>=4.0.0
codecov>=2.1.12
codecov>=2.1.12
sortedcontainers>=2.4.0
12 changes: 10 additions & 2 deletions time_based_storage/src/time_based_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,28 @@
Core (non-thread-safe) implementations:
- TimeBasedStorage: Basic time-based storage using a dictionary
- TimeBasedStorageHeap: Heap-based implementation for efficient range queries
- TimeBasedStorageRBTree: Red-Black Tree implementation for efficient insertion and range queries

Concurrent (thread-safe) implementations:
- ThreadSafeTimeBasedStorage: Thread-safe wrapper around TimeBasedStorage
- ThreadSafeTimeBasedStorageHeap: Thread-safe wrapper around TimeBasedStorageHeap
- ThreadSafeTimeBasedStorageRBTree: Thread-safe wrapper around TimeBasedStorageRBTree
"""

from .core import TimeBasedStorage, TimeBasedStorageHeap
from .concurrent import ThreadSafeTimeBasedStorage, ThreadSafeTimeBasedStorageHeap
from .core import TimeBasedStorage, TimeBasedStorageHeap, TimeBasedStorageRBTree
from .concurrent import (
ThreadSafeTimeBasedStorage,
ThreadSafeTimeBasedStorageHeap,
ThreadSafeTimeBasedStorageRBTree,
)

__all__ = [
# Core implementations
"TimeBasedStorage",
"TimeBasedStorageHeap",
"TimeBasedStorageRBTree",
# Concurrent implementations
"ThreadSafeTimeBasedStorage",
"ThreadSafeTimeBasedStorageHeap",
"ThreadSafeTimeBasedStorageRBTree",
]
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@

from .thread_safe import ThreadSafeTimeBasedStorage
from .thread_safe_heap import ThreadSafeTimeBasedStorageHeap
from .thread_safe_rbtree import ThreadSafeTimeBasedStorageRBTree

__all__ = ["ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap"]
__all__ = ["ThreadSafeTimeBasedStorage", "ThreadSafeTimeBasedStorageHeap", "ThreadSafeTimeBasedStorageRBTree"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
from typing import List, Optional, TypeVar, Generic
from datetime import datetime
import threading
from ..core import TimeBasedStorageRBTree

T = TypeVar("T")


class ThreadSafeTimeBasedStorageRBTree(TimeBasedStorageRBTree[T], Generic[T]):
"""
Thread-safe implementation of TimeBasedStorageRBTree using Python's threading module.
This implementation provides safe concurrent access to the Red-Black Tree storage using read-write locks.

Benefits over the standard ThreadSafeTimeBasedStorage:
- Better insertion performance: O(log n) vs O(n)
- Better range query performance: O(log n + k) vs O(n) where k is the number of items in range
"""

def __init__(self):
super().__init__()
# Read-write lock for thread-safe operations
self._lock = threading.RLock()
# Condition variable for waiting on events
self._condition = threading.Condition(self._lock)

def add(self, timestamp: datetime, value: T) -> None:
"""
Thread-safe method to add a value with its timestamp.

Args:
timestamp: The timestamp of the value
value: The value to store
"""
with self._lock:
super().add(timestamp, value)
# Notify any waiting threads that new data is available
self._condition.notify_all()

def add_unique_timestamp(self, timestamp: datetime, value: T, max_offset_microseconds: int = 1000000) -> datetime:
"""
Thread-safe method to add a value with a guaranteed unique timestamp.

Args:
timestamp: The desired timestamp
value: The value to store
max_offset_microseconds: Maximum random offset to add (default: 1 second)

Returns:
The actual timestamp used (may be different from input if offset was added)
"""
with self._lock:
result = super().add_unique_timestamp(timestamp, value, max_offset_microseconds)
# Notify any waiting threads that new data is available
self._condition.notify_all()
return result

def get_range(self, start_time: datetime, end_time: datetime) -> List[T]:
"""
Thread-safe method to get all values within a time range.

Args:
start_time: Start of the time range
end_time: End of the time range

Returns:
List of values within the specified time range
"""
with self._lock:
return super().get_range(start_time, end_time)

def get_duration(self, duration: float) -> List[T]:
"""
Thread-safe method to get all values within the last duration seconds.

Args:
duration: Number of seconds to look back

Returns:
List of values within the specified duration
"""
with self._lock:
return super().get_duration(duration)

def clear(self) -> None:
"""Thread-safe method to clear all stored values."""
with self._lock:
super().clear()

def get_all(self) -> List[T]:
"""
Thread-safe method to get all stored values.

Returns:
List of all stored values
"""
with self._lock:
return super().get_all()

def get_timestamps(self) -> List[datetime]:
"""
Thread-safe method to get all stored timestamps.

Returns:
List of all stored timestamps
"""
with self._lock:
return super().get_timestamps()

def get_value_at(self, timestamp: datetime) -> Optional[T]:
"""
Thread-safe method to get the value at a specific timestamp.

Args:
timestamp: The timestamp to look up

Returns:
The value at the specified timestamp, or None if not found
"""
with self._lock:
return super().get_value_at(timestamp)

def remove(self, timestamp: datetime) -> bool:
"""
Thread-safe method to remove a value at a specific timestamp.

Args:
timestamp: The timestamp of the value to remove

Returns:
True if the value was removed, False if not found
"""
with self._lock:
return super().remove(timestamp)

def size(self) -> int:
"""
Thread-safe method to get the number of stored values.

Returns:
Number of stored values
"""
with self._lock:
return super().size()

def is_empty(self) -> bool:
"""
Thread-safe method to check if the storage is empty.

Returns:
True if the storage is empty, False otherwise
"""
with self._lock:
return super().is_empty()

def wait_for_data(self, timeout: float = None) -> bool:
"""
Wait for data to be available in the storage.

Args:
timeout: Maximum time to wait in seconds, or None to wait indefinitely

Returns:
True if data is available, False if timeout occurred
"""
with self._lock:
if not self.is_empty():
return True
return self._condition.wait(timeout=timeout)

def notify_data_available(self) -> None:
"""
Notify waiting threads that data is available.
This is automatically called by add() and add_unique_timestamp().
"""
with self._lock:
self._condition.notify_all()
3 changes: 2 additions & 1 deletion time_based_storage/src/time_based_storage/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@

from .base import TimeBasedStorage
from .heap import TimeBasedStorageHeap
from .rbtree import TimeBasedStorageRBTree

__all__ = ["TimeBasedStorage", "TimeBasedStorageHeap"]
__all__ = ["TimeBasedStorage", "TimeBasedStorageHeap", "TimeBasedStorageRBTree"]
161 changes: 161 additions & 0 deletions time_based_storage/src/time_based_storage/core/rbtree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from typing import List, Optional, TypeVar, Generic
from datetime import datetime, timedelta
from sortedcontainers import SortedDict
import random

T = TypeVar("T")


class TimeBasedStorageRBTree(Generic[T]):
"""
Red-Black Tree implementation of time-based storage.
This implementation uses SortedDict from sortedcontainers which provides
Red-Black Tree performance characteristics.

This is a non-thread-safe implementation suitable for single-threaded use.

Compared to the base TimeBasedStorage:
- Better insertion performance: O(log n) vs O(n)
- Equivalent lookup performance: O(1)
- Better range query performance: O(log n + k) vs O(n) where k is the number of items in range

Note:
Timestamps have microsecond precision (0.000001 seconds).
When adding items rapidly, consider using add_unique_timestamp() to avoid collisions.
"""

def __init__(self):
self._storage = SortedDict()

def add(self, timestamp: datetime, value: T) -> None:
"""
Add a value with its timestamp.

Args:
timestamp: The timestamp of the value
value: The value to store

Raises:
ValueError: If a value already exists at the given timestamp
"""
if timestamp in self._storage:
raise ValueError(f"Value already exists at timestamp {timestamp}")
self._storage[timestamp] = value

def add_unique_timestamp(self, timestamp: datetime, value: T, max_offset_microseconds: int = 1000000) -> datetime:
"""
Add a value with a guaranteed unique timestamp.
If the timestamp already exists, adds a random microsecond offset.

Args:
timestamp: The desired timestamp
value: The value to store
max_offset_microseconds: Maximum random offset to add (default: 1 second)

Returns:
The actual timestamp used (may be different from input if offset was added)
"""
if timestamp not in self._storage:
self._storage[timestamp] = value
return timestamp

# Add random offset in microseconds
offset = random.randint(0, max_offset_microseconds)
unique_timestamp = timestamp + timedelta(microseconds=offset)
self._storage[unique_timestamp] = value
return unique_timestamp

def get_range(self, start_time: datetime, end_time: datetime) -> List[T]:
"""
Get all values within a time range.
Efficiently uses SortedDict's irange method for optimized range queries.

Args:
start_time: Start of the time range
end_time: End of the time range

Returns:
List of values within the specified time range
"""
return [self._storage[ts] for ts in self._storage.irange(start_time, end_time)]

def get_duration(self, duration: float) -> List[T]:
"""
Get all values within the last duration seconds.

Args:
duration: Number of seconds to look back

Returns:
List of values within the specified duration
"""
now = datetime.now()
start_time = now.fromtimestamp(now.timestamp() - duration)
return self.get_range(start_time, now)

def clear(self) -> None:
"""Clear all stored values."""
self._storage.clear()

def get_all(self) -> List[T]:
"""
Get all stored values.

Returns:
List of all stored values
"""
return list(self._storage.values())

def get_timestamps(self) -> List[datetime]:
"""
Get all stored timestamps.

Returns:
List of all stored timestamps
"""
return list(self._storage.keys())

def get_value_at(self, timestamp: datetime) -> Optional[T]:
"""
Get the value at a specific timestamp.

Args:
timestamp: The timestamp to look up

Returns:
The value at the specified timestamp, or None if not found
"""
return self._storage.get(timestamp)

def remove(self, timestamp: datetime) -> bool:
"""
Remove a value at a specific timestamp.

Args:
timestamp: The timestamp of the value to remove

Returns:
True if the value was removed, False if not found
"""
if timestamp in self._storage:
del self._storage[timestamp]
return True
return False

def size(self) -> int:
"""
Get the number of stored values.

Returns:
Number of stored values
"""
return len(self._storage)

def is_empty(self) -> bool:
"""
Check if the storage is empty.

Returns:
True if the storage is empty, False otherwise
"""
return len(self._storage) == 0
Loading