diff --git a/btrdb/endpoint.py b/btrdb/endpoint.py index 79a3b72..a28dfa6 100644 --- a/btrdb/endpoint.py +++ b/btrdb/endpoint.py @@ -27,8 +27,10 @@ import io import typing import uuid +import grpc +import asyncio -from btrdb.exceptions import BTrDBError, check_proto_stat, error_handler +from btrdb.exceptions import BTrDBError, check_proto_stat, error_handler, handle_grpc_error from btrdb.grpcinterface import btrdb_pb2, btrdb_pb2_grpc from btrdb.point import RawPoint from btrdb.utils.general import unpack_stream_descriptor @@ -221,8 +223,7 @@ def setStreamTags(self, uu, expected, tags, collection): result = self.stub.SetStreamTags(params) check_proto_stat(result.stat) - @error_handler - def create(self, uu, collection, tags, annotations): + def _create_params(self, uu, collection, tags, annotations): tagkvlist = [] for k, v in tags.items(): kv = btrdb_pb2.KeyOptValue(key=k, val=btrdb_pb2.OptValue(value=v)) @@ -231,12 +232,42 @@ def create(self, uu, collection, tags, annotations): for k, v in annotations.items(): kv = btrdb_pb2.KeyOptValue(key=k, val=btrdb_pb2.OptValue(value=v)) annkvlist.append(kv) - params = btrdb_pb2.CreateParams( + return btrdb_pb2.CreateParams( uuid=uu.bytes, collection=collection, tags=tagkvlist, annotations=annkvlist ) + + @error_handler + def create(self, uu, collection, tags, annotations): + params = self._create_params(uu, collection, tags, annotations) result = self.stub.Create(params) check_proto_stat(result.stat) + async def create_async(self, uu, collection, tags, annotations): + loop = asyncio.get_running_loop() + params = self._create_params(uu, collection, tags, annotations) + fut = self.stub.Create.future(params) + async_fut = loop.create_future() + + def async_done_cb(async_fut): + if async_fut.cancelled(): + fut.cancel() + + def done_cb(fut): + if fut.cancelled(): + return + try: + try: # XXX it would be nice to avoid this double try nesting. 
+ result = fut.result() + loop.call_soon_threadsafe(async_fut.set_result, result) + except grpc.RpcError as e: + handle_grpc_error(e) + except Exception as e: + loop.call_soon_threadsafe(async_fut.set_exception, e) + + fut.add_done_callback(done_cb) + async_fut.add_done_callback(async_done_cb) + return await async_fut + @error_handler def listCollections(self, prefix): """ diff --git a/btrdb/exceptions.py b/btrdb/exceptions.py index 9777679..9cdc25f 100644 --- a/btrdb/exceptions.py +++ b/btrdb/exceptions.py @@ -100,7 +100,6 @@ def wrap(*args, **kwargs): return wrap - ########################################################################## ## gRPC error handling ########################################################################## @@ -148,7 +147,6 @@ def check_proto_stat(stat): raise BTRDBServerError(stat.msg) raise BTrDBError(stat.msg) - ########################################################################## ## BTrDB Exceptions ########################################################################## diff --git a/btrdb/transformers.py b/btrdb/transformers.py index 9bcf7b5..f01787f 100644 --- a/btrdb/transformers.py +++ b/btrdb/transformers.py @@ -138,18 +138,13 @@ def arrow_to_series(streamset, agg="mean", name_callable=None): return [arrow_df[col] for col in arrow_df] -def arrow_to_dataframe( - streamset, columns=None, agg=None, name_callable=None -) -> pd.DataFrame: +def arrow_to_dataframe(streamset, agg=None, name_callable=None) -> pd.DataFrame: """ Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column. Parameters ---------- - columns: sequence - column names to use for DataFrame. Deprecated and not compatible with name_callable. - agg : List[str], default: ["mean"] Specify the StatPoint fields (e.g. aggregating function) to create the dataframe from. Must be one or more of "min", "mean", "max", "count", "stddev", or "all". 
This @@ -175,13 +170,6 @@ def arrow_to_dataframe( raise ImportError( f"Please install Pandas and pyarrow to use this transformation function. ErrorMessage: {err}" ) - # deprecation warning added in v5.8 - if columns: - warn( - "the columns argument is deprecated and will be removed in a future release", - DeprecationWarning, - stacklevel=2, - ) if agg is None: agg = ["mean"] @@ -227,16 +215,13 @@ def arrow_to_dataframe( return tmp.to_pandas(date_as_object=False, types_mapper=pd.ArrowDtype) -def to_dataframe(streamset, columns=None, agg="mean", name_callable=None): +def to_dataframe(streamset, agg="mean", name_callable=None): """ Returns a Pandas DataFrame object indexed by time and using the values of a stream for each column. Parameters ---------- - columns: sequence - column names to use for DataFrame. Deprecated and not compatible with name_callable. - agg : str, default: "mean" Specify the StatPoint field (e.g. aggregating function) to create the Series from. Must be one of "min", "mean", "max", "count", "stddev", or "all". 
This @@ -253,14 +238,6 @@ def to_dataframe(streamset, columns=None, agg="mean", name_callable=None): except ImportError: raise ImportError("Please install Pandas to use this transformation function.") - # deprecation warning added in v5.8 - if columns: - warn( - "the columns argument is deprecated and will be removed in a future release", - DeprecationWarning, - stacklevel=2, - ) - # TODO: allow this at some future point if agg == "all" and name_callable is not None: raise AttributeError( @@ -288,7 +265,7 @@ def to_dataframe(streamset, columns=None, agg="mean", name_callable=None): ] df.columns = pd.MultiIndex.from_tuples(stream_names) else: - df.columns = columns if columns else _stream_names(streamset, name_callable) + df.columns = _stream_names(streamset, name_callable) return df diff --git a/pyproject.toml b/pyproject.toml index 80f656d..d3c9f94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,6 +4,9 @@ version = "5.31.0" authors = [ {name="PingThingsIO", email="support@pingthings.io"}, ] +maintainers = [ + {name="PingThingsIO", email="support@pingthings.io"}, +] description = "Bindings to interact with the Berkeley Tree Database using gRPC." 
readme = "README.md" license = {file="LICENSE.txt"} @@ -65,8 +68,7 @@ all = [ ] [project.urls] -"Homepage" = "https://btrdb.io" -"Docs" = "https://btrdb.readthedocs.io" +"Docs" = "https://btrdb-python.readthedocs.io/" "Repository" = "https://github.com/pingthingsio/btrdb-python.git" [build-system] diff --git a/setup.cfg b/setup.cfg index 3f445b3..8d1166f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] -description-file = DESCRIPTION.md -license_file = LICENSE.txt +description_file = DESCRIPTION.md +license_files = LICENSE.txt [aliases] test=pytest diff --git a/setup.py b/setup.py index a5b82d1..df6ad76 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ REPOSITORY = "https://github.com/PingThingsIO/btrdb-python" PACKAGE = "btrdb" URL = "http://btrdb.io/" -DOCS_URL = "https://btrdb.readthedocs.io/en/latest/" +DOCS_URL = "https://btrdb-python.readthedocs.io/" ## Define the keywords KEYWORDS = ("btrdb", "berkeley", "timeseries", "database", "bindings" "gRPC") @@ -133,7 +133,6 @@ def get_description_type(path=PKG_DESCRIBE): "license": LICENSE, "author": AUTHOR, "author_email": EMAIL, - "url": URL, "maintainer": MAINTAINER, "maintainer_email": EMAIL, "project_urls": { diff --git a/tests/btrdb/test_transformers.py b/tests/btrdb/test_transformers.py index 833de69..c7ce719 100644 --- a/tests/btrdb/test_transformers.py +++ b/tests/btrdb/test_transformers.py @@ -601,7 +601,7 @@ def test_to_series(self, streamset): def test_to_series_name_lambda(self, streamset): """ - assert to_dateframe uses name lambda + assert to_series uses name lambda """ result = streamset.to_series(name_callable=lambda s: s.name) assert [s.name for s in result] == ["stream0", "stream1", "stream2", "stream3"] @@ -691,23 +691,16 @@ def test_to_dataframe(self, streamset): df.set_index("time", inplace=True) assert to_dataframe(streamset).equals(df) - def test_to_dataframe_column_issues_warning(self, statpoint_streamset): + def test_to_dataframe_column_issues_error(self, 
statpoint_streamset):
         """
-        assert to_dateframe with column argument issues warning
+        assert to_dataframe with column argument issues error
         """
         columns = ["test/cats", "test/dogs", "test/horses", "test/fishes"]
-        with pytest.deprecated_call():
+        with pytest.raises(TypeError) as unexpected_key_err:
             statpoint_streamset.to_dataframe(columns=columns)
-
-    def test_to_dataframe_column(self, statpoint_streamset):
-        """
-        assert to_dateframe with column argument actually renames columns
-        """
-        columns = ["test/cats", "test/dogs", "test/horses", "test/fishes"]
-        with pytest.deprecated_call():
-            df = statpoint_streamset.to_dataframe(columns=columns)
-
-        assert df.columns.tolist() == columns
+        assert "got an unexpected keyword argument 'columns'" in str(
+            unexpected_key_err.value
+        )
 
     def test_to_dataframe_multindex(self, statpoint_streamset):
         """