diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ac977e..f0d5598 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ compression for Flight queries: - `disable_grpc_compression` parameter in `InfluxDBClient3` constructor - `INFLUX_DISABLE_GRPC_COMPRESSION` environment variable support in `from_env()` +1. [#180](https://github.com/InfluxCommunity/influxdb3-python/pull/180): Add `flush()` method to `InfluxDBClient3`: + - Allows flushing the write buffer without closing the client when using batching mode. + - Enables applications to ensure data is written before querying, while keeping the client open for further operations. ### Bug Fixes diff --git a/influxdb_client_3/__init__.py b/influxdb_client_3/__init__.py index 3f763b5..1f58b8b 100644 --- a/influxdb_client_3/__init__.py +++ b/influxdb_client_3/__init__.py @@ -660,6 +660,19 @@ def get_server_version(self) -> str: return version + def flush(self): + """ + Flush any buffered writes to InfluxDB without closing the client. + + This method immediately sends all buffered data points to the server + when using batching write mode. After flushing, the client remains + open and ready for more writes. + + For synchronous or asynchronous write modes, this is a no-op since + data is written immediately. 
+ """ + self._write_api.flush() + def close(self): """Close the client and clean up resources.""" self._write_api.close() diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 9c601eb..6fe6783 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -287,31 +287,9 @@ def __init__(self, self._retry_callback = kwargs.get('retry_callback', None) if self._write_options.write_type is WriteType.batching: - # Define Subject that listen incoming data and produces writes into InfluxDB - self._subject = Subject() - - self._disposable = self._subject.pipe( - # Split incoming data to windows by batch_size or flush_interval - ops.window_with_time_or_count(count=write_options.batch_size, - timespan=timedelta(milliseconds=write_options.flush_interval)), - # Map window into groups defined by 'organization', 'bucket' and 'precision' - ops.flat_map(lambda window: window.pipe( - # Group window by 'organization', 'bucket' and 'precision' - ops.group_by(lambda batch_item: batch_item.key), - # Create batch (concatenation line protocols by \n) - ops.map(lambda group: group.pipe( - ops.to_iterable(), - ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), - ops.merge_all())), - # Write data into InfluxDB (possibility to retry if its fail) - ops.filter(lambda batch: batch.size > 0), - ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())), - ops.merge_all()) \ - .subscribe(self._on_next, self._on_error, self._on_complete) - + self._subject, self._disposable = self._create_batching_pipeline() else: - self._subject = None - self._disposable = None + self._subject, self._disposable = None, None if self._write_options.write_type is WriteType.asynchronous: message = """The 'WriteType.asynchronous' is deprecated and will be removed in future major version. 
@@ -426,14 +404,88 @@ def write_payload(payload): return results[0] return results + def _create_batching_pipeline(self): + """Create the batching pipeline; returns a ``(Subject, Disposable)`` pair for collecting and writing data.""" + # Define Subject that listen incoming data and produces writes into InfluxDB + subject = Subject() + + disposable = subject.pipe( + # Split incoming data to windows by batch_size or flush_interval + ops.window_with_time_or_count(count=self._write_options.batch_size, + timespan=timedelta(milliseconds=self._write_options.flush_interval)), + # Map window into groups defined by 'organization', 'bucket' and 'precision' + ops.flat_map(lambda window: window.pipe( # type: ignore + # Group window by 'organization', 'bucket' and 'precision' + ops.group_by(lambda batch_item: batch_item.key), # type: ignore + # Create batch (concatenation line protocols by \n) + ops.map(lambda group: group.pipe( # type: ignore + ops.to_iterable(), + ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), # type: ignore + ops.merge_all())), + # Write data into InfluxDB (possibility to retry if its fail) + ops.filter(lambda batch: batch.size > 0), + ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())), + ops.merge_all()) \ + .subscribe(self._on_next, self._on_error, self._on_complete) + + return subject, disposable + def flush(self): - """Flush data.""" - # TODO - pass + """ + Flush any buffered writes to InfluxDB without closing the client. + + This method immediately sends all buffered data points to the server + when using batching write mode. After flushing, the client remains + open and ready for more writes. + + For synchronous or asynchronous write modes, this is a no-op since + data is written immediately. 
+ """ + if self._write_options.write_type is not WriteType.batching: + return # Nothing to flush for synchronous/asynchronous writes + + self.close() # Close existing batching pipeline + + # Recreate the batching pipeline for continued use + self._subject, self._disposable = self._create_batching_pipeline() def close(self): """Flush data and dispose a batching buffer.""" - self.__del__() + if self._subject is None: + return # Already closed + + self._subject.on_completed() + self._subject.dispose() + self._subject = None + + """ + We impose a maximum wait time to ensure that we do not cause a deadlock if the + background thread has exited abnormally + + Each iteration waits 100ms, but sleep expects the unit to be seconds so convert + the maximum wait time to seconds. + + We keep a counter of how long we've waited + """ + max_wait_time = self._write_options.max_close_wait / 1000 + waited = 0 + sleep_period = 0.1 + + # Wait for writing to finish + while not self._disposable.is_disposed: + sleep(sleep_period) + waited += sleep_period + + # Have we reached the upper limit? + if waited >= max_wait_time: + logger.warning( + "Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing", + max_wait_time + ) + break + + if self._disposable: + self._disposable = None def __enter__(self): """ @@ -452,40 +504,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __del__(self): """Close WriteApi.""" - if self._subject: - self._subject.on_completed() - self._subject.dispose() - self._subject = None - - """ - We impose a maximum wait time to ensure that we do not cause a deadlock if the - background thread has exited abnormally - - Each iteration waits 100ms, but sleep expects the unit to be seconds so convert - the maximum wait time to seconds. 
- - We keep a counter of how long we've waited - """ - max_wait_time = self._write_options.max_close_wait / 1000 - waited = 0 - sleep_period = 0.1 - - # Wait for writing to finish - while not self._disposable.is_disposed: - sleep(sleep_period) - waited += sleep_period - - # Have we reached the upper limit? - if waited >= max_wait_time: - logger.warning( - "Reached max_close_wait (%s seconds) waiting for batches to finish writing. Force closing", - max_wait_time - ) - break - - if self._disposable: - self._disposable = None - pass + self.close() def _write_batching(self, bucket, org, data, precision=None, diff --git a/tests/test_flush.py b/tests/test_flush.py new file mode 100644 index 0000000..962729c --- /dev/null +++ b/tests/test_flush.py @@ -0,0 +1,86 @@ +"""Tests for the flush() method in InfluxDBClient3 and WriteApi.""" +import unittest +from unittest.mock import MagicMock, patch + +from influxdb_client_3 import InfluxDBClient3, WriteOptions, write_client_options, WriteType + + +class TestFlushMethod(unittest.TestCase): + """Test cases for the flush() method.""" + + def test_flush_sends_buffered_data_and_allows_continued_writes(self): + """Test that flush() sends pending data and allows continued writes.""" + write_count = 0 + + def success_callback(conf, data): + nonlocal write_count + write_count += 1 + + write_options = WriteOptions( + write_type=WriteType.batching, + batch_size=1000, + flush_interval=60_000, + max_close_wait=5_000 + ) + + wc_opts = write_client_options( + success_callback=success_callback, + write_options=write_options + ) + + with patch('influxdb_client_3.write_client.client.write_api.WriteApi._post_write') as mock_post: + mock_post.return_value = MagicMock() + + client = InfluxDBClient3( + host="http://localhost:8086", + token="my-token", + database="my-db", + write_client_options=wc_opts + ) + + try: + # Write data, flush, write more, flush again + for i in range(5): + client.write(f"test,tag=value field={i}i") + client.flush() + + for 
i in range(5): + client.write(f"test,tag=value field={i}i") + client.flush() + + # Both batches should have been flushed + self.assertEqual(2, write_count) + self.assertEqual(2, mock_post.call_count) + + # Verify that all 10 data points (5 per batch) were sent + for call in mock_post.call_args_list: + args, kwargs = call + body = kwargs.get('body') or args[3] + if isinstance(body, bytes): + body = body.decode('utf-8') + for i in range(5): + self.assertIn(f"test,tag=value field={i}i", body) + finally: + client.close() + + def test_flush_is_safe_in_synchronous_mode_and_after_close(self): + """Test that flush() doesn't crash in sync mode or after close.""" + # Test synchronous mode + sync_opts = write_client_options(write_options=WriteOptions(write_type=WriteType.synchronous)) + with patch('influxdb_client_3.write_client.client.write_api.WriteApi._post_write'): + client = InfluxDBClient3(host="http://localhost:8086", token="t", database="db", + write_client_options=sync_opts) + client.flush() # Should not raise + client.close() + + # Test flush after close in batching mode + batch_opts = write_client_options(write_options=WriteOptions(write_type=WriteType.batching)) + with patch('influxdb_client_3.write_client.client.write_api.WriteApi._post_write'): + client = InfluxDBClient3(host="http://localhost:8086", token="t", database="db", + write_client_options=batch_opts) + client.close() + client.flush() # Should not raise + + +if __name__ == '__main__': + unittest.main()