diff --git a/tests/integration/container/test_aws_secrets_manager.py b/tests/integration/container/test_aws_secrets_manager.py new file mode 100644 index 00000000..62db36f9 --- /dev/null +++ b/tests/integration/container/test_aws_secrets_manager.py @@ -0,0 +1,236 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from typing import Callable +from uuid import uuid4 + +import boto3 +import pytest + +from aws_advanced_python_wrapper import AwsWrapperConnection +from aws_advanced_python_wrapper.errors import (AwsWrapperError, + FailoverSuccessError) +from aws_advanced_python_wrapper.utils.properties import Properties +from .utils.conditions import (disable_on_features, enable_on_deployments, + enable_on_features, enable_on_num_instances) +from .utils.database_engine_deployment import DatabaseEngineDeployment +from .utils.driver_helper import DriverHelper +from .utils.rds_test_utility import RdsTestUtility +from .utils.test_environment import TestEnvironment +from .utils.test_environment_features import TestEnvironmentFeatures + + +@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER]) +class TestAwsSecretsManager: + """Test class for AWS Secrets Manager authentication""" + + @pytest.fixture(scope='class') + def props(self): + return Properties({ + "plugins": "aws_secrets_manager", + "socket_timeout": 10, + "connect_timeout": 10 + }) + + @pytest.fixture(scope='class') + def create_secret(self, conn_utils): + """Create a secret in AWS Secrets Manager with database credentials.""" + region = TestEnvironment.get_current().get_info().get_region() + client = boto3.client('secretsmanager', region_name=region) + env = TestEnvironment.get_current() + + secret_name = f"TestSecret-{uuid4()}" + + engine = "postgres" if env.get_engine() == "pg" else "mysql" + secret_value = { + "engine": engine, + "dbname": env.get_info().get_database_info().get_default_db_name(), + "host": env.get_info().get_database_info().get_cluster_endpoint(), + "username": conn_utils.user, + "password": conn_utils.password, + "description": "Test secret generated by integration tests." + } + + try: + response = client.create_secret( + Name=secret_name, + SecretString=json.dumps(secret_value) + ) + secret_arn = response['ARN'] + yield secret_name, secret_arn + finally: + try: + client.delete_secret( + SecretId=secret_name, + ForceDeleteWithoutRecovery=True + ) + except Exception: + pass + + def test_connection(self, test_driver, conn_utils, create_secret, props): + """Test basic connection using AWS Secrets Manager.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + secret_name, _ = create_secret + region = TestEnvironment.get_current().get_info().get_region() + + props.update({ + "secrets_manager_secret_id": secret_name, + "secrets_manager_region": region + }) + + self.validate_connection( + target_driver_connect, + **conn_utils.get_connect_params(user="IncorrectUser", password="IncorrectPassword"), + **props + ) + + def test_connect_with_arn(self, test_driver, conn_utils, create_secret, props): + """Test connection using secret ARN.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + _, secret_arn = create_secret + + props.update({ + "secrets_manager_secret_id": secret_arn + }) + + self.validate_connection( + target_driver_connect, + **conn_utils.get_connect_params(user="IncorrectUser", password="IncorrectPassword"), + **props + ) + + def test_incorrect_secret_id(self, test_driver, conn_utils, props): + """Test connection with incorrect secret ID should fail.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + region = TestEnvironment.get_current().get_info().get_region() + + props.update({ + "secrets_manager_secret_id": "incorrectSecretId", + "secrets_manager_region": region + }) + + with pytest.raises(AwsWrapperError): + with AwsWrapperConnection.connect( + target_driver_connect, + **conn_utils.get_connect_params(), + **props + ) as conn: + conn.cursor() + + def test_missing_secret_id(self, test_driver, conn_utils, props): + """Test connection with missing secret ID should fail.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + region = TestEnvironment.get_current().get_info().get_region() + + props.update({ + "secrets_manager_region": region + }) + + with pytest.raises(AwsWrapperError): + with AwsWrapperConnection.connect( + target_driver_connect, + **conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"), + **props + ) as conn: + conn.cursor() + + def test_invalid_region(self, test_driver, conn_utils, create_secret, props): + """Test connection with invalid region should fail.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + secret_name, _ = create_secret + + props.update({ + "secrets_manager_secret_id": secret_name, + "secrets_manager_region": "invalidRegion" + }) + + with pytest.raises(AwsWrapperError): + with AwsWrapperConnection.connect( + target_driver_connect, + **conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"), + **props + ) as conn: + conn.cursor() + + def test_missing_region(self, test_driver, conn_utils, create_secret, props): + """Test connection with missing region should fail.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + secret_name, _ = create_secret + + props.update({ + "secrets_manager_secret_id": secret_name + }) + + with pytest.raises(AwsWrapperError): + with AwsWrapperConnection.connect( + target_driver_connect, + **conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"), + **props + ) as conn: + conn.cursor() + + def test_incorrect_region(self, test_driver, conn_utils, create_secret, props): + """Test connection with incorrect region should fail.""" + target_driver_connect = DriverHelper.get_connect_func(test_driver) + secret_name, _ = create_secret + + props.update({ + "secrets_manager_secret_id": secret_name, + "secrets_manager_region": "ca-central-1" + }) + + with pytest.raises(AwsWrapperError): + with AwsWrapperConnection.connect( + target_driver_connect, + **conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"), + **props + ) as conn: + conn.cursor() + + @enable_on_num_instances(min_instances=2) + @disable_on_features([TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY, + TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT, + TestEnvironmentFeatures.PERFORMANCE]) + @enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.IAM]) + def test_failover_with_secrets_manager( + self, test_driver, props, conn_utils, create_secret): + region = TestEnvironment.get_current().get_info().get_region() + aurora_utility = RdsTestUtility(region) + target_driver_connect = DriverHelper.get_connect_func(test_driver) + initial_writer_id = aurora_utility.get_cluster_writer_instance_id() + secret_name, _ = create_secret + + props.update({ + "plugins": "failover,aws_secrets_manager", + "secrets_manager_secret_id": secret_name, + "secrets_manager_region": region + }) + + with AwsWrapperConnection.connect( + target_driver_connect, **conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"), **props) as aws_conn: + aurora_utility.failover_cluster_and_wait_until_writer_changed() + + aurora_utility.assert_first_query_throws(aws_conn, FailoverSuccessError) + + current_connection_id = aurora_utility.query_instance_id(aws_conn) + assert aurora_utility.is_db_instance_writer(current_connection_id) is True + assert current_connection_id != initial_writer_id + + def validate_connection(self, target_driver_connect: Callable, **connect_params): + with AwsWrapperConnection.connect(target_driver_connect, **connect_params) as conn, \ + conn.cursor() as cursor: + cursor.execute("SELECT 1") + records = cursor.fetchall() + assert len(records) == 1 diff --git a/tests/integration/container/test_iam_authentication.py b/tests/integration/container/test_iam_authentication.py index 4bc6ee3d..29c5eb0c 100644 --- a/tests/integration/container/test_iam_authentication.py +++ b/tests/integration/container/test_iam_authentication.py @@ -31,10 +31,15 @@ import pytest from aws_advanced_python_wrapper import AwsWrapperConnection -from aws_advanced_python_wrapper.errors import AwsWrapperError -from tests.integration.container.utils.conditions import (disable_on_features, - enable_on_features) +from aws_advanced_python_wrapper.errors import (AwsWrapperError, + FailoverSuccessError) +from tests.integration.container.utils.conditions import ( + disable_on_features, enable_on_deployments, enable_on_features, + enable_on_num_instances) +from tests.integration.container.utils.database_engine_deployment import \ + DatabaseEngineDeployment from tests.integration.container.utils.driver_helper import DriverHelper +from tests.integration.container.utils.rds_test_utility import RdsTestUtility from tests.integration.container.utils.test_environment import TestEnvironment @@ -125,6 +130,42 @@ def test_iam_valid_connection_properties_no_password( self.validate_connection(target_driver_connect, **params, **props) + @enable_on_num_instances(min_instances=2) + @enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER]) + @disable_on_features([TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY, + TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT, + TestEnvironmentFeatures.PERFORMANCE]) + @enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.IAM]) + def test_failover_with_iam( + self, test_environment: TestEnvironment, test_driver: TestDriver, props, conn_utils): + target_driver_connect = DriverHelper.get_connect_func(test_driver) + region = TestEnvironment.get_current().get_info().get_region() + aurora_utility = RdsTestUtility(region) + initial_writer_id = aurora_utility.get_cluster_writer_instance_id() + + props.update({ + "plugins": "failover,iam", + "socket_timeout": 10, + "connect_timeout": 10, + "monitoring-connect_timeout": 5, + "monitoring-socket_timeout": 5, + "topology_refresh_ms": 10, + "autocommit": True + }) + + with AwsWrapperConnection.connect( + target_driver_connect, **conn_utils.get_connect_params(user=conn_utils.iam_user), **props) as aws_conn: + # crash instance1 and nominate a new writer + aurora_utility.failover_cluster_and_wait_until_writer_changed() + + # failure occurs on Cursor invocation + aurora_utility.assert_first_query_throws(aws_conn, FailoverSuccessError) + + # assert that we are connected to the new writer after failover happens and we can reuse the cursor + current_connection_id = aurora_utility.query_instance_id(aws_conn) + assert aurora_utility.is_db_instance_writer(current_connection_id) is True + assert current_connection_id != initial_writer_id + def get_ip_address(self, hostname: str): return gethostbyname(hostname)