From c64121de4d2d39a05200c7e32874ea8e24dae55b Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Wed, 24 Dec 2025 16:14:11 -0800 Subject: [PATCH 1/6] fix: move class var ThreadPoolExecutors to container --- .../database_dialect.py | 8 +- aws_advanced_python_wrapper/driver_dialect.py | 8 +- .../host_list_provider.py | 18 ++- .../host_monitoring_plugin.py | 7 +- .../mysql_driver_dialect.py | 5 +- aws_advanced_python_wrapper/plugin_service.py | 5 +- .../thread_pool_container.py | 112 ++++++++++++++++++ aws_advanced_python_wrapper/wrapper.py | 8 ++ tests/integration/container/conftest.py | 3 + 9 files changed, 153 insertions(+), 21 deletions(-) create mode 100644 aws_advanced_python_wrapper/thread_pool_container.py diff --git a/aws_advanced_python_wrapper/database_dialect.py b/aws_advanced_python_wrapper/database_dialect.py index 1dafd862f..2946faf5b 100644 --- a/aws_advanced_python_wrapper/database_dialect.py +++ b/aws_advanced_python_wrapper/database_dialect.py @@ -26,9 +26,10 @@ from aws_advanced_python_wrapper.pep249 import Connection from .driver_dialect import DriverDialect from .exception_handling import ExceptionHandler + from concurrent.futures import Executor from abc import ABC, abstractmethod -from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError +from concurrent.futures import TimeoutError from contextlib import closing from enum import Enum, auto @@ -44,6 +45,7 @@ PropertiesUtils, WrapperProperties) from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from .driver_dialect_codes import DriverDialectCodes from .utils.cache_map import CacheMap from .utils.messages import Messages @@ -638,7 +640,7 @@ class DatabaseDialectManager(DatabaseDialectProvider): _ENDPOINT_CACHE_EXPIRATION_NS = 30 * 60_000_000_000 # 30 minutes _known_endpoint_dialects: CacheMap[str, DialectCode] = CacheMap() _custom_dialect: Optional[DatabaseDialect] = None - _executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="DatabaseDialectManagerExecutor") + _executor_name: ClassVar[str] = "DatabaseDialectManagerExecutor" _known_dialects_by_code: Dict[DialectCode, DatabaseDialect] = { DialectCode.MYSQL: MysqlDatabaseDialect(), DialectCode.RDS_MYSQL: RdsMysqlDialect(), @@ -776,7 +778,7 @@ def query_for_dialect(self, url: str, host_info: Optional[HostInfo], conn: Conne timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get(self._props) try: cursor_execute_func_with_timeout = preserve_transaction_status_with_timeout( - DatabaseDialectManager._executor, + ThreadPoolContainer.get_thread_pool(DatabaseDialectManager._executor_name), timeout_sec, driver_dialect, conn)(dialect_candidate.is_dialect) diff --git a/aws_advanced_python_wrapper/driver_dialect.py b/aws_advanced_python_wrapper/driver_dialect.py index c9df891a8..636851f3b 100644 --- a/aws_advanced_python_wrapper/driver_dialect.py +++ b/aws_advanced_python_wrapper/driver_dialect.py @@ -19,13 +19,15 @@ if TYPE_CHECKING: from aws_advanced_python_wrapper.hostinfo import HostInfo from aws_advanced_python_wrapper.pep249 import Connection, Cursor + from concurrent.futures import Executor from abc import ABC -from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError +from concurrent.futures import TimeoutError from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes from aws_advanced_python_wrapper.errors import (QueryTimeoutError, UnsupportedOperationError) +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from aws_advanced_python_wrapper.utils.decorators import timeout from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, @@ -40,7 +42,7 @@ class DriverDialect(ABC): _QUERY = "SELECT 1" _ALL_METHODS = "*" - _executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="DriverDialectExecutor") + _executor_name: ClassVar[str] = "DriverDialectExecutor" _dialect_code: str = DriverDialectCodes.GENERIC _network_bound_methods: Set[str] = {_ALL_METHODS} _read_only: bool = False @@ -136,7 +138,7 @@ def execute( if exec_timeout > 0: try: - execute_with_timeout = timeout(DriverDialect._executor, exec_timeout)(exec_func) + execute_with_timeout = timeout(ThreadPoolContainer.get_thread_pool(DriverDialect._executor_name), exec_timeout)(exec_func) return execute_with_timeout() except TimeoutError as e: raise QueryTimeoutError(Messages.get_formatted("DriverDialect.ExecuteTimeout", method_name)) from e diff --git a/aws_advanced_python_wrapper/host_list_provider.py b/aws_advanced_python_wrapper/host_list_provider.py index 9d7138173..a85572b87 100644 --- a/aws_advanced_python_wrapper/host_list_provider.py +++ b/aws_advanced_python_wrapper/host_list_provider.py @@ -16,7 +16,7 @@ import uuid from abc import ABC, abstractmethod -from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError +from concurrent.futures import TimeoutError from contextlib import closing from dataclasses import dataclass from datetime import datetime @@ -41,6 +41,7 @@ ProgrammingError) from aws_advanced_python_wrapper.utils.cache_map import CacheMap from aws_advanced_python_wrapper.utils.log import Logger +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) @@ -148,8 +149,6 @@ class RdsHostListProvider(DynamicHostListProvider, HostListProvider): # cluster IDs so that connections to the same clusters can share topology info. _cluster_ids_to_update: CacheMap[str, str] = CacheMap() - _executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="RdsHostListProviderExecutor") - def __init__(self, host_list_provider_service: HostListProviderService, props: Properties, topology_utils: TopologyUtils): self._host_list_provider_service: HostListProviderService = host_list_provider_service self._props: Properties = props @@ -425,6 +424,8 @@ class TopologyUtils(ABC): to various database engine deployments (e.g. Aurora, Multi-AZ, etc.). """ + _executor_name: ClassVar[str] = "TopologyUtils" + def __init__(self, dialect: db_dialect.TopologyAwareDatabaseDialect, props: Properties): self._dialect: db_dialect.TopologyAwareDatabaseDialect = dialect self._rds_utils = RdsUtils() @@ -487,7 +488,7 @@ def query_for_topology( an empty tuple will be returned. """ query_for_topology_func_with_timeout = preserve_transaction_status_with_timeout( - RdsHostListProvider._executor, self._max_timeout, driver_dialect, conn)(self._query_for_topology) + ThreadPoolContainer.get_thread_pool(self._executor_name), self._max_timeout, driver_dialect, conn)(self._query_for_topology) return query_for_topology_func_with_timeout(conn) @abstractmethod @@ -549,7 +550,7 @@ def create_host( def get_host_role(self, connection: Connection, driver_dialect: DriverDialect) -> HostRole: try: cursor_execute_func_with_timeout = preserve_transaction_status_with_timeout( - RdsHostListProvider._executor, self._max_timeout, driver_dialect, connection)(self._get_host_role) + ThreadPoolContainer.get_thread_pool(self._executor_name), self._max_timeout, driver_dialect, connection)(self._get_host_role) result = cursor_execute_func_with_timeout(connection) if result is not None: is_reader = result[0] @@ -572,7 +573,7 @@ def get_host_id(self, connection: Connection, driver_dialect: DriverDialect) -> """ cursor_execute_func_with_timeout = preserve_transaction_status_with_timeout( - RdsHostListProvider._executor, self._max_timeout, driver_dialect, connection)(self._get_host_id) + ThreadPoolContainer.get_thread_pool(self._executor_name), self._max_timeout, driver_dialect, connection)(self._get_host_id) result = cursor_execute_func_with_timeout(connection) if result: host_id: str = result[0] @@ -586,6 +587,9 @@ def _get_host_id(self, conn: Connection): class AuroraTopologyUtils(TopologyUtils): + + _executor_name: ClassVar[str] = "AuroraTopologyUtils" + def _query_for_topology(self, conn: Connection) -> Optional[Tuple[HostInfo, ...]]: """ Query the database for topology information. @@ -636,6 +640,8 @@ def _process_query_results(self, cursor: Cursor) -> Tuple[HostInfo, ...]: class MultiAzTopologyUtils(TopologyUtils): + + _executor_name: ClassVar[str] = "MultiAzTopologyUtils" def __init__( self, dialect: db_dialect.TopologyAwareDatabaseDialect, diff --git a/aws_advanced_python_wrapper/host_monitoring_plugin.py b/aws_advanced_python_wrapper/host_monitoring_plugin.py index 415a0a9ec..35f2d637c 100644 --- a/aws_advanced_python_wrapper/host_monitoring_plugin.py +++ b/aws_advanced_python_wrapper/host_monitoring_plugin.py @@ -48,6 +48,7 @@ from aws_advanced_python_wrapper.utils.telemetry.telemetry import ( TelemetryCounter, TelemetryTraceLevel) from aws_advanced_python_wrapper.utils.utils import QueueUtils +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer logger = Logger(__name__) @@ -565,7 +566,7 @@ class MonitoringThreadContainer: _monitor_map: ConcurrentDict[str, Monitor] = ConcurrentDict() _tasks_map: ConcurrentDict[Monitor, Future] = ConcurrentDict() - _executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="MonitoringThreadContainerExecutor") + _executor_name: ClassVar[str] = "MonitoringThreadContainerExecutor" # This logic ensures that this class is a Singleton def __new__(cls, *args, **kwargs): @@ -650,10 +651,6 @@ def _release_resources(self): self._tasks_map.clear() - # Reset the executor. - self._executor.shutdown(wait=False) - MonitoringThreadContainer._executor = ThreadPoolExecutor(thread_name_prefix="MonitoringThreadContainerExecutor") - class MonitorService: def __init__(self, plugin_service: PluginService): diff --git a/aws_advanced_python_wrapper/mysql_driver_dialect.py b/aws_advanced_python_wrapper/mysql_driver_dialect.py index dd7055c55..99c293756 100644 --- a/aws_advanced_python_wrapper/mysql_driver_dialect.py +++ b/aws_advanced_python_wrapper/mysql_driver_dialect.py @@ -20,6 +20,7 @@ from aws_advanced_python_wrapper.hostinfo import HostInfo from aws_advanced_python_wrapper.pep249 import Connection +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError from inspect import signature @@ -55,7 +56,7 @@ class MySQLDriverDialect(DriverDialect): AUTH_METHOD = "mysql_clear_password" IS_CLOSED_TIMEOUT_SEC = 3 - _executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="MySQLDriverDialectExecutor") + _executor_name: ClassVar[str] = "MySQLDriverDialectExecutor" _dialect_code: str = DriverDialectCodes.MYSQL_CONNECTOR_PYTHON _network_bound_methods: Set[str] = { @@ -94,7 +95,7 @@ def is_closed(self, conn: Connection) -> bool: if self.can_execute_query(conn): socket_timeout = WrapperProperties.SOCKET_TIMEOUT_SEC.get_float(self._props) timeout_sec = socket_timeout if socket_timeout > 0 else MySQLDriverDialect.IS_CLOSED_TIMEOUT_SEC - is_connected_with_timeout = timeout(MySQLDriverDialect._executor, timeout_sec)(conn.is_connected) # type: ignore + is_connected_with_timeout = timeout(ThreadPoolContainer.get_thread_pool(MySQLDriverDialect._executor_name), timeout_sec)(conn.is_connected) # type: ignore try: return not is_connected_with_timeout() diff --git a/aws_advanced_python_wrapper/plugin_service.py b/aws_advanced_python_wrapper/plugin_service.py index fbf909e11..a81890e2e 100644 --- a/aws_advanced_python_wrapper/plugin_service.py +++ b/aws_advanced_python_wrapper/plugin_service.py @@ -89,6 +89,7 @@ from aws_advanced_python_wrapper.utils.decorators import \ preserve_transaction_status_with_timeout from aws_advanced_python_wrapper.utils.log import Logger +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.notifications import ( ConnectionEvent, HostEvent, OldConnectionSuggestedAction) @@ -314,7 +315,7 @@ class PluginServiceImpl(PluginService, HostListProviderService, CanReleaseResour _host_availability_expiring_cache: CacheMap[str, HostAvailability] = CacheMap() _status_cache: ClassVar[CacheMap[str, Any]] = CacheMap() - _executor: ClassVar[Executor] = ThreadPoolExecutor(thread_name_prefix="PluginServiceImplExecutor") + _executor_name: ClassVar[str] = "PluginServiceImplExecutor" def __init__( self, @@ -611,7 +612,7 @@ def fill_aliases(self, connection: Optional[Connection] = None, host_info: Optio try: timeout_sec = WrapperProperties.AUXILIARY_QUERY_TIMEOUT_SEC.get(self._props) cursor_execute_func_with_timeout = preserve_transaction_status_with_timeout( - PluginServiceImpl._executor, timeout_sec, driver_dialect, connection)(self._fill_aliases) + ThreadPoolContainer.get_thread_pool(PluginServiceImpl._executor_name), timeout_sec, driver_dialect, connection)(self._fill_aliases) cursor_execute_func_with_timeout(connection, host_info) except TimeoutError as e: raise QueryTimeoutError(Messages.get("PluginServiceImpl.FillAliasesTimeout")) from e diff --git a/aws_advanced_python_wrapper/thread_pool_container.py b/aws_advanced_python_wrapper/thread_pool_container.py new file mode 100644 index 000000000..83102c6cf --- /dev/null +++ b/aws_advanced_python_wrapper/thread_pool_container.py @@ -0,0 +1,112 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent.futures import ThreadPoolExecutor +from typing import Dict, Optional, List +import threading + + +class ThreadPoolContainer: + """ + A container class for managing multiple named thread pools. + Provides static methods for getting, creating, and releasing thread pools. + """ + + _pools: Dict[str, ThreadPoolExecutor] = {} + _lock: threading.Lock = threading.Lock() + _default_max_workers: Optional[int] = None # Uses Python's default + + @classmethod + def get_thread_pool( + cls, + name: str, + max_workers: Optional[int] = None + ) -> ThreadPoolExecutor: + """ + Get an existing thread pool or create a new one if it doesn't exist. + + Args: + name: Unique identifier for the thread pool + max_workers: Max worker threads (only used when creating new pool) + If None, uses Python's default: min(32, os.cpu_count() + 4) + + Returns: + ThreadPoolExecutor instance + """ + with cls._lock: + if name not in cls._pools: + workers = max_workers or cls._default_max_workers + cls._pools[name] = ThreadPoolExecutor( + max_workers=workers, + thread_name_prefix=name + ) + return cls._pools[name] + + @classmethod + def release_resources(cls) -> None: + """ + Shutdown all thread pools and release resources. + + Args: + wait: If True, wait for all pending tasks to complete + """ + with cls._lock: + for name, pool in cls._pools.items(): + try: + pool.shutdown(wait=wait) + except Exception as e: + print(f"Error shutting down pool '{name}': {e}") + cls._pools.clear() + + @classmethod + def release_pool(cls, name: str, wait: bool = True) -> bool: + """ + Release a specific thread pool by name. + + Args: + name: The name of the thread pool to release + wait: If True, wait for pending tasks to complete + + Returns: + True if pool was found and released, False otherwise + """ + with cls._lock: + if name in cls._pools: + cls._pools[name].shutdown(wait=wait) + del cls._pools[name] + return True + return False + + @classmethod + def has_pool(cls, name: str) -> bool: + """Check if a pool with the given name exists.""" + with cls._lock: + return name in cls._pools + + @classmethod + def get_pool_names(cls) -> List[str]: + """Get a list of all active pool names.""" + with cls._lock: + return list(cls._pools.keys()) + + @classmethod + def get_pool_count(cls) -> int: + """Get the number of active pools.""" + with cls._lock: + return len(cls._pools) + + @classmethod + def set_default_max_workers(cls, max_workers: Optional[int]) -> None: + """Set the default max workers for new pools.""" + cls._default_max_workers = max_workers \ No newline at end of file diff --git a/aws_advanced_python_wrapper/wrapper.py b/aws_advanced_python_wrapper/wrapper.py index e04d293f8..29be81969 100644 --- a/aws_advanced_python_wrapper/wrapper.py +++ b/aws_advanced_python_wrapper/wrapper.py @@ -24,11 +24,13 @@ DriverDialectManager from aws_advanced_python_wrapper.errors import (AwsWrapperError, FailoverSuccessError) +from aws_advanced_python_wrapper.host_monitoring_plugin import MonitoringThreadContainer from aws_advanced_python_wrapper.pep249 import Connection, Cursor, Error from aws_advanced_python_wrapper.plugin import CanReleaseResources from aws_advanced_python_wrapper.plugin_service import ( PluginManager, PluginService, PluginServiceImpl, PluginServiceManagerContainer) +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, @@ -326,3 +328,9 @@ def __enter__(self: AwsWrapperCursor) -> AwsWrapperCursor: def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.close() + +class Wrapper: + @staticmethod + def release_resources() -> None: + MonitoringThreadContainer.clean_up() + ThreadPoolContainer.release_resources() diff --git a/tests/integration/container/conftest.py b/tests/integration/container/conftest.py index 2e23eeba3..207b9533c 100644 --- a/tests/integration/container/conftest.py +++ b/tests/integration/container/conftest.py @@ -31,6 +31,8 @@ from aws_advanced_python_wrapper.host_monitoring_plugin import \ MonitoringThreadContainer from aws_advanced_python_wrapper.plugin_service import PluginServiceImpl +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils @@ -140,6 +142,7 @@ def pytest_runtest_setup(item): CustomEndpointPlugin._monitors.clear() CustomEndpointMonitor._custom_endpoint_info_cache.clear() MonitoringThreadContainer.clean_up() + ThreadPoolContainer.release_resources() ConnectionProviderManager.reset_provider() DatabaseDialectManager.reset_custom_dialect() From 5a38d2935a5a636af4f1cf9b7c1ae00ac8819cd0 Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Tue, 30 Dec 2025 14:58:11 -0800 Subject: [PATCH 2/6] fix custom endpoint plugin stop event --- .../custom_endpoint_plugin.py | 9 +- .../database_dialect.py | 4 +- aws_advanced_python_wrapper/driver_dialect.py | 4 +- .../host_list_provider.py | 6 +- .../host_monitoring_plugin.py | 10 +- .../mysql_driver_dialect.py | 8 +- aws_advanced_python_wrapper/plugin_service.py | 5 +- .../thread_pool_container.py | 24 ++-- aws_advanced_python_wrapper/wrapper.py | 7 +- tests/unit/test_monitor_service.py | 19 ++-- .../unit/test_monitoring_thread_container.py | 19 ++-- tests/unit/test_thread_pool_container.py | 103 ++++++++++++++++++ 12 files changed, 164 insertions(+), 54 deletions(-) create mode 100644 tests/unit/test_thread_pool_container.py diff --git a/aws_advanced_python_wrapper/custom_endpoint_plugin.py b/aws_advanced_python_wrapper/custom_endpoint_plugin.py index 03b672a6e..8e490e029 100644 --- a/aws_advanced_python_wrapper/custom_endpoint_plugin.py +++ b/aws_advanced_python_wrapper/custom_endpoint_plugin.py @@ -169,7 +169,8 @@ def _run(self): len(endpoints), endpoint_hostnames) - sleep(self._refresh_rate_ns / 1_000_000_000) + if self._stop_event.wait(self._refresh_rate_ns / 1_000_000_000): + break continue endpoint_info = CustomEndpointInfo.from_db_cluster_endpoint(endpoints[0]) @@ -178,7 +179,8 @@ def _run(self): if cached_info is not None and cached_info == endpoint_info: elapsed_time = perf_counter_ns() - start_ns sleep_duration = max(0, self._refresh_rate_ns - elapsed_time) - sleep(sleep_duration / 1_000_000_000) + if self._stop_event.wait(sleep_duration / 1_000_000_000): + break continue logger.debug( @@ -196,7 +198,8 @@ def _run(self): elapsed_time = perf_counter_ns() - start_ns sleep_duration = max(0, self._refresh_rate_ns - elapsed_time) - sleep(sleep_duration / 1_000_000_000) + if self._stop_event.wait(sleep_duration / 1_000_000_000): + break continue except InterruptedError as e: raise e diff --git a/aws_advanced_python_wrapper/database_dialect.py b/aws_advanced_python_wrapper/database_dialect.py index 2946faf5b..e6f4b973a 100644 --- a/aws_advanced_python_wrapper/database_dialect.py +++ b/aws_advanced_python_wrapper/database_dialect.py @@ -26,7 +26,6 @@ from aws_advanced_python_wrapper.pep249 import Connection from .driver_dialect import DriverDialect from .exception_handling import ExceptionHandler - from concurrent.futures import Executor from abc import ABC, abstractmethod from concurrent.futures import TimeoutError @@ -38,6 +37,8 @@ from aws_advanced_python_wrapper.host_list_provider import ( ConnectionStringHostListProvider, RdsHostListProvider) from aws_advanced_python_wrapper.hostinfo import HostInfo +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.decorators import \ preserve_transaction_status_with_timeout from aws_advanced_python_wrapper.utils.log import Logger @@ -45,7 +46,6 @@ PropertiesUtils, WrapperProperties) from aws_advanced_python_wrapper.utils.rdsutils import RdsUtils -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from .driver_dialect_codes import DriverDialectCodes from .utils.cache_map import CacheMap from .utils.messages import Messages diff --git a/aws_advanced_python_wrapper/driver_dialect.py b/aws_advanced_python_wrapper/driver_dialect.py index 636851f3b..9722cf940 100644 --- a/aws_advanced_python_wrapper/driver_dialect.py +++ b/aws_advanced_python_wrapper/driver_dialect.py @@ -19,7 +19,6 @@ if TYPE_CHECKING: from aws_advanced_python_wrapper.hostinfo import HostInfo from aws_advanced_python_wrapper.pep249 import Connection, Cursor - from concurrent.futures import Executor from abc import ABC from concurrent.futures import TimeoutError @@ -27,7 +26,8 @@ from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes from aws_advanced_python_wrapper.errors import (QueryTimeoutError, UnsupportedOperationError) -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.decorators import timeout from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, diff --git a/aws_advanced_python_wrapper/host_list_provider.py b/aws_advanced_python_wrapper/host_list_provider.py index a85572b87..81409443f 100644 --- a/aws_advanced_python_wrapper/host_list_provider.py +++ b/aws_advanced_python_wrapper/host_list_provider.py @@ -39,9 +39,10 @@ from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole from aws_advanced_python_wrapper.pep249 import (Connection, Cursor, ProgrammingError) +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.cache_map import CacheMap from aws_advanced_python_wrapper.utils.log import Logger -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) @@ -587,7 +588,7 @@ def _get_host_id(self, conn: Connection): class AuroraTopologyUtils(TopologyUtils): - + _executor_name: ClassVar[str] = "AuroraTopologyUtils" def _query_for_topology(self, conn: Connection) -> Optional[Tuple[HostInfo, ...]]: @@ -642,6 +643,7 @@ def _process_query_results(self, cursor: Cursor) -> Tuple[HostInfo, ...]: class MultiAzTopologyUtils(TopologyUtils): _executor_name: ClassVar[str] = "MultiAzTopologyUtils" + def __init__( self, dialect: db_dialect.TopologyAwareDatabaseDialect, diff --git a/aws_advanced_python_wrapper/host_monitoring_plugin.py b/aws_advanced_python_wrapper/host_monitoring_plugin.py index 35f2d637c..ea607d661 100644 --- a/aws_advanced_python_wrapper/host_monitoring_plugin.py +++ b/aws_advanced_python_wrapper/host_monitoring_plugin.py @@ -22,8 +22,7 @@ from aws_advanced_python_wrapper.pep249 import Connection from aws_advanced_python_wrapper.plugin_service import PluginService -from concurrent.futures import (Executor, Future, ThreadPoolExecutor, - TimeoutError) +from concurrent.futures import Future, TimeoutError from dataclasses import dataclass from queue import Queue from threading import Event, Lock, RLock @@ -36,6 +35,8 @@ from aws_advanced_python_wrapper.host_availability import HostAvailability from aws_advanced_python_wrapper.plugin import (CanReleaseResources, Plugin, PluginFactory) +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.concurrent import ConcurrentDict from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.messages import Messages @@ -48,7 +49,6 @@ from aws_advanced_python_wrapper.utils.telemetry.telemetry import ( TelemetryCounter, TelemetryTraceLevel) from aws_advanced_python_wrapper.utils.utils import QueueUtils -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer logger = Logger(__name__) @@ -594,7 +594,9 @@ def _get_or_create_monitor(_) -> Monitor: if supplied_monitor is None: raise AwsWrapperError(Messages.get("MonitoringThreadContainer.SupplierMonitorNone")) self._tasks_map.compute_if_absent( - supplied_monitor, lambda _: MonitoringThreadContainer._executor.submit(supplied_monitor.run)) + supplied_monitor, + lambda _: ThreadPoolContainer.get_thread_pool(MonitoringThreadContainer._executor_name) + .submit(supplied_monitor.run)) return supplied_monitor if monitor is None: diff --git a/aws_advanced_python_wrapper/mysql_driver_dialect.py b/aws_advanced_python_wrapper/mysql_driver_dialect.py index 99c293756..d777cb4a1 100644 --- a/aws_advanced_python_wrapper/mysql_driver_dialect.py +++ b/aws_advanced_python_wrapper/mysql_driver_dialect.py @@ -20,13 +20,14 @@ from aws_advanced_python_wrapper.hostinfo import HostInfo from aws_advanced_python_wrapper.pep249 import Connection -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer -from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError +from concurrent.futures import TimeoutError from inspect import signature from aws_advanced_python_wrapper.driver_dialect import DriverDialect from aws_advanced_python_wrapper.driver_dialect_codes import DriverDialectCodes from aws_advanced_python_wrapper.errors import UnsupportedOperationError +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.decorators import timeout from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, @@ -95,7 +96,8 @@ def is_closed(self, conn: Connection) -> bool: if self.can_execute_query(conn): socket_timeout = WrapperProperties.SOCKET_TIMEOUT_SEC.get_float(self._props) timeout_sec = socket_timeout if socket_timeout > 0 else MySQLDriverDialect.IS_CLOSED_TIMEOUT_SEC - is_connected_with_timeout = timeout(ThreadPoolContainer.get_thread_pool(MySQLDriverDialect._executor_name), timeout_sec)(conn.is_connected) # type: ignore + is_connected_with_timeout = timeout( + ThreadPoolContainer.get_thread_pool(MySQLDriverDialect._executor_name), timeout_sec)(conn.is_connected) # type: ignore try: return not is_connected_with_timeout() diff --git a/aws_advanced_python_wrapper/plugin_service.py b/aws_advanced_python_wrapper/plugin_service.py index a81890e2e..fa1879ab1 100644 --- a/aws_advanced_python_wrapper/plugin_service.py +++ b/aws_advanced_python_wrapper/plugin_service.py @@ -41,7 +41,7 @@ from aws_advanced_python_wrapper.plugin import Plugin, PluginFactory from abc import abstractmethod -from concurrent.futures import Executor, ThreadPoolExecutor, TimeoutError +from concurrent.futures import TimeoutError from contextlib import closing from typing import (Any, Callable, Dict, FrozenSet, Optional, Protocol, Set, Tuple) @@ -85,11 +85,12 @@ from aws_advanced_python_wrapper.simple_read_write_splitting_plugin import \ SimpleReadWriteSplittingPluginFactory from aws_advanced_python_wrapper.stale_dns_plugin import StaleDnsPluginFactory +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.cache_map import CacheMap from aws_advanced_python_wrapper.utils.decorators import \ preserve_transaction_status_with_timeout from aws_advanced_python_wrapper.utils.log import Logger -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.notifications import ( ConnectionEvent, HostEvent, OldConnectionSuggestedAction) diff --git a/aws_advanced_python_wrapper/thread_pool_container.py b/aws_advanced_python_wrapper/thread_pool_container.py index 83102c6cf..8b64fe8e7 100644 --- a/aws_advanced_python_wrapper/thread_pool_container.py +++ b/aws_advanced_python_wrapper/thread_pool_container.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from concurrent.futures import ThreadPoolExecutor -from typing import Dict, Optional, List import threading +from concurrent.futures import ThreadPoolExecutor +from typing import Dict, List, Optional class ThreadPoolContainer: @@ -22,25 +22,25 @@ class ThreadPoolContainer: A container class for managing multiple named thread pools. Provides static methods for getting, creating, and releasing thread pools. """ - + _pools: Dict[str, ThreadPoolExecutor] = {} _lock: threading.Lock = threading.Lock() _default_max_workers: Optional[int] = None # Uses Python's default @classmethod def get_thread_pool( - cls, - name: str, + cls, + name: str, max_workers: Optional[int] = None ) -> ThreadPoolExecutor: """ Get an existing thread pool or create a new one if it doesn't exist. - + Args: name: Unique identifier for the thread pool max_workers: Max worker threads (only used when creating new pool) If None, uses Python's default: min(32, os.cpu_count() + 4) - + Returns: ThreadPoolExecutor instance """ @@ -57,14 +57,14 @@ def get_thread_pool( def release_resources(cls) -> None: """ Shutdown all thread pools and release resources. - + Args: wait: If True, wait for all pending tasks to complete """ with cls._lock: for name, pool in cls._pools.items(): try: - pool.shutdown(wait=wait) + pool.shutdown(wait=False) except Exception as e: print(f"Error shutting down pool '{name}': {e}") cls._pools.clear() @@ -73,11 +73,11 @@ def release_resources(cls) -> None: def release_pool(cls, name: str, wait: bool = True) -> bool: """ Release a specific thread pool by name. - + Args: name: The name of the thread pool to release wait: If True, wait for pending tasks to complete - + Returns: True if pool was found and released, False otherwise """ @@ -109,4 +109,4 @@ def get_pool_count(cls) -> int: @classmethod def set_default_max_workers(cls, max_workers: Optional[int]) -> None: """Set the default max workers for new pools.""" - cls._default_max_workers = max_workers \ No newline at end of file + cls._default_max_workers = max_workers diff --git a/aws_advanced_python_wrapper/wrapper.py b/aws_advanced_python_wrapper/wrapper.py index 29be81969..d8d17623d 100644 --- a/aws_advanced_python_wrapper/wrapper.py +++ b/aws_advanced_python_wrapper/wrapper.py @@ -24,13 +24,15 @@ DriverDialectManager from aws_advanced_python_wrapper.errors import (AwsWrapperError, FailoverSuccessError) -from aws_advanced_python_wrapper.host_monitoring_plugin import MonitoringThreadContainer +from aws_advanced_python_wrapper.host_monitoring_plugin import \ + MonitoringThreadContainer from aws_advanced_python_wrapper.pep249 import Connection, Cursor, Error from aws_advanced_python_wrapper.plugin import CanReleaseResources from aws_advanced_python_wrapper.plugin_service import ( PluginManager, PluginService, PluginServiceImpl, PluginServiceManagerContainer) -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, @@ -329,6 +331,7 @@ def __enter__(self: AwsWrapperCursor) -> AwsWrapperCursor: def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.close() + class Wrapper: @staticmethod def release_resources() -> None: diff --git a/tests/unit/test_monitor_service.py b/tests/unit/test_monitor_service.py index ed8e743bd..40a0fd169 100644 --- a/tests/unit/test_monitor_service.py +++ b/tests/unit/test_monitor_service.py @@ -46,15 +46,8 @@ def mock_monitor(mocker): @pytest.fixture -def mock_executor(mocker): - return mocker.MagicMock() - - -@pytest.fixture -def thread_container(mock_executor): - container = MonitoringThreadContainer() - MonitoringThreadContainer._executor = mock_executor - return container +def thread_container(): + return MonitoringThreadContainer() @pytest.fixture @@ -99,16 +92,20 @@ def test_start_monitoring( assert aliases == monitor_service_mocked_container._cached_monitor_aliases -def test_start_monitoring__multiple_calls(monitor_service_with_container, mock_monitor, mock_executor, mock_conn): +def test_start_monitoring__multiple_calls(monitor_service_with_container, mock_monitor, mock_conn, mocker): aliases = frozenset({"instance-1"}) + # Mock ThreadPoolContainer.get_thread_pool + mock_thread_pool = mocker.MagicMock() + mocker.patch('aws_advanced_python_wrapper.host_monitoring_plugin.ThreadPoolContainer.get_thread_pool', return_value=mock_thread_pool) + num_calls = 5 for _ in range(num_calls): monitor_service_with_container.start_monitoring( mock_conn, aliases, HostInfo("instance-1"), Properties(), 5000, 1000, 3) assert num_calls == mock_monitor.start_monitoring.call_count - mock_executor.submit.assert_called_once_with(mock_monitor.run) + mock_thread_pool.submit.assert_called_once_with(mock_monitor.run) assert mock_monitor == monitor_service_with_container._cached_monitor() assert aliases == monitor_service_with_container._cached_monitor_aliases diff --git a/tests/unit/test_monitoring_thread_container.py b/tests/unit/test_monitoring_thread_container.py index cfd9e1558..7c7c2a690 100644 --- a/tests/unit/test_monitoring_thread_container.py +++ b/tests/unit/test_monitoring_thread_container.py @@ -20,15 +20,8 @@ @pytest.fixture -def container(mock_executor): - container = MonitoringThreadContainer() - MonitoringThreadContainer._executor = mock_executor - return container - - -@pytest.fixture -def mock_executor(mocker): - return mocker.MagicMock() +def container(): + return MonitoringThreadContainer() @pytest.fixture @@ -72,12 +65,16 @@ def release_container(): def test_get_or_create_monitor__monitor_created( - container, mock_monitor_supplier, mock_stopped_monitor, mock_monitor1, mock_executor, mock_future): + container, mock_monitor_supplier, mock_stopped_monitor, mock_monitor1, mock_future, mocker): + mock_thread_pool = mocker.MagicMock() + mock_thread_pool.submit.return_value = mock_future + mocker.patch('aws_advanced_python_wrapper.host_monitoring_plugin.ThreadPoolContainer.get_thread_pool', return_value=mock_thread_pool) + result = container.get_or_create_monitor(frozenset({"alias-1", "alias-2"}), mock_monitor_supplier) assert mock_monitor1 == result mock_monitor_supplier.assert_called_once() - mock_executor.submit.assert_called_once_with(mock_monitor1.run) + mock_thread_pool.submit.assert_called_once_with(mock_monitor1.run) assert mock_monitor1 == container._monitor_map.get("alias-1") assert mock_monitor1 == container._monitor_map.get("alias-2") diff --git a/tests/unit/test_thread_pool_container.py b/tests/unit/test_thread_pool_container.py new file mode 100644 index 000000000..5f4d415c4 --- /dev/null +++ b/tests/unit/test_thread_pool_container.py @@ -0,0 +1,103 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer + + +@pytest.fixture(autouse=True) +def cleanup_pools(): + """Clean up all pools after each test""" + yield + ThreadPoolContainer.release_resources() + + +def test_get_thread_pool_creates_new_pool(): + pool = ThreadPoolContainer.get_thread_pool("test_pool") + assert isinstance(pool, ThreadPoolExecutor) + assert ThreadPoolContainer.has_pool("test_pool") + + +def test_get_thread_pool_returns_existing_pool(): + pool1 = ThreadPoolContainer.get_thread_pool("test_pool") + pool2 = ThreadPoolContainer.get_thread_pool("test_pool") + assert pool1 is pool2 + + +def test_get_thread_pool_with_max_workers(): + pool = ThreadPoolContainer.get_thread_pool("test_pool", max_workers=5) + assert pool._max_workers == 5 + + +def test_has_pool(): + assert not ThreadPoolContainer.has_pool("nonexistent") + ThreadPoolContainer.get_thread_pool("test_pool") + assert ThreadPoolContainer.has_pool("test_pool") + + +def test_get_pool_names(): + assert ThreadPoolContainer.get_pool_names() == [] + ThreadPoolContainer.get_thread_pool("pool1") + ThreadPoolContainer.get_thread_pool("pool2") + names = ThreadPoolContainer.get_pool_names() + assert "pool1" in names + assert "pool2" in names + assert len(names) == 2 + + +def test_get_pool_count(): + assert ThreadPoolContainer.get_pool_count() == 0 + ThreadPoolContainer.get_thread_pool("pool1") + assert ThreadPoolContainer.get_pool_count() == 1 + ThreadPoolContainer.get_thread_pool("pool2") + assert ThreadPoolContainer.get_pool_count() == 2 + + +def test_release_pool(): + ThreadPoolContainer.get_thread_pool("test_pool") + assert ThreadPoolContainer.has_pool("test_pool") + + result = ThreadPoolContainer.release_pool("test_pool") + assert result is True + assert not ThreadPoolContainer.has_pool("test_pool") + + +def test_release_nonexistent_pool(): + result = ThreadPoolContainer.release_pool("nonexistent") + assert result is False + + +def test_release_resources(): + ThreadPoolContainer.get_thread_pool("pool1") + ThreadPoolContainer.get_thread_pool("pool2") + assert ThreadPoolContainer.get_pool_count() == 2 + + ThreadPoolContainer.release_resources() + assert ThreadPoolContainer.get_pool_count() == 0 + + +def test_set_default_max_workers(): + ThreadPoolContainer.set_default_max_workers(10) + pool = ThreadPoolContainer.get_thread_pool("test_pool") + assert pool._max_workers == 10 + + +def test_thread_name_prefix(): + pool = ThreadPoolContainer.get_thread_pool("custom_name") + # Check that the thread name prefix is set correctly + assert pool._thread_name_prefix == "custom_name" From 8c0259091cbb0484aa8e5775dbfe18612db28395 Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Tue, 30 Dec 2025 17:07:31 -0800 Subject: [PATCH 3/6] add docs and add fixes for release resources --- .../host_monitoring_plugin.py | 5 ++-- .../thread_pool_container.py | 4 +-- .../UsingThePythonDriver.md | 28 +++++++++++++++++++ .../UsingTheHostMonitoringPlugin.md | 4 +-- tests/integration/container/conftest.py | 2 +- tests/unit/test_monitor.py | 3 ++ 6 files changed, 38 insertions(+), 8 deletions(-) diff --git a/aws_advanced_python_wrapper/host_monitoring_plugin.py b/aws_advanced_python_wrapper/host_monitoring_plugin.py index ea607d661..254516cde 100644 --- a/aws_advanced_python_wrapper/host_monitoring_plugin.py +++ b/aws_advanced_python_wrapper/host_monitoring_plugin.py @@ -26,7 +26,7 @@ from dataclasses import dataclass from queue import Queue from threading import Event, Lock, RLock -from time import perf_counter_ns, sleep +from time import perf_counter_ns from typing import Any, Callable, ClassVar, Dict, FrozenSet, Optional, Set from _weakref import ReferenceType, ref @@ -549,9 +549,8 @@ def _execute_conn_check(self, conn: Connection, timeout_sec: float): driver_dialect.execute("Cursor.execute", lambda: cursor.execute(query), query, exec_timeout=timeout_sec) cursor.fetchone() - # Used to help with testing def sleep(self, duration: int): - sleep(duration) + self._is_stopped.wait(duration) class MonitoringThreadContainer: diff --git a/aws_advanced_python_wrapper/thread_pool_container.py b/aws_advanced_python_wrapper/thread_pool_container.py index 8b64fe8e7..dcbbb28d5 100644 --- a/aws_advanced_python_wrapper/thread_pool_container.py +++ b/aws_advanced_python_wrapper/thread_pool_container.py @@ -54,7 +54,7 @@ def get_thread_pool( return cls._pools[name] @classmethod - def release_resources(cls) -> None: + def release_resources(cls, wait=False) -> None: """ Shutdown all thread pools and release resources. @@ -64,7 +64,7 @@ def release_resources(cls) -> None: with cls._lock: for name, pool in cls._pools.items(): try: - pool.shutdown(wait=False) + pool.shutdown(wait=wait) except Exception as e: print(f"Error shutting down pool '{name}': {e}") cls._pools.clear() diff --git a/docs/using-the-python-driver/UsingThePythonDriver.md b/docs/using-the-python-driver/UsingThePythonDriver.md index 9bb19cd6d..427249451 100644 --- a/docs/using-the-python-driver/UsingThePythonDriver.md +++ b/docs/using-the-python-driver/UsingThePythonDriver.md @@ -44,6 +44,34 @@ These parameters are applicable to any instance of the AWS Advanced Python Drive | tcp_keepalive_interval | Number of seconds to wait before sending additional keepalive probes after the initial probe has been sent. | False | None | | tcp_keepalive_probes | Number of keepalive probes to send before concluding that the connection is invalid. | False | None | +## Resource Management + +The AWS Advanced Python Wrapper creates background threads and thread pools for various plugins during operations such as host monitoring and connection management. To ensure proper cleanup and prevent resource leaks, it's important to release these resources when your application shuts down. + +### Cleaning Up Resources + +Call the following methods before your application terminates: + +```python +from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer + +try: + # Your application code here + conn = AwsWrapperConnection.connect(...) + # ... use connection +finally: + # Clean up all resources before application exit + ThreadPoolContainer.release_resources() +``` + +> [!IMPORTANT] +> Always call both `AwsWrapperConnection.release_resources()` and `ThreadPoolContainer.release_resources()` at application shutdown to ensure: +> - All monitoring threads are properly terminated +> - Thread pools are shut down gracefully +> - No resource leaks occur +> - The application exits cleanly without hanging + ## Plugins The AWS Advanced Python Driver uses plugins to execute database API calls. You can think of a plugin as an extensible code module that adds extra logic around any database API calls. The AWS Advanced Python Driver has a number of [built-in plugins](#list-of-available-plugins) available for use. diff --git a/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md b/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md index a1a19ad4c..2f71b9f02 100644 --- a/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md +++ b/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md @@ -21,8 +21,8 @@ This plugin only works with drivers that support aborting connections from a sep > [IMPORTANT]\ > The Host Monitoring Plugin creates monitoring threads in the background to monitor all connections established to each cluster instance. The monitoring threads can be cleaned up in two ways: > 1. If there are no connections to the cluster instance the thread is monitoring for over a period of time, the Host Monitoring Plugin will automatically terminate the thread. This period of time is adjustable via the `monitor_disposal_time_ms` parameter. -> 2. Client applications can manually call `MonitoringThreadContainer.clean_up()` to clean up any dangling resources. -> It is best practice to call `MonitoringThreadContainer.clean_up()` at the end of the application to ensure a graceful exit; otherwise, the application may wait until the `monitor_disposal_time_ms` has been passed before terminating. This is because the Python driver waits for all daemon threads to complete before exiting. +> 2. Client applications can manually call `Wrapper.release_resources()` to clean up any dangling resources. +> It is best practice to call `Wrapper.release_resources()` at the end of the application to ensure a graceful exit; otherwise, the application may wait until the `monitor_disposal_time_ms` has been passed before terminating. This is because the Python driver waits for all daemon threads to complete before exiting. > See [PGFailover](../../examples/PGFailover.py) for an example. ### Enhanced Failure Monitoring Parameters diff --git a/tests/integration/container/conftest.py b/tests/integration/container/conftest.py index 207b9533c..53c73c6f9 100644 --- a/tests/integration/container/conftest.py +++ b/tests/integration/container/conftest.py @@ -142,7 +142,7 @@ def pytest_runtest_setup(item): CustomEndpointPlugin._monitors.clear() CustomEndpointMonitor._custom_endpoint_info_cache.clear() MonitoringThreadContainer.clean_up() - ThreadPoolContainer.release_resources() + ThreadPoolContainer.release_resources(wait=True) ConnectionProviderManager.reset_provider() DatabaseDialectManager.reset_custom_dialect() diff --git a/tests/unit/test_monitor.py b/tests/unit/test_monitor.py index 174eec87c..831660e83 100644 --- a/tests/unit/test_monitor.py +++ b/tests/unit/test_monitor.py @@ -21,6 +21,8 @@ from aws_advanced_python_wrapper.host_monitoring_plugin import ( Monitor, MonitoringContext, MonitoringThreadContainer) from aws_advanced_python_wrapper.hostinfo import HostInfo +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) @@ -84,6 +86,7 @@ def release_container(): yield while MonitoringThreadContainer._instance is not None: MonitoringThreadContainer.clean_up() + ThreadPoolContainer.release_resources() @pytest.fixture From afdf1e1acc13ab99f98eed464f7b54ec53bfb187 Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Fri, 2 Jan 2026 15:15:18 -0800 Subject: [PATCH 4/6] Add Wrapper.release_resources() to examples --- docs/examples/MySQLFailover.py | 51 ++++++++++--------- docs/examples/MySQLFastestResponseStrategy.py | 3 ++ docs/examples/MySQLFederatedAuthentication.py | 4 ++ docs/examples/MySQLIamAuthentication.py | 4 ++ ...QLInternalConnectionPoolPasswordWarning.py | 4 ++ docs/examples/MySQLOktaAuthentication.py | 4 ++ docs/examples/MySQLReadWriteSplitting.py | 4 ++ docs/examples/MySQLSecretsManager.py | 4 ++ .../examples/MySQLSimpleReadWriteSplitting.py | 4 ++ docs/examples/PGFailover.py | 33 ++++++------ docs/examples/PGFastestResponseStrategy.py | 4 ++ docs/examples/PGFederatedAuthentication.py | 4 ++ docs/examples/PGIamAuthentication.py | 4 ++ ...PGInternalConnectionPoolPasswordWarning.py | 4 ++ docs/examples/PGLimitless.py | 4 ++ docs/examples/PGOktaAuthentication.py | 4 ++ docs/examples/PGOpenTelemetry.py | 4 ++ docs/examples/PGReadWriteSplitting.py | 4 ++ docs/examples/PGSecretsManager.py | 4 ++ docs/examples/PGSimpleReadWriteSplitting.py | 4 ++ docs/examples/PGXRayTelemetry.py | 4 ++ .../UsingThePythonDriver.md | 2 +- 22 files changed, 120 insertions(+), 41 deletions(-) diff --git a/docs/examples/MySQLFailover.py b/docs/examples/MySQLFailover.py index b7e3b67f4..591f186a1 100644 --- a/docs/examples/MySQLFailover.py +++ b/docs/examples/MySQLFailover.py @@ -25,6 +25,7 @@ from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) +from aws_advanced_python_wrapper.wrapper import Wrapper def configure_initial_session_states(conn: Connection): @@ -61,26 +62,30 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O if __name__ == "__main__": - with AwsWrapperConnection.connect( - mysql.connector.Connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - database="mysql", - user="admin", - password="pwd", - plugins="failover", - wrapper_dialect="aurora-mysql", - autocommit=True - ) as awsconn: - configure_initial_session_states(awsconn) - execute_queries_with_failover_handling( - awsconn, "CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") - execute_queries_with_failover_handling( - awsconn, "INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) - execute_queries_with_failover_handling( - awsconn, "INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200)) - - cursor = execute_queries_with_failover_handling(awsconn, "SELECT * FROM bank_test") - for record in cursor: - print(record) - - execute_queries_with_failover_handling(awsconn, "DROP TABLE bank_test") + + try: + with AwsWrapperConnection.connect( + mysql.connector.Connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + database="mysql", + user="admin", + password="pwd", + plugins="failover", + wrapper_dialect="aurora-mysql", + autocommit=True + ) as awsconn: + configure_initial_session_states(awsconn) + execute_queries_with_failover_handling( + awsconn, "CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") + execute_queries_with_failover_handling( + awsconn, "INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) + execute_queries_with_failover_handling( + awsconn, "INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200)) + + cursor = execute_queries_with_failover_handling(awsconn, "SELECT * FROM bank_test") + for record in cursor: + print(record) + + execute_queries_with_failover_handling(awsconn, "DROP TABLE bank_test") + finally: + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/MySQLFastestResponseStrategy.py b/docs/examples/MySQLFastestResponseStrategy.py index ad2058922..579b13da7 100644 --- a/docs/examples/MySQLFastestResponseStrategy.py +++ b/docs/examples/MySQLFastestResponseStrategy.py @@ -19,6 +19,7 @@ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": provider = SqlAlchemyPooledConnectionProvider() @@ -62,5 +63,7 @@ with conn.cursor() as teardown_cursor: teardown_cursor.execute("DROP TABLE bank_test") + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() # Closes all pools and removes all cached pool connections ConnectionProviderManager.release_resources() diff --git a/docs/examples/MySQLFederatedAuthentication.py b/docs/examples/MySQLFederatedAuthentication.py index 415b0f5e8..af7513a4a 100644 --- a/docs/examples/MySQLFederatedAuthentication.py +++ b/docs/examples/MySQLFederatedAuthentication.py @@ -15,6 +15,7 @@ import mysql.connector from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -37,3 +38,6 @@ res = awscursor.fetchone() print(res) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/MySQLIamAuthentication.py b/docs/examples/MySQLIamAuthentication.py index b440c6583..c7039f844 100644 --- a/docs/examples/MySQLIamAuthentication.py +++ b/docs/examples/MySQLIamAuthentication.py @@ -15,6 +15,7 @@ import mysql.connector from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -34,3 +35,6 @@ for record in awscursor: print(record) awscursor.execute("DROP TABLE bank_test") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py b/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py index 311d1ec23..482ae36ad 100644 --- a/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py +++ b/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py @@ -19,6 +19,7 @@ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": params = { @@ -62,3 +63,6 @@ pass except Exception: print("Failed to connect - password was incorrect") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/MySQLOktaAuthentication.py b/docs/examples/MySQLOktaAuthentication.py index b49664383..6fc6b69c0 100644 --- a/docs/examples/MySQLOktaAuthentication.py +++ b/docs/examples/MySQLOktaAuthentication.py @@ -15,6 +15,7 @@ import mysql.connector from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -36,3 +37,6 @@ res = awscursor.fetchone() print(res) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/MySQLReadWriteSplitting.py b/docs/examples/MySQLReadWriteSplitting.py index 227465382..40c6ffbee 100644 --- a/docs/examples/MySQLReadWriteSplitting.py +++ b/docs/examples/MySQLReadWriteSplitting.py @@ -30,6 +30,7 @@ TransactionResolutionUnknownError) from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider +from aws_advanced_python_wrapper.wrapper import Wrapper def configure_pool(host_info: HostInfo, props: Dict[str, Any]) -> Dict[str, Any]: @@ -142,3 +143,6 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O """ If connection pools were enabled, close them here """ ConnectionProviderManager.release_resources() + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/MySQLSecretsManager.py b/docs/examples/MySQLSecretsManager.py index 19368a7bb..e693514cc 100644 --- a/docs/examples/MySQLSecretsManager.py +++ b/docs/examples/MySQLSecretsManager.py @@ -17,6 +17,7 @@ import mysql.connector from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -30,3 +31,6 @@ cursor.execute("SELECT @@aurora_server_id") for record in cursor.fetchone(): print(record) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/MySQLSimpleReadWriteSplitting.py b/docs/examples/MySQLSimpleReadWriteSplitting.py index bf91e3c02..b44c70e98 100644 --- a/docs/examples/MySQLSimpleReadWriteSplitting.py +++ b/docs/examples/MySQLSimpleReadWriteSplitting.py @@ -25,6 +25,7 @@ from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) +from aws_advanced_python_wrapper.wrapper import Wrapper def configure_initial_session_states(conn: Connection): @@ -116,3 +117,6 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O finally: with AwsWrapperConnection.connect(mysql.connector.Connect, **params) as conn: execute_queries_with_failover_handling(conn, "DROP TABLE bank_test") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGFailover.py b/docs/examples/PGFailover.py index e1ece5a51..539c1c202 100644 --- a/docs/examples/PGFailover.py +++ b/docs/examples/PGFailover.py @@ -18,8 +18,7 @@ import psycopg -from aws_advanced_python_wrapper.host_monitoring_plugin import \ - MonitoringThreadContainer +from aws_advanced_python_wrapper.wrapper import Wrapper if TYPE_CHECKING: from aws_advanced_python_wrapper.pep249 import Connection @@ -69,18 +68,18 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O "monitoring-socket_timeout": 10 } - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - dbname="postgres", - user="john", - password="pwd", - plugins="failover,host_monitoring", - connect_timeout=30, - socket_timeout=30, - autocommit=True - ) as awsconn: - try: + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + dbname="postgres", + user="john", + password="pwd", + plugins="failover,host_monitoring", + connect_timeout=30, + socket_timeout=30, + autocommit=True + ) as awsconn: configure_initial_session_states(awsconn) execute_queries_with_failover_handling( awsconn, "CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") @@ -95,6 +94,6 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O print(record) execute_queries_with_failover_handling(awsconn, "DROP TABLE bank_test") - finally: - # Clean up any remaining resources created by the Host Monitoring Plugin. - MonitoringThreadContainer.clean_up() + finally: + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/PGFastestResponseStrategy.py b/docs/examples/PGFastestResponseStrategy.py index 0cb99d32d..5ac467423 100644 --- a/docs/examples/PGFastestResponseStrategy.py +++ b/docs/examples/PGFastestResponseStrategy.py @@ -19,6 +19,7 @@ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": provider = SqlAlchemyPooledConnectionProvider() @@ -64,3 +65,6 @@ # Closes all pools and removes all cached pool connections ConnectionProviderManager.release_resources() + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/PGFederatedAuthentication.py b/docs/examples/PGFederatedAuthentication.py index c718e091b..00a493b20 100644 --- a/docs/examples/PGFederatedAuthentication.py +++ b/docs/examples/PGFederatedAuthentication.py @@ -15,6 +15,7 @@ import psycopg from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -36,3 +37,6 @@ res = awscursor.fetchone() print(res) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() diff --git a/docs/examples/PGIamAuthentication.py b/docs/examples/PGIamAuthentication.py index cde493932..c574ee5c1 100644 --- a/docs/examples/PGIamAuthentication.py +++ b/docs/examples/PGIamAuthentication.py @@ -15,6 +15,7 @@ import psycopg from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -35,3 +36,6 @@ for record in res: print(record) awscursor.execute("DROP TABLE bank_test") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGInternalConnectionPoolPasswordWarning.py b/docs/examples/PGInternalConnectionPoolPasswordWarning.py index 4404ee313..ba0be1799 100644 --- a/docs/examples/PGInternalConnectionPoolPasswordWarning.py +++ b/docs/examples/PGInternalConnectionPoolPasswordWarning.py @@ -19,6 +19,7 @@ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": params = { @@ -62,3 +63,6 @@ pass except Exception: print("Failed to connect - password was incorrect") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGLimitless.py b/docs/examples/PGLimitless.py index a7787b1f4..143efd39e 100644 --- a/docs/examples/PGLimitless.py +++ b/docs/examples/PGLimitless.py @@ -15,6 +15,7 @@ import psycopg from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -30,3 +31,6 @@ res = awscursor.fetchone() print(res) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGOktaAuthentication.py b/docs/examples/PGOktaAuthentication.py index f1f5eb487..60c6306f9 100644 --- a/docs/examples/PGOktaAuthentication.py +++ b/docs/examples/PGOktaAuthentication.py @@ -15,6 +15,7 @@ import psycopg from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -36,3 +37,6 @@ res = awscursor.fetchone() print(res) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGOpenTelemetry.py b/docs/examples/PGOpenTelemetry.py index 02b2ed5d3..7f26c7bbe 100644 --- a/docs/examples/PGOpenTelemetry.py +++ b/docs/examples/PGOpenTelemetry.py @@ -30,6 +30,7 @@ from opentelemetry.sdk.trace.export import BatchSpanProcessor from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper SQL_DBLIST = "select datname from pg_database;" @@ -78,3 +79,6 @@ print(record) print("-- end of application") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGReadWriteSplitting.py b/docs/examples/PGReadWriteSplitting.py index 1e5b46d96..b35cc24bb 100644 --- a/docs/examples/PGReadWriteSplitting.py +++ b/docs/examples/PGReadWriteSplitting.py @@ -30,6 +30,7 @@ TransactionResolutionUnknownError) from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider +from aws_advanced_python_wrapper.wrapper import Wrapper def configure_pool(host_info: HostInfo, props: Dict[str, Any]) -> Dict[str, Any]: @@ -143,3 +144,6 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O """ If connection pools were enabled, close them here """ ConnectionProviderManager.release_resources() + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGSecretsManager.py b/docs/examples/PGSecretsManager.py index 7fbe9774b..62a596227 100644 --- a/docs/examples/PGSecretsManager.py +++ b/docs/examples/PGSecretsManager.py @@ -17,6 +17,7 @@ import psycopg from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -30,3 +31,6 @@ cursor.execute("SELECT pg_catalog.aurora_db_instance_identifier()") for record in cursor.fetchone(): print(record) + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGSimpleReadWriteSplitting.py b/docs/examples/PGSimpleReadWriteSplitting.py index 28e651124..af48355a1 100644 --- a/docs/examples/PGSimpleReadWriteSplitting.py +++ b/docs/examples/PGSimpleReadWriteSplitting.py @@ -25,6 +25,7 @@ from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) +from aws_advanced_python_wrapper.wrapper import Wrapper def configure_initial_session_states(conn: Connection): @@ -117,3 +118,6 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O finally: with AwsWrapperConnection.connect(psycopg.Connection.connect, **params) as conn: execute_queries_with_failover_handling(conn, "DROP TABLE bank_test") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/examples/PGXRayTelemetry.py b/docs/examples/PGXRayTelemetry.py index 2b1888d38..f12e0b5c6 100644 --- a/docs/examples/PGXRayTelemetry.py +++ b/docs/examples/PGXRayTelemetry.py @@ -22,6 +22,7 @@ from aws_xray_sdk.core.sampling.local.sampler import LocalSampler from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.wrapper import Wrapper SQL_DBLIST = "select datname from pg_database;" @@ -54,3 +55,6 @@ print(record) print("-- end of application") + + # Clean up any remaining resources created by the plugins. + Wrapper.release_resources() \ No newline at end of file diff --git a/docs/using-the-python-driver/UsingThePythonDriver.md b/docs/using-the-python-driver/UsingThePythonDriver.md index 427249451..42fd8a294 100644 --- a/docs/using-the-python-driver/UsingThePythonDriver.md +++ b/docs/using-the-python-driver/UsingThePythonDriver.md @@ -66,7 +66,7 @@ finally: ``` > [!IMPORTANT] -> Always call both `AwsWrapperConnection.release_resources()` and `ThreadPoolContainer.release_resources()` at application shutdown to ensure: +> Always call `Wrapper.release_resources` at application shutdown to ensure: > - All monitoring threads are properly terminated > - Thread pools are shut down gracefully > - No resource leaks occur From e0811df5af89338d359e92eb901745e29c9364bc Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Mon, 5 Jan 2026 09:00:12 -0800 Subject: [PATCH 5/6] refactor: move release_resources to an independent method --- aws_advanced_python_wrapper/__init__.py | 12 ++++++++++ aws_advanced_python_wrapper/cleanup.py | 24 +++++++++++++++++++ aws_advanced_python_wrapper/wrapper.py | 11 --------- docs/examples/MySQLFailover.py | 8 +++---- docs/examples/MySQLFastestResponseStrategy.py | 5 ++-- docs/examples/MySQLFederatedAuthentication.py | 7 +++--- docs/examples/MySQLIamAuthentication.py | 7 +++--- ...QLInternalConnectionPoolPasswordWarning.py | 7 +++--- docs/examples/MySQLOktaAuthentication.py | 5 ++-- docs/examples/MySQLReadWriteSplitting.py | 5 ++-- docs/examples/MySQLSecretsManager.py | 7 +++--- .../examples/MySQLSimpleReadWriteSplitting.py | 5 ++-- docs/examples/PGFailover.py | 8 +++---- docs/examples/PGFastestResponseStrategy.py | 5 ++-- docs/examples/PGFederatedAuthentication.py | 5 ++-- docs/examples/PGIamAuthentication.py | 5 ++-- ...PGInternalConnectionPoolPasswordWarning.py | 5 ++-- docs/examples/PGLimitless.py | 5 ++-- docs/examples/PGOktaAuthentication.py | 5 ++-- docs/examples/PGOpenTelemetry.py | 5 ++-- docs/examples/PGReadWriteSplitting.py | 5 ++-- docs/examples/PGSecretsManager.py | 5 ++-- docs/examples/PGSimpleReadWriteSplitting.py | 5 ++-- docs/examples/PGXRayTelemetry.py | 5 ++-- .../UsingThePythonDriver.md | 7 +++--- .../UsingTheHostMonitoringPlugin.md | 4 ++-- 26 files changed, 89 insertions(+), 88 deletions(-) create mode 100644 aws_advanced_python_wrapper/cleanup.py diff --git a/aws_advanced_python_wrapper/__init__.py b/aws_advanced_python_wrapper/__init__.py index d388f0a7e..fbac66233 100644 --- a/aws_advanced_python_wrapper/__init__.py +++ b/aws_advanced_python_wrapper/__init__.py @@ -14,6 +14,7 @@ from logging import DEBUG, getLogger +from .cleanup import release_resources from .utils.utils import LogUtils from .wrapper import AwsWrapperConnection @@ -23,6 +24,17 @@ threadsafety = 2 paramstyle = "pyformat" +# Public API +__all__ = [ + 'connect', + 'AwsWrapperConnection', + 'release_resources', + 'set_logger', + 'apilevel', + 'threadsafety', + 'paramstyle' +] + def set_logger(name='aws_advanced_python_wrapper', level=DEBUG, format_string=None): LogUtils.setup_logger(getLogger(name), level, format_string) diff --git a/aws_advanced_python_wrapper/cleanup.py b/aws_advanced_python_wrapper/cleanup.py new file mode 100644 index 000000000..1327bc47f --- /dev/null +++ b/aws_advanced_python_wrapper/cleanup.py @@ -0,0 +1,24 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aws_advanced_python_wrapper.host_monitoring_plugin import \ + MonitoringThreadContainer +from aws_advanced_python_wrapper.thread_pool_container import \ + ThreadPoolContainer + + +def release_resources() -> None: + """Release all global resources used by the wrapper.""" + MonitoringThreadContainer.clean_up() + ThreadPoolContainer.release_resources() diff --git a/aws_advanced_python_wrapper/wrapper.py b/aws_advanced_python_wrapper/wrapper.py index d8d17623d..e04d293f8 100644 --- a/aws_advanced_python_wrapper/wrapper.py +++ b/aws_advanced_python_wrapper/wrapper.py @@ -24,15 +24,11 @@ DriverDialectManager from aws_advanced_python_wrapper.errors import (AwsWrapperError, FailoverSuccessError) -from aws_advanced_python_wrapper.host_monitoring_plugin import \ - MonitoringThreadContainer from aws_advanced_python_wrapper.pep249 import Connection, Cursor, Error from aws_advanced_python_wrapper.plugin import CanReleaseResources from aws_advanced_python_wrapper.plugin_service import ( PluginManager, PluginService, PluginServiceImpl, PluginServiceManagerContainer) -from aws_advanced_python_wrapper.thread_pool_container import \ - ThreadPoolContainer from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.utils.messages import Messages from aws_advanced_python_wrapper.utils.properties import (Properties, @@ -330,10 +326,3 @@ def __enter__(self: AwsWrapperCursor) -> AwsWrapperCursor: def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: self.close() - - -class Wrapper: - @staticmethod - def release_resources() -> None: - MonitoringThreadContainer.clean_up() - ThreadPoolContainer.release_resources() diff --git a/docs/examples/MySQLFailover.py b/docs/examples/MySQLFailover.py index 591f186a1..d007567c2 100644 --- a/docs/examples/MySQLFailover.py +++ b/docs/examples/MySQLFailover.py @@ -21,11 +21,10 @@ if TYPE_CHECKING: from aws_advanced_python_wrapper.pep249 import Connection -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) -from aws_advanced_python_wrapper.wrapper import Wrapper def configure_initial_session_states(conn: Connection): @@ -62,8 +61,7 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O if __name__ == "__main__": - - try: + try: with AwsWrapperConnection.connect( mysql.connector.Connect, host="database.cluster-xyz.us-east-1.rds.amazonaws.com", @@ -88,4 +86,4 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O execute_queries_with_failover_handling(awsconn, "DROP TABLE bank_test") finally: - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/MySQLFastestResponseStrategy.py b/docs/examples/MySQLFastestResponseStrategy.py index 579b13da7..13d40e06e 100644 --- a/docs/examples/MySQLFastestResponseStrategy.py +++ b/docs/examples/MySQLFastestResponseStrategy.py @@ -14,12 +14,11 @@ import mysql.connector -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider -from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": provider = SqlAlchemyPooledConnectionProvider() @@ -64,6 +63,6 @@ teardown_cursor.execute("DROP TABLE bank_test") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() # Closes all pools and removes all cached pool connections ConnectionProviderManager.release_resources() diff --git a/docs/examples/MySQLFederatedAuthentication.py b/docs/examples/MySQLFederatedAuthentication.py index af7513a4a..77e82b8c9 100644 --- a/docs/examples/MySQLFederatedAuthentication.py +++ b/docs/examples/MySQLFederatedAuthentication.py @@ -14,8 +14,7 @@ import mysql.connector -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -38,6 +37,6 @@ res = awscursor.fetchone() print(res) - + # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/MySQLIamAuthentication.py b/docs/examples/MySQLIamAuthentication.py index c7039f844..e776cdf5b 100644 --- a/docs/examples/MySQLIamAuthentication.py +++ b/docs/examples/MySQLIamAuthentication.py @@ -14,8 +14,7 @@ import mysql.connector -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -35,6 +34,6 @@ for record in awscursor: print(record) awscursor.execute("DROP TABLE bank_test") - + # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py b/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py index 482ae36ad..143d73b5b 100644 --- a/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py +++ b/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py @@ -14,12 +14,11 @@ import mysql.connector -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider -from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": params = { @@ -63,6 +62,6 @@ pass except Exception: print("Failed to connect - password was incorrect") - + # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/MySQLOktaAuthentication.py b/docs/examples/MySQLOktaAuthentication.py index 6fc6b69c0..8fbe161e4 100644 --- a/docs/examples/MySQLOktaAuthentication.py +++ b/docs/examples/MySQLOktaAuthentication.py @@ -14,8 +14,7 @@ import mysql.connector -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -39,4 +38,4 @@ print(res) # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/MySQLReadWriteSplitting.py b/docs/examples/MySQLReadWriteSplitting.py index 40c6ffbee..1cbe2dc55 100644 --- a/docs/examples/MySQLReadWriteSplitting.py +++ b/docs/examples/MySQLReadWriteSplitting.py @@ -22,7 +22,7 @@ import mysql.connector # type: ignore -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.errors import ( @@ -30,7 +30,6 @@ TransactionResolutionUnknownError) from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider -from aws_advanced_python_wrapper.wrapper import Wrapper def configure_pool(host_info: HostInfo, props: Dict[str, Any]) -> Dict[str, Any]: @@ -145,4 +144,4 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O ConnectionProviderManager.release_resources() # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/MySQLSecretsManager.py b/docs/examples/MySQLSecretsManager.py index e693514cc..4f695bae1 100644 --- a/docs/examples/MySQLSecretsManager.py +++ b/docs/examples/MySQLSecretsManager.py @@ -16,8 +16,7 @@ import mysql.connector -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -31,6 +30,6 @@ cursor.execute("SELECT @@aurora_server_id") for record in cursor.fetchone(): print(record) - + # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/MySQLSimpleReadWriteSplitting.py b/docs/examples/MySQLSimpleReadWriteSplitting.py index b44c70e98..d4b5218ab 100644 --- a/docs/examples/MySQLSimpleReadWriteSplitting.py +++ b/docs/examples/MySQLSimpleReadWriteSplitting.py @@ -21,11 +21,10 @@ import mysql.connector # type: ignore -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) -from aws_advanced_python_wrapper.wrapper import Wrapper def configure_initial_session_states(conn: Connection): @@ -119,4 +118,4 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O execute_queries_with_failover_handling(conn, "DROP TABLE bank_test") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGFailover.py b/docs/examples/PGFailover.py index 539c1c202..d5a67b577 100644 --- a/docs/examples/PGFailover.py +++ b/docs/examples/PGFailover.py @@ -18,12 +18,10 @@ import psycopg -from aws_advanced_python_wrapper.wrapper import Wrapper - if TYPE_CHECKING: from aws_advanced_python_wrapper.pep249 import Connection -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) @@ -68,7 +66,7 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O "monitoring-socket_timeout": 10 } - try: + try: with AwsWrapperConnection.connect( psycopg.Connection.connect, host="database.cluster-xyz.us-east-1.rds.amazonaws.com", @@ -96,4 +94,4 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O execute_queries_with_failover_handling(awsconn, "DROP TABLE bank_test") finally: # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/PGFastestResponseStrategy.py b/docs/examples/PGFastestResponseStrategy.py index 5ac467423..54b2fde14 100644 --- a/docs/examples/PGFastestResponseStrategy.py +++ b/docs/examples/PGFastestResponseStrategy.py @@ -14,12 +14,11 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider -from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": provider = SqlAlchemyPooledConnectionProvider() @@ -67,4 +66,4 @@ ConnectionProviderManager.release_resources() # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/PGFederatedAuthentication.py b/docs/examples/PGFederatedAuthentication.py index 00a493b20..9de0fdd92 100644 --- a/docs/examples/PGFederatedAuthentication.py +++ b/docs/examples/PGFederatedAuthentication.py @@ -14,8 +14,7 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -39,4 +38,4 @@ print(res) # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() + release_resources() diff --git a/docs/examples/PGIamAuthentication.py b/docs/examples/PGIamAuthentication.py index c574ee5c1..956adf43a 100644 --- a/docs/examples/PGIamAuthentication.py +++ b/docs/examples/PGIamAuthentication.py @@ -14,8 +14,7 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -38,4 +37,4 @@ awscursor.execute("DROP TABLE bank_test") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGInternalConnectionPoolPasswordWarning.py b/docs/examples/PGInternalConnectionPoolPasswordWarning.py index ba0be1799..a7cc0d0b5 100644 --- a/docs/examples/PGInternalConnectionPoolPasswordWarning.py +++ b/docs/examples/PGInternalConnectionPoolPasswordWarning.py @@ -14,12 +14,11 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider -from aws_advanced_python_wrapper.wrapper import Wrapper if __name__ == "__main__": params = { @@ -65,4 +64,4 @@ print("Failed to connect - password was incorrect") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGLimitless.py b/docs/examples/PGLimitless.py index 143efd39e..9f3f6188e 100644 --- a/docs/examples/PGLimitless.py +++ b/docs/examples/PGLimitless.py @@ -14,8 +14,7 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -33,4 +32,4 @@ print(res) # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGOktaAuthentication.py b/docs/examples/PGOktaAuthentication.py index 60c6306f9..7368856e6 100644 --- a/docs/examples/PGOktaAuthentication.py +++ b/docs/examples/PGOktaAuthentication.py @@ -14,8 +14,7 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -39,4 +38,4 @@ print(res) # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGOpenTelemetry.py b/docs/examples/PGOpenTelemetry.py index 7f26c7bbe..8b83d416d 100644 --- a/docs/examples/PGOpenTelemetry.py +++ b/docs/examples/PGOpenTelemetry.py @@ -29,8 +29,7 @@ from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources SQL_DBLIST = "select datname from pg_database;" @@ -81,4 +80,4 @@ print("-- end of application") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGReadWriteSplitting.py b/docs/examples/PGReadWriteSplitting.py index b35cc24bb..b345b295c 100644 --- a/docs/examples/PGReadWriteSplitting.py +++ b/docs/examples/PGReadWriteSplitting.py @@ -22,7 +22,7 @@ import psycopg # type: ignore -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.errors import ( @@ -30,7 +30,6 @@ TransactionResolutionUnknownError) from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider -from aws_advanced_python_wrapper.wrapper import Wrapper def configure_pool(host_info: HostInfo, props: Dict[str, Any]) -> Dict[str, Any]: @@ -146,4 +145,4 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O ConnectionProviderManager.release_resources() # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGSecretsManager.py b/docs/examples/PGSecretsManager.py index 62a596227..c65d465aa 100644 --- a/docs/examples/PGSecretsManager.py +++ b/docs/examples/PGSecretsManager.py @@ -16,8 +16,7 @@ import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": with AwsWrapperConnection.connect( @@ -33,4 +32,4 @@ print(record) # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGSimpleReadWriteSplitting.py b/docs/examples/PGSimpleReadWriteSplitting.py index af48355a1..d13f91f7f 100644 --- a/docs/examples/PGSimpleReadWriteSplitting.py +++ b/docs/examples/PGSimpleReadWriteSplitting.py @@ -21,11 +21,10 @@ import psycopg # type: ignore -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.errors import ( FailoverFailedError, FailoverSuccessError, TransactionResolutionUnknownError) -from aws_advanced_python_wrapper.wrapper import Wrapper def configure_initial_session_states(conn: Connection): @@ -120,4 +119,4 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O execute_queries_with_failover_handling(conn, "DROP TABLE bank_test") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/examples/PGXRayTelemetry.py b/docs/examples/PGXRayTelemetry.py index f12e0b5c6..e6e218e4f 100644 --- a/docs/examples/PGXRayTelemetry.py +++ b/docs/examples/PGXRayTelemetry.py @@ -21,8 +21,7 @@ from aws_xray_sdk.core import xray_recorder from aws_xray_sdk.core.sampling.local.sampler import LocalSampler -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.wrapper import Wrapper +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources SQL_DBLIST = "select datname from pg_database;" @@ -57,4 +56,4 @@ print("-- end of application") # Clean up any remaining resources created by the plugins. - Wrapper.release_resources() \ No newline at end of file + release_resources() diff --git a/docs/using-the-python-driver/UsingThePythonDriver.md b/docs/using-the-python-driver/UsingThePythonDriver.md index 42fd8a294..bc753267d 100644 --- a/docs/using-the-python-driver/UsingThePythonDriver.md +++ b/docs/using-the-python-driver/UsingThePythonDriver.md @@ -53,8 +53,7 @@ The AWS Advanced Python Wrapper creates background threads and thread pools for Call the following methods before your application terminates: ```python -from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.thread_pool_container import ThreadPoolContainer +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources try: # Your application code here @@ -62,11 +61,11 @@ try: # ... use connection finally: # Clean up all resources before application exit - ThreadPoolContainer.release_resources() + release_resources() ``` > [!IMPORTANT] -> Always call `Wrapper.release_resources` at application shutdown to ensure: +> Always call `release_resources()` at application shutdown to ensure: > - All monitoring threads are properly terminated > - Thread pools are shut down gracefully > - No resource leaks occur diff --git a/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md b/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md index 2f71b9f02..f1fe4f347 100644 --- a/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md +++ b/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md @@ -21,8 +21,8 @@ This plugin only works with drivers that support aborting connections from a sep > [IMPORTANT]\ > The Host Monitoring Plugin creates monitoring threads in the background to monitor all connections established to each cluster instance. The monitoring threads can be cleaned up in two ways: > 1. If there are no connections to the cluster instance the thread is monitoring for over a period of time, the Host Monitoring Plugin will automatically terminate the thread. This period of time is adjustable via the `monitor_disposal_time_ms` parameter. -> 2. Client applications can manually call `Wrapper.release_resources()` to clean up any dangling resources. -> It is best practice to call `Wrapper.release_resources()` at the end of the application to ensure a graceful exit; otherwise, the application may wait until the `monitor_disposal_time_ms` has been passed before terminating. This is because the Python driver waits for all daemon threads to complete before exiting. +> 2. Client applications can manually call `aws_advanced_python_wrapper.release_resources()` to clean up any dangling resources. +> It is best practice to call `aws_advanced_python_wrapper.release_resources()` at the end of the application to ensure a graceful exit; otherwise, the application may wait until the `monitor_disposal_time_ms` has been passed before terminating. This is because the Python driver waits for all daemon threads to complete before exiting. > See [PGFailover](../../examples/PGFailover.py) for an example. ### Enhanced Failure Monitoring Parameters From 529fee6dfc4f4abdb671c2f168af40d7f5ccc848 Mon Sep 17 00:00:00 2001 From: Favian Samatha Date: Tue, 6 Jan 2026 12:08:27 -0800 Subject: [PATCH 6/6] address PR comments --- .../host_monitoring_plugin.py | 1 + ...dvanced_python_wrapper_messages.properties | 2 + .../thread_pool_container.py | 15 +++- docs/examples/MySQLFastestResponseStrategy.py | 77 ++++++++++--------- docs/examples/MySQLFederatedAuthentication.py | 45 +++++------ docs/examples/MySQLIamAuthentication.py | 39 +++++----- ...QLInternalConnectionPoolPasswordWarning.py | 76 +++++++++--------- docs/examples/MySQLOktaAuthentication.py | 42 +++++----- docs/examples/MySQLReadWriteSplitting.py | 2 +- docs/examples/MySQLSecretsManager.py | 29 +++---- .../examples/MySQLSimpleReadWriteSplitting.py | 4 +- docs/examples/PGFailover.py | 2 +- docs/examples/PGFastestResponseStrategy.py | 77 ++++++++++--------- docs/examples/PGFederatedAuthentication.py | 43 ++++++----- docs/examples/PGIamAuthentication.py | 41 +++++----- ...PGInternalConnectionPoolPasswordWarning.py | 75 +++++++++--------- docs/examples/PGLimitless.py | 31 ++++---- docs/examples/PGOktaAuthentication.py | 43 ++++++----- docs/examples/PGOpenTelemetry.py | 47 +++++------ docs/examples/PGReadWriteSplitting.py | 4 +- docs/examples/PGSecretsManager.py | 29 +++---- docs/examples/PGSimpleReadWriteSplitting.py | 5 +- docs/examples/PGXRayTelemetry.py | 45 +++++------ .../UsingTheHostMonitoringPlugin.md | 29 +++---- .../container/test_aurora_failover.py | 6 +- .../container/test_basic_connectivity.py | 5 +- .../container/test_read_write_splitting.py | 6 +- tests/unit/test_monitor.py | 8 +- tests/unit/test_monitor_service.py | 3 +- .../unit/test_monitoring_thread_container.py | 4 +- .../test_multithreaded_monitor_service.py | 13 ++-- 31 files changed, 437 insertions(+), 411 deletions(-) diff --git a/aws_advanced_python_wrapper/host_monitoring_plugin.py b/aws_advanced_python_wrapper/host_monitoring_plugin.py index 254516cde..a5e5752f4 100644 --- a/aws_advanced_python_wrapper/host_monitoring_plugin.py +++ b/aws_advanced_python_wrapper/host_monitoring_plugin.py @@ -650,6 +650,7 @@ def _release_resources(self): for monitor, _ in self._tasks_map.items(): monitor.stop() + ThreadPoolContainer.release_pool(MonitoringThreadContainer._executor_name, wait=False) self._tasks_map.clear() diff --git a/aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties b/aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties index a35c0bfb2..1c1c0fbd7 100644 --- a/aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties +++ b/aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties @@ -274,6 +274,8 @@ HostResponseTimeMonitor.OpeningConnection=[HostResponseTimeMonitor] Opening a Re HostResponseTimeMonitor.ResponseTime=[HostResponseTimeMonitor] Response time for '{}': {} ms HostResponseTimeMonitor.Stopped=[HostResponseTimeMonitor] Stopped Response time thread for host '{}'. +ThreadPoolContainer.ErrorShuttingDownPool=[ThreadPoolContainer] Error shutting down pool '{}': '{}'. + OpenedConnectionTracker.OpenedConnectionsTracked=[OpenedConnectionTracker] Opened Connections Tracked: {} OpenedConnectionTracker.InvalidatingConnections=[OpenedConnectionTracker] Invalidating opened connections to host: {} OpenedConnectionTracker.UnableToPopulateOpenedConnectionSet=[OpenedConnectionTracker] The driver is unable to track this opened connection because the instance endpoint is unknown. diff --git a/aws_advanced_python_wrapper/thread_pool_container.py b/aws_advanced_python_wrapper/thread_pool_container.py index dcbbb28d5..6c9cf905a 100644 --- a/aws_advanced_python_wrapper/thread_pool_container.py +++ b/aws_advanced_python_wrapper/thread_pool_container.py @@ -16,6 +16,10 @@ from concurrent.futures import ThreadPoolExecutor from typing import Dict, List, Optional +from aws_advanced_python_wrapper.utils.log import Logger + +logger = Logger(__name__) + class ThreadPoolContainer: """ @@ -66,7 +70,7 @@ def release_resources(cls, wait=False) -> None: try: pool.shutdown(wait=wait) except Exception as e: - print(f"Error shutting down pool '{name}': {e}") + logger.warning("ThreadPoolContainer.ErrorShuttingDownPool", name, e) cls._pools.clear() @classmethod @@ -83,9 +87,12 @@ def release_pool(cls, name: str, wait: bool = True) -> bool: """ with cls._lock: if name in cls._pools: - cls._pools[name].shutdown(wait=wait) - del cls._pools[name] - return True + try: + cls._pools[name].shutdown(wait=wait) + del cls._pools[name] + return True + except Exception as e: + logger.warning("ThreadPoolContainer.ErrorShuttingDownPool", name, e) return False @classmethod diff --git a/docs/examples/MySQLFastestResponseStrategy.py b/docs/examples/MySQLFastestResponseStrategy.py index 13d40e06e..ffb525619 100644 --- a/docs/examples/MySQLFastestResponseStrategy.py +++ b/docs/examples/MySQLFastestResponseStrategy.py @@ -24,45 +24,46 @@ provider = SqlAlchemyPooledConnectionProvider() ConnectionProviderManager.set_connection_provider(provider) - with AwsWrapperConnection.connect( - mysql.connector.Connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - database="mysql", - user="user", - password="password", - plugins="read_write_splitting,fastest_response_strategy", - reader_host_selector_strategy="fastest_response", - autocommit=True - ) as conn: - # Set up - with conn.cursor() as setup_cursor: - setup_cursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") - setup_cursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) + try: + with AwsWrapperConnection.connect( + mysql.connector.Connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + database="mysql", + user="user", + password="password", + plugins="read_write_splitting,fastest_response_strategy", + reader_host_selector_strategy="fastest_response", + autocommit=True + ) as conn: + # Set up + with conn.cursor() as setup_cursor: + setup_cursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") + setup_cursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) - conn.read_only = True - with conn.cursor() as cursor_1: - cursor_1.execute("SELECT * FROM bank_test") - results = cursor_1.fetchall() - for record in results: - print(record) + conn.read_only = True + with conn.cursor() as cursor_1: + cursor_1.execute("SELECT * FROM bank_test") + results = cursor_1.fetchall() + for record in results: + print(record) - # Switch to writer host - conn.read_only = False + # Switch to writer host + conn.read_only = False - # Use cached host when switching back to a reader - conn.read_only = True - with conn.cursor() as cursor_2: - cursor_2.execute("SELECT * FROM bank_test") - results = cursor_2.fetchall() - for record in results: - print(record) + # Use cached host when switching back to a reader + conn.read_only = True + with conn.cursor() as cursor_2: + cursor_2.execute("SELECT * FROM bank_test") + results = cursor_2.fetchall() + for record in results: + print(record) - # Tear down - conn.read_only = False - with conn.cursor() as teardown_cursor: - teardown_cursor.execute("DROP TABLE bank_test") - - # Clean up any remaining resources created by the plugins. - release_resources() - # Closes all pools and removes all cached pool connections - ConnectionProviderManager.release_resources() + # Tear down + conn.read_only = False + with conn.cursor() as teardown_cursor: + teardown_cursor.execute("DROP TABLE bank_test") + finally: + # Clean up global resources created by wrapper + release_resources() + # Closes all pools and removes all cached pool connections + ConnectionProviderManager.release_resources() diff --git a/docs/examples/MySQLFederatedAuthentication.py b/docs/examples/MySQLFederatedAuthentication.py index 77e82b8c9..65cf1d219 100644 --- a/docs/examples/MySQLFederatedAuthentication.py +++ b/docs/examples/MySQLFederatedAuthentication.py @@ -17,26 +17,27 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - mysql.connector.Connect, - host="database.cluster-xyz.us-east-2.rds.amazonaws.com", - database="mysql", - plugins="federated_auth", - idp_name="adfs", - app_id="abcde1fgh3kLZTBz1S5d7", - idp_endpoint="ec2amaz-ab3cdef.example.com", - iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", - iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", - iam_region="us-east-2", - idp_username="some_federated_username@example.com", - idp_password="some_password", - db_user="john", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("SELECT 1") + try: + with AwsWrapperConnection.connect( + mysql.connector.Connect, + host="database.cluster-xyz.us-east-2.rds.amazonaws.com", + database="mysql", + plugins="federated_auth", + idp_name="adfs", + app_id="abcde1fgh3kLZTBz1S5d7", + idp_endpoint="ec2amaz-ab3cdef.example.com", + iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", + iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", + iam_region="us-east-2", + idp_username="some_federated_username@example.com", + idp_password="some_password", + db_user="john", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("SELECT 1") - res = awscursor.fetchone() - print(res) - - # Clean up any remaining resources created by the plugins. - release_resources() + res = awscursor.fetchone() + print(res) + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/MySQLIamAuthentication.py b/docs/examples/MySQLIamAuthentication.py index e776cdf5b..32848d922 100644 --- a/docs/examples/MySQLIamAuthentication.py +++ b/docs/examples/MySQLIamAuthentication.py @@ -17,23 +17,24 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - mysql.connector.Connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - database="mysql", - user="admin", - plugins="iam", - wrapper_dialect="aurora-mysql", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") - awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) - awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200)) - awscursor.execute("SELECT * FROM bank_test") + try: + with AwsWrapperConnection.connect( + mysql.connector.Connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + database="mysql", + user="admin", + plugins="iam", + wrapper_dialect="aurora-mysql", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") + awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) + awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200)) + awscursor.execute("SELECT * FROM bank_test") - for record in awscursor: - print(record) - awscursor.execute("DROP TABLE bank_test") - - # Clean up any remaining resources created by the plugins. - release_resources() + for record in awscursor: + print(record) + awscursor.execute("DROP TABLE bank_test") + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py b/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py index 143d73b5b..1f516d62e 100644 --- a/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py +++ b/docs/examples/MySQLInternalConnectionPoolPasswordWarning.py @@ -21,47 +21,49 @@ SqlAlchemyPooledConnectionProvider if __name__ == "__main__": - params = { - # In general, you should not use instance URLs to connect. However, we will use one here to simplify this - # example, because internal connection pools are only opened when connecting to an instance URL. Normally the - # internal connection pool would be opened when read_only is set instead of when you are initially connecting. - "host": "database-instance.xyz.us-east-1.rds.amazonaws.com", - "database": "mysql", - "user": "admin", - "plugins": "read_write_splitting,failover", - "autocommit": True - } - - correct_password = "correct_password" - incorrect_password = "incorrect_password" - - provider = SqlAlchemyPooledConnectionProvider() - ConnectionProviderManager.set_connection_provider(provider) + try: + params = { + # In general, you should not use instance URLs to connect. However, we will use one here to simplify this + # example, because internal connection pools are only opened when connecting to an instance URL. Normally the + # internal connection pool would be opened when read_only is set instead of when you are initially connecting. + "host": "database-instance.xyz.us-east-1.rds.amazonaws.com", + "database": "mysql", + "user": "admin", + "plugins": "read_write_splitting,failover", + "autocommit": True + } - # Create an internal connection pool with the correct password - conn = AwsWrapperConnection.connect(mysql.connector.Connect, **params, password=correct_password) - # Finished with connection. The connection is not actually closed here, instead it will be returned to the pool but - # will remain open. - conn.close() + correct_password = "correct_password" + incorrect_password = "incorrect_password" - # Even though we use an incorrect password, the original connection 'conn' will be returned by the pool, and we can - # still use it. - with AwsWrapperConnection.connect( - mysql.connector.Connect, **params, password=incorrect_password) as incorrect_password_conn: - incorrect_password_conn.cursor().execute("SELECT 1") + provider = SqlAlchemyPooledConnectionProvider() + ConnectionProviderManager.set_connection_provider(provider) - # Closes all pools and removes all cached pool connections - ConnectionProviderManager.release_resources() + # Create an internal connection pool with the correct password + conn = AwsWrapperConnection.connect(mysql.connector.Connect, **params, password=correct_password) + # Finished with connection. The connection is not actually closed here, instead it will be returned to the pool but + # will remain open. + conn.close() - try: - # Correctly throws an exception - creates a fresh connection pool which will check the password because there - # are no longer any cached pool connections. + # Even though we use an incorrect password, the original connection 'conn' will be returned by the pool, and we can + # still use it. with AwsWrapperConnection.connect( mysql.connector.Connect, **params, password=incorrect_password) as incorrect_password_conn: - # Will not reach - exception will be thrown - pass - except Exception: - print("Failed to connect - password was incorrect") + incorrect_password_conn.cursor().execute("SELECT 1") + + # Closes all pools and removes all cached pool connections + ConnectionProviderManager.release_resources() + + try: + # Correctly throws an exception - creates a fresh connection pool which will check the password because there + # are no longer any cached pool connections. + with AwsWrapperConnection.connect( + mysql.connector.Connect, **params, password=incorrect_password) as incorrect_password_conn: + # Will not reach - exception will be thrown + pass + except Exception: + print("Failed to connect - password was incorrect") - # Clean up any remaining resources created by the plugins. - release_resources() + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/MySQLOktaAuthentication.py b/docs/examples/MySQLOktaAuthentication.py index 8fbe161e4..743830133 100644 --- a/docs/examples/MySQLOktaAuthentication.py +++ b/docs/examples/MySQLOktaAuthentication.py @@ -17,25 +17,27 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - mysql.connector.Connect, - host="database.cluster-xyz.us-east-2.rds.amazonaws.com", - database="mysql", - plugins="okta", - idp_endpoint="ec2amaz-ab3cdef.example.com", - app_id="abcde1fgh3kLZTBz1S5d7", - iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", - iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", - iam_region="us-east-2", - idp_username="some_federated_username@example.com", - idp_password="some_password", - db_user="john", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("SELECT @@aurora_server_id") + try: + with AwsWrapperConnection.connect( + mysql.connector.Connect, + host="database.cluster-xyz.us-east-2.rds.amazonaws.com", + database="mysql", + plugins="okta", + idp_endpoint="ec2amaz-ab3cdef.example.com", + app_id="abcde1fgh3kLZTBz1S5d7", + iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", + iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", + iam_region="us-east-2", + idp_username="some_federated_username@example.com", + idp_password="some_password", + db_user="john", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("SELECT @@aurora_server_id") - res = awscursor.fetchone() - print(res) + res = awscursor.fetchone() + print(res) - # Clean up any remaining resources created by the plugins. - release_resources() + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/MySQLReadWriteSplitting.py b/docs/examples/MySQLReadWriteSplitting.py index 1cbe2dc55..a102d68da 100644 --- a/docs/examples/MySQLReadWriteSplitting.py +++ b/docs/examples/MySQLReadWriteSplitting.py @@ -143,5 +143,5 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O """ If connection pools were enabled, close them here """ ConnectionProviderManager.release_resources() - # Clean up any remaining resources created by the plugins. + # Clean up global resources created by wrapper release_resources() diff --git a/docs/examples/MySQLSecretsManager.py b/docs/examples/MySQLSecretsManager.py index 4f695bae1..bff460d4f 100644 --- a/docs/examples/MySQLSecretsManager.py +++ b/docs/examples/MySQLSecretsManager.py @@ -19,17 +19,18 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - mysql.connector.Connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - database="mysql", - secrets_manager_secret_id="arn:aws:secretsmanager:::secret:Secre78tName-6RandomCharacters", - secrets_manager_region="us-east-2", - plugins="aws_secrets_manager" - ) as awsconn, awsconn.cursor() as cursor: - cursor.execute("SELECT @@aurora_server_id") - for record in cursor.fetchone(): - print(record) - - # Clean up any remaining resources created by the plugins. - release_resources() + try: + with AwsWrapperConnection.connect( + mysql.connector.Connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + database="mysql", + secrets_manager_secret_id="arn:aws:secretsmanager:::secret:Secre78tName-6RandomCharacters", + secrets_manager_region="us-east-2", + plugins="aws_secrets_manager" + ) as awsconn, awsconn.cursor() as cursor: + cursor.execute("SELECT @@aurora_server_id") + for record in cursor.fetchone(): + print(record) + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/MySQLSimpleReadWriteSplitting.py b/docs/examples/MySQLSimpleReadWriteSplitting.py index d4b5218ab..170796648 100644 --- a/docs/examples/MySQLSimpleReadWriteSplitting.py +++ b/docs/examples/MySQLSimpleReadWriteSplitting.py @@ -117,5 +117,5 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O with AwsWrapperConnection.connect(mysql.connector.Connect, **params) as conn: execute_queries_with_failover_handling(conn, "DROP TABLE bank_test") - # Clean up any remaining resources created by the plugins. - release_resources() + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGFailover.py b/docs/examples/PGFailover.py index d5a67b577..c15964fbe 100644 --- a/docs/examples/PGFailover.py +++ b/docs/examples/PGFailover.py @@ -93,5 +93,5 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O execute_queries_with_failover_handling(awsconn, "DROP TABLE bank_test") finally: - # Clean up any remaining resources created by the plugins. + # Clean up global resources created by wrapper release_resources() diff --git a/docs/examples/PGFastestResponseStrategy.py b/docs/examples/PGFastestResponseStrategy.py index 54b2fde14..2916bdbc3 100644 --- a/docs/examples/PGFastestResponseStrategy.py +++ b/docs/examples/PGFastestResponseStrategy.py @@ -24,46 +24,47 @@ provider = SqlAlchemyPooledConnectionProvider() ConnectionProviderManager.set_connection_provider(provider) - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - dbname="postgres", - user="user", - password="password", - plugins="read_write_splitting,fastest_response_strategy", - reader_host_selector_strategy="fastest_response", - autocommit=True - ) as conn: - # Set up - with conn.cursor() as setup_cursor: - setup_cursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") - setup_cursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + dbname="postgres", + user="user", + password="password", + plugins="read_write_splitting,fastest_response_strategy", + reader_host_selector_strategy="fastest_response", + autocommit=True + ) as conn: + # Set up + with conn.cursor() as setup_cursor: + setup_cursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") + setup_cursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) - conn.read_only = True - with conn.cursor() as cursor_1: - cursor_1.execute("SELECT * FROM bank_test") - results = cursor_1.fetchall() - for record in results: - print(record) + conn.read_only = True + with conn.cursor() as cursor_1: + cursor_1.execute("SELECT * FROM bank_test") + results = cursor_1.fetchall() + for record in results: + print(record) - # Switch to writer host - conn.read_only = False + # Switch to writer host + conn.read_only = False - # Use cached host when switching back to a reader - conn.read_only = True - with conn.cursor() as cursor_2: - cursor_2.execute("SELECT * FROM bank_test") - results = cursor_2.fetchall() - for record in results: - print(record) + # Use cached host when switching back to a reader + conn.read_only = True + with conn.cursor() as cursor_2: + cursor_2.execute("SELECT * FROM bank_test") + results = cursor_2.fetchall() + for record in results: + print(record) - # Tear down - conn.read_only = False - with conn.cursor() as teardown_cursor: - teardown_cursor.execute("DROP TABLE bank_test") + # Tear down + conn.read_only = False + with conn.cursor() as teardown_cursor: + teardown_cursor.execute("DROP TABLE bank_test") + finally: + # Closes all pools and removes all cached pool connections + ConnectionProviderManager.release_resources() - # Closes all pools and removes all cached pool connections - ConnectionProviderManager.release_resources() - - # Clean up any remaining resources created by the plugins. - release_resources() + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGFederatedAuthentication.py b/docs/examples/PGFederatedAuthentication.py index 9de0fdd92..20f2a965a 100644 --- a/docs/examples/PGFederatedAuthentication.py +++ b/docs/examples/PGFederatedAuthentication.py @@ -17,25 +17,26 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-2.rds.amazonaws.com", - dbname="postgres", - plugins="federated_auth", - idp_name="adfs", - idp_endpoint="ec2amaz-ab3cdef.example.com", - iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", - iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", - iam_region="us-east-2", - idp_username="some_federated_username@example.com", - idp_password="some_password", - db_user="john", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("SELECT 1") + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-2.rds.amazonaws.com", + dbname="postgres", + plugins="federated_auth", + idp_name="adfs", + idp_endpoint="ec2amaz-ab3cdef.example.com", + iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", + iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", + iam_region="us-east-2", + idp_username="some_federated_username@example.com", + idp_password="some_password", + db_user="john", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("SELECT 1") - res = awscursor.fetchone() - print(res) - - # Clean up any remaining resources created by the plugins. - release_resources() + res = awscursor.fetchone() + print(res) + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGIamAuthentication.py b/docs/examples/PGIamAuthentication.py index 956adf43a..1b9344e35 100644 --- a/docs/examples/PGIamAuthentication.py +++ b/docs/examples/PGIamAuthentication.py @@ -17,24 +17,25 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - dbname="postgres", - user="john", - plugins="iam", - wrapper_dialect="aurora-pg", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") - awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) - awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200)) - awscursor.execute("SELECT * FROM bank_test") + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + dbname="postgres", + user="john", + plugins="iam", + wrapper_dialect="aurora-pg", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("CREATE TABLE IF NOT EXISTS bank_test (id int primary key, name varchar(40), account_balance int)") + awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (0, "Jane Doe", 200)) + awscursor.execute("INSERT INTO bank_test VALUES (%s, %s, %s)", (1, "John Smith", 200)) + awscursor.execute("SELECT * FROM bank_test") - res = awscursor.fetchall() - for record in res: - print(record) - awscursor.execute("DROP TABLE bank_test") - - # Clean up any remaining resources created by the plugins. - release_resources() + res = awscursor.fetchall() + for record in res: + print(record) + awscursor.execute("DROP TABLE bank_test") + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGInternalConnectionPoolPasswordWarning.py b/docs/examples/PGInternalConnectionPoolPasswordWarning.py index a7cc0d0b5..026870d83 100644 --- a/docs/examples/PGInternalConnectionPoolPasswordWarning.py +++ b/docs/examples/PGInternalConnectionPoolPasswordWarning.py @@ -21,47 +21,48 @@ SqlAlchemyPooledConnectionProvider if __name__ == "__main__": - params = { - # In general, you should not use instance URLs to connect. However, we will use one here to simplify this - # example, because internal connection pools are only opened when connecting to an instance URL. Normally the - # internal connection pool would be opened when read_only is set instead of when you are initially connecting. - "host": "database-instance.xyz.us-east-1.rds.amazonaws.com", - "dbname": "postgres", - "user": "john", - "plugins": "read_write_splitting,failover,host_monitoring", - "autocommit": True - } - - correct_password = "correct_password" - incorrect_password = "incorrect_password" - - provider = SqlAlchemyPooledConnectionProvider() - ConnectionProviderManager.set_connection_provider(provider) + try: + params = { + # In general, you should not use instance URLs to connect. However, we will use one here to simplify this + # example, because internal connection pools are only opened when connecting to an instance URL. Normally the + # internal connection pool would be opened when read_only is set instead of when you are initially connecting. + "host": "database-instance.xyz.us-east-1.rds.amazonaws.com", + "dbname": "postgres", + "user": "john", + "plugins": "read_write_splitting,failover,host_monitoring", + "autocommit": True + } - # Create an internal connection pool with the correct password - conn = AwsWrapperConnection.connect(psycopg.Connection.connect, **params, password=correct_password) - # Finished with connection. The connection is not actually closed here, instead it will be returned to the pool but - # will remain open. - conn.close() + correct_password = "correct_password" + incorrect_password = "incorrect_password" - # Even though we use an incorrect password, the original connection 'conn' will be returned by the pool, and we can - # still use it. - with AwsWrapperConnection.connect( - psycopg.Connection.connect, **params, password=incorrect_password) as incorrect_password_conn: - incorrect_password_conn.cursor().execute("SELECT 1") + provider = SqlAlchemyPooledConnectionProvider() + ConnectionProviderManager.set_connection_provider(provider) - # Closes all pools and removes all cached pool connections - ConnectionProviderManager.release_resources() + # Create an internal connection pool with the correct password + conn = AwsWrapperConnection.connect(psycopg.Connection.connect, **params, password=correct_password) + # Finished with connection. The connection is not actually closed here, instead it will be returned to the pool but + # will remain open. + conn.close() - try: - # Correctly throws an exception - creates a fresh connection pool which will check the password because there - # are no longer any cached pool connections. + # Even though we use an incorrect password, the original connection 'conn' will be returned by the pool, and we can + # still use it. with AwsWrapperConnection.connect( psycopg.Connection.connect, **params, password=incorrect_password) as incorrect_password_conn: - # Will not reach - exception will be thrown - pass - except Exception: - print("Failed to connect - password was incorrect") + incorrect_password_conn.cursor().execute("SELECT 1") + + # Closes all pools and removes all cached pool connections + ConnectionProviderManager.release_resources() - # Clean up any remaining resources created by the plugins. - release_resources() + try: + # Correctly throws an exception - creates a fresh connection pool which will check the password because there + # are no longer any cached pool connections. + with AwsWrapperConnection.connect( + psycopg.Connection.connect, **params, password=incorrect_password) as incorrect_password_conn: + # Will not reach - exception will be thrown + pass + except Exception: + print("Failed to connect - password was incorrect") + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGLimitless.py b/docs/examples/PGLimitless.py index 9f3f6188e..e81bce1ae 100644 --- a/docs/examples/PGLimitless.py +++ b/docs/examples/PGLimitless.py @@ -17,19 +17,20 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="limitless-cluster.limitless-xyz.us-east-1.rds.amazonaws.com", - dbname="postgres_limitless", - user="user", - password="password", - plugins="limitless", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("SELECT * FROM pg_catalog.aurora_db_instance_identifier()") + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="limitless-cluster.limitless-xyz.us-east-1.rds.amazonaws.com", + dbname="postgres_limitless", + user="user", + password="password", + plugins="limitless", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("SELECT * FROM pg_catalog.aurora_db_instance_identifier()") - res = awscursor.fetchone() - print(res) - - # Clean up any remaining resources created by the plugins. - release_resources() + res = awscursor.fetchone() + print(res) + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGOktaAuthentication.py b/docs/examples/PGOktaAuthentication.py index 7368856e6..8c365da21 100644 --- a/docs/examples/PGOktaAuthentication.py +++ b/docs/examples/PGOktaAuthentication.py @@ -17,25 +17,26 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-2.rds.amazonaws.com", - dbname="postgres", - plugins="okta", - idp_endpoint="ec2amaz-ab3cdef.example.com", - app_id="abcde1fgh3kLZTBz1S5d7", - iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", - iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", - iam_region="us-east-2", - idp_username="some_federated_username@example.com", - idp_password="some_password", - db_user="john", - autocommit=True - ) as awsconn, awsconn.cursor() as awscursor: - awscursor.execute("SELECT * FROM pg_catalog.aurora_db_instance_identifier()") + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-2.rds.amazonaws.com", + dbname="postgres", + plugins="okta", + idp_endpoint="ec2amaz-ab3cdef.example.com", + app_id="abcde1fgh3kLZTBz1S5d7", + iam_role_arn="arn:aws:iam::123456789012:role/adfs_example_iam_role", + iam_idp_arn="arn:aws:iam::123456789012:saml-provider/adfs_example", + iam_region="us-east-2", + idp_username="some_federated_username@example.com", + idp_password="some_password", + db_user="john", + autocommit=True + ) as awsconn, awsconn.cursor() as awscursor: + awscursor.execute("SELECT * FROM pg_catalog.aurora_db_instance_identifier()") - res = awscursor.fetchone() - print(res) - - # Clean up any remaining resources created by the plugins. - release_resources() + res = awscursor.fetchone() + print(res) + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGOpenTelemetry.py b/docs/examples/PGOpenTelemetry.py index 8b83d416d..d882fb7a5 100644 --- a/docs/examples/PGOpenTelemetry.py +++ b/docs/examples/PGOpenTelemetry.py @@ -56,28 +56,29 @@ tracer = trace.get_tracer(__name__) with tracer.start_as_current_span("python_otlp_telemetry_app") as segment: - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="db-identifier-postgres.XYZ.us-east-2.rds.amazonaws.com", - dbname="test_db", - user="user", - password="password", - plugins="failover,host_monitoring", - wrapper_dialect="aurora-pg", - autocommit=True, - enable_telemetry=True, - telemetry_submit_toplevel=False, - telemetry_traces_backend="OTLP", - telemetry_metrics_backend="OTLP", - telemetry_failover_additional_top_trace=True - ) as awsconn: - awscursor = awsconn.cursor() - awscursor.execute(SQL_DBLIST) - res = awscursor.fetchall() - for record in res: - print(record) + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="db-identifier-postgres.XYZ.us-east-2.rds.amazonaws.com", + dbname="test_db", + user="user", + password="password", + plugins="failover,host_monitoring", + wrapper_dialect="aurora-pg", + autocommit=True, + enable_telemetry=True, + telemetry_submit_toplevel=False, + telemetry_traces_backend="OTLP", + telemetry_metrics_backend="OTLP", + telemetry_failover_additional_top_trace=True + ) as awsconn: + awscursor = awsconn.cursor() + awscursor.execute(SQL_DBLIST) + res = awscursor.fetchall() + for record in res: + print(record) + finally: + # Clean up global resources created by wrapper + release_resources() print("-- end of application") - - # Clean up any remaining resources created by the plugins. - release_resources() diff --git a/docs/examples/PGReadWriteSplitting.py b/docs/examples/PGReadWriteSplitting.py index b345b295c..1e1b26a9e 100644 --- a/docs/examples/PGReadWriteSplitting.py +++ b/docs/examples/PGReadWriteSplitting.py @@ -144,5 +144,5 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O """ If connection pools were enabled, close them here """ ConnectionProviderManager.release_resources() - # Clean up any remaining resources created by the plugins. - release_resources() + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGSecretsManager.py b/docs/examples/PGSecretsManager.py index c65d465aa..f6126b380 100644 --- a/docs/examples/PGSecretsManager.py +++ b/docs/examples/PGSecretsManager.py @@ -19,17 +19,18 @@ from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources if __name__ == "__main__": - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - dbname="postgres", - secrets_manager_secret_id="arn:aws:secretsmanager:::secret:Secre78tName-6RandomCharacters", - secrets_manager_region="us-east-2", - plugins="aws_secrets_manager" - ) as awsconn, awsconn.cursor() as cursor: - cursor.execute("SELECT pg_catalog.aurora_db_instance_identifier()") - for record in cursor.fetchone(): - print(record) - - # Clean up any remaining resources created by the plugins. - release_resources() + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + dbname="postgres", + secrets_manager_secret_id="arn:aws:secretsmanager:::secret:Secre78tName-6RandomCharacters", + secrets_manager_region="us-east-2", + plugins="aws_secrets_manager" + ) as awsconn, awsconn.cursor() as cursor: + cursor.execute("SELECT pg_catalog.aurora_db_instance_identifier()") + for record in cursor.fetchone(): + print(record) + finally: + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGSimpleReadWriteSplitting.py b/docs/examples/PGSimpleReadWriteSplitting.py index d13f91f7f..944cdee06 100644 --- a/docs/examples/PGSimpleReadWriteSplitting.py +++ b/docs/examples/PGSimpleReadWriteSplitting.py @@ -117,6 +117,5 @@ def execute_queries_with_failover_handling(conn: Connection, sql: str, params: O finally: with AwsWrapperConnection.connect(psycopg.Connection.connect, **params) as conn: execute_queries_with_failover_handling(conn, "DROP TABLE bank_test") - - # Clean up any remaining resources created by the plugins. - release_resources() + # Clean up global resources created by wrapper + release_resources() diff --git a/docs/examples/PGXRayTelemetry.py b/docs/examples/PGXRayTelemetry.py index e6e218e4f..56f6365a2 100644 --- a/docs/examples/PGXRayTelemetry.py +++ b/docs/examples/PGXRayTelemetry.py @@ -33,27 +33,28 @@ global_sdk_config.set_sdk_enabled(True) with xray_recorder.in_segment("python_xray_telemetry_app") as segment: - with AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="db-identifier-postgres.XYZ.us-east-2.rds.amazonaws.com", - dbname="test_db", - user="user", - password="password", - plugins="failover,host_monitoring", - wrapper_dialect="aurora-pg", - autocommit=True, - enable_telemetry=True, - telemetry_submit_toplevel=False, - telemetry_traces_backend="XRAY", - telemetry_metrics_backend="NONE" - ) as awsconn: - awscursor = awsconn.cursor() - awscursor.execute(SQL_DBLIST) - res = awscursor.fetchall() - for record in res: - print(record) + try: + with AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="db-identifier-postgres.XYZ.us-east-2.rds.amazonaws.com", + dbname="test_db", + user="user", + password="password", + plugins="failover,host_monitoring", + wrapper_dialect="aurora-pg", + autocommit=True, + enable_telemetry=True, + telemetry_submit_toplevel=False, + telemetry_traces_backend="XRAY", + telemetry_metrics_backend="NONE" + ) as awsconn: + awscursor = awsconn.cursor() + awscursor.execute(SQL_DBLIST) + res = awscursor.fetchall() + for record in res: + print(record) + finally: + # Clean up global resources created by wrapper + release_resources() print("-- end of application") - - # Clean up any remaining resources created by the plugins. - release_resources() diff --git a/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md b/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md index f1fe4f347..bb4833265 100644 --- a/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md +++ b/docs/using-the-python-driver/using-plugins/UsingTheHostMonitoringPlugin.md @@ -51,24 +51,27 @@ The Host Monitoring Connection Plugin may create new monitoring connections to c ```python import psycopg -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources props = { "monitoring-connect_timeout": 10, "monitoring-socket_timeout": 10 } - -conn = AwsWrapperConnection.connect( - psycopg.Connection.connect, - host="database.cluster-xyz.us-east-1.rds.amazonaws.com", - dbname="postgres", - user="john", - password="pwd", - plugins="host_monitoring", - # Configure the timeout values for all non-monitoring connections. - connect_timeout=30, socket_timeout=30, - # Configure different timeout values for the monitoring connections. - **props) + +try: + conn = AwsWrapperConnection.connect( + psycopg.Connection.connect, + host="database.cluster-xyz.us-east-1.rds.amazonaws.com", + dbname="postgres", + user="john", + password="pwd", + plugins="host_monitoring", + # Configure the timeout values for all non-monitoring connections. + connect_timeout=30, socket_timeout=30, + # Configure different timeout values for the monitoring connections. + **props) +finally: + release_resources() ``` > [!IMPORTANT]\ diff --git a/tests/integration/container/test_aurora_failover.py b/tests/integration/container/test_aurora_failover.py index 5ddf2a69c..2634e4568 100644 --- a/tests/integration/container/test_aurora_failover.py +++ b/tests/integration/container/test_aurora_failover.py @@ -22,8 +22,6 @@ from aws_advanced_python_wrapper.errors import ( FailoverSuccessError, TransactionResolutionUnknownError) -from aws_advanced_python_wrapper.host_monitoring_plugin import \ - MonitoringThreadContainer from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) from .utils.conditions import (disable_on_features, enable_on_deployments, @@ -34,7 +32,7 @@ if TYPE_CHECKING: from .utils.test_instance_info import TestInstanceInfo from .utils.test_driver import TestDriver - +from aws_advanced_python_wrapper import release_resources from aws_advanced_python_wrapper.utils.log import Logger from aws_advanced_python_wrapper.wrapper import AwsWrapperConnection from .utils.driver_helper import DriverHelper @@ -59,7 +57,7 @@ def setup_method(self, request): self.logger.info(f"Starting test: {request.node.name}") yield self.logger.info(f"Ending test: {request.node.name}") - MonitoringThreadContainer.clean_up() + release_resources() gc.collect() @pytest.fixture(scope='class') diff --git a/tests/integration/container/test_basic_connectivity.py b/tests/integration/container/test_basic_connectivity.py index 4de78f573..ddce9741a 100644 --- a/tests/integration/container/test_basic_connectivity.py +++ b/tests/integration/container/test_basic_connectivity.py @@ -16,8 +16,7 @@ from typing import TYPE_CHECKING -from aws_advanced_python_wrapper.host_monitoring_plugin import \ - MonitoringThreadContainer +from aws_advanced_python_wrapper import release_resources if TYPE_CHECKING: from .utils.test_instance_info import TestInstanceInfo @@ -150,4 +149,4 @@ def test_wrapper_connection_reader_cluster_with_efm_enabled(self, test_driver: T conn.close() - MonitoringThreadContainer.clean_up() + release_resources() diff --git a/tests/integration/container/test_read_write_splitting.py b/tests/integration/container/test_read_write_splitting.py index 781423e23..24a6e4856 100644 --- a/tests/integration/container/test_read_write_splitting.py +++ b/tests/integration/container/test_read_write_splitting.py @@ -17,15 +17,13 @@ import pytest from sqlalchemy import PoolProxiedConnection -from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper import AwsWrapperConnection, release_resources from aws_advanced_python_wrapper.connection_provider import \ ConnectionProviderManager from aws_advanced_python_wrapper.errors import ( AwsWrapperError, FailoverFailedError, FailoverSuccessError, ReadWriteSplittingError, TransactionResolutionUnknownError) from aws_advanced_python_wrapper.host_list_provider import RdsHostListProvider -from aws_advanced_python_wrapper.host_monitoring_plugin import \ - MonitoringThreadContainer from aws_advanced_python_wrapper.sql_alchemy_connection_provider import \ SqlAlchemyPooledConnectionProvider from aws_advanced_python_wrapper.utils.log import Logger @@ -63,7 +61,7 @@ def setup_method(self, request): yield self.logger.info(f"Ending test: {request.node.name}") - MonitoringThreadContainer.clean_up() + release_resources() gc.collect() # Plugin configurations diff --git a/tests/unit/test_monitor.py b/tests/unit/test_monitor.py index 831660e83..9eca560ec 100644 --- a/tests/unit/test_monitor.py +++ b/tests/unit/test_monitor.py @@ -18,11 +18,10 @@ import psycopg import pytest +from aws_advanced_python_wrapper import release_resources from aws_advanced_python_wrapper.host_monitoring_plugin import ( Monitor, MonitoringContext, MonitoringThreadContainer) from aws_advanced_python_wrapper.hostinfo import HostInfo -from aws_advanced_python_wrapper.thread_pool_container import \ - ThreadPoolContainer from aws_advanced_python_wrapper.utils.properties import (Properties, WrapperProperties) @@ -85,8 +84,7 @@ def monitor(mock_plugin_service, host_info, props): def release_container(): yield while MonitoringThreadContainer._instance is not None: - MonitoringThreadContainer.clean_up() - ThreadPoolContainer.release_resources() + release_resources() @pytest.fixture @@ -233,7 +231,7 @@ def test_run__no_contexts(mocker, monitor): assert container._monitor_map.get(host_alias) is None assert container._tasks_map.get(monitor) is None - MonitoringThreadContainer.clean_up() + release_resources() def test_check_connection_status__valid_then_invalid(mocker, monitor): diff --git a/tests/unit/test_monitor_service.py b/tests/unit/test_monitor_service.py index 40a0fd169..6ce28f2a5 100644 --- a/tests/unit/test_monitor_service.py +++ b/tests/unit/test_monitor_service.py @@ -16,6 +16,7 @@ import pytest from _weakref import ref +from aws_advanced_python_wrapper import release_resources from aws_advanced_python_wrapper.errors import AwsWrapperError from aws_advanced_python_wrapper.host_monitoring_plugin import ( MonitoringThreadContainer, MonitorService) @@ -73,7 +74,7 @@ def setup_teardown(mocker, mock_thread_container, mock_plugin_service, mock_moni yield while MonitoringThreadContainer._instance is not None: - MonitoringThreadContainer.clean_up() + release_resources() def test_start_monitoring( diff --git a/tests/unit/test_monitoring_thread_container.py b/tests/unit/test_monitoring_thread_container.py index 7c7c2a690..4e3dac25b 100644 --- a/tests/unit/test_monitoring_thread_container.py +++ b/tests/unit/test_monitoring_thread_container.py @@ -14,6 +14,7 @@ import pytest +from aws_advanced_python_wrapper import release_resources from aws_advanced_python_wrapper.errors import AwsWrapperError from aws_advanced_python_wrapper.host_monitoring_plugin import \ MonitoringThreadContainer @@ -61,7 +62,7 @@ def mock_monitor_supplier(mocker, mock_monitor1, mock_monitor2): def release_container(): yield while MonitoringThreadContainer._instance is not None: - MonitoringThreadContainer.clean_up() + release_resources() def test_get_or_create_monitor__monitor_created( @@ -172,3 +173,4 @@ def test_release_instance(mocker, container, mock_monitor1, mock_future): assert 0 == len(container._tasks_map) mock_future.cancel.assert_called_once() assert MonitoringThreadContainer._instance is None + release_resources() diff --git a/tests/unit/test_multithreaded_monitor_service.py b/tests/unit/test_multithreaded_monitor_service.py index db24d1153..26cb91605 100644 --- a/tests/unit/test_multithreaded_monitor_service.py +++ b/tests/unit/test_multithreaded_monitor_service.py @@ -20,6 +20,7 @@ import psycopg import pytest +from aws_advanced_python_wrapper import release_resources from aws_advanced_python_wrapper.host_monitoring_plugin import ( MonitoringContext, MonitoringThreadContainer, MonitorService) from aws_advanced_python_wrapper.hostinfo import HostInfo @@ -110,7 +111,7 @@ def verify_concurrency(mock_monitor, mock_executor, mock_future, counter, concur assert concurrent_counter.get() > 0 concurrent_counter.set(0) - MonitoringThreadContainer.clean_up() + release_resources() def test_start_monitoring__connections_to_different_hosts( @@ -137,7 +138,7 @@ def test_start_monitoring__connections_to_different_hosts( expected_create_monitor_calls = [mocker.call(host_info, props, MonitoringThreadContainer())] * num_conns mock_create_monitor.assert_has_calls(expected_create_monitor_calls) finally: - release_resources(services) + release_service_resource(services) def test_start_monitoring__connections_to_same_host( @@ -164,7 +165,7 @@ def test_start_monitoring__connections_to_same_host( expected_create_monitor_calls = [mocker.call(host_info, props, MonitoringThreadContainer())] mock_create_monitor.assert_has_calls(expected_create_monitor_calls) finally: - release_resources(services) + release_service_resource(services) def test_stop_monitoring__connections_to_different_hosts( @@ -186,7 +187,7 @@ def test_stop_monitoring__connections_to_different_hosts( expected_stop_monitoring_calls = [mocker.call(context) for context in contexts] mock_monitor.stop_monitoring.assert_has_calls(expected_stop_monitoring_calls, True) finally: - release_resources(services) + release_service_resource(services) def test_stop_monitoring__connections_to_same_host( @@ -208,7 +209,7 @@ def test_stop_monitoring__connections_to_same_host( expected_stop_monitoring_calls = [mocker.call(context) for context in contexts] mock_monitor.stop_monitoring.assert_has_calls(expected_stop_monitoring_calls, True) finally: - release_resources(services) + release_service_resource(services) def generate_host_aliases(num_aliases: int, generate_unique_aliases: bool) -> List[FrozenSet[str]]: @@ -247,7 +248,7 @@ def _generate_contexts(num_contexts: int, generate_unique_contexts) -> List[Moni return _generate_contexts -def release_resources(services): +def release_service_resource(services): for service in services: service.release_resources()