Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 236 additions & 0 deletions tests/integration/container/test_aws_secrets_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
from typing import Callable
from uuid import uuid4

import boto3
import pytest

from aws_advanced_python_wrapper import AwsWrapperConnection
from aws_advanced_python_wrapper.errors import (AwsWrapperError,
FailoverSuccessError)
from aws_advanced_python_wrapper.utils.properties import Properties
from .utils.conditions import (disable_on_features, enable_on_deployments,
enable_on_features, enable_on_num_instances)
from .utils.database_engine_deployment import DatabaseEngineDeployment
from .utils.driver_helper import DriverHelper
from .utils.rds_test_utility import RdsTestUtility
from .utils.test_environment import TestEnvironment
from .utils.test_environment_features import TestEnvironmentFeatures


@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
class TestAwsSecretsManager:
"""Test class for AWS Secrets Manager authentication"""

@pytest.fixture(scope='class')
def props(self):
return Properties({
"plugins": "aws_secrets_manager",
"socket_timeout": 10,
"connect_timeout": 10
})

@pytest.fixture(scope='class')
def create_secret(self, conn_utils):
"""Create a secret in AWS Secrets Manager with database credentials."""
region = TestEnvironment.get_current().get_info().get_region()
client = boto3.client('secretsmanager', region_name=region)
env = TestEnvironment.get_current()

secret_name = f"TestSecret-{uuid4()}"

engine = "postgres" if env.get_engine() == "pg" else "mysql"
secret_value = {
"engine": engine,
"dbname": env.get_info().get_database_info().get_default_db_name(),
"host": env.get_info().get_database_info().get_cluster_endpoint(),
"username": conn_utils.user,
"password": conn_utils.password,
"description": "Test secret generated by integration tests."
}

try:
response = client.create_secret(
Name=secret_name,
SecretString=json.dumps(secret_value)
)
secret_arn = response['ARN']
yield secret_name, secret_arn
finally:
try:
client.delete_secret(
SecretId=secret_name,
ForceDeleteWithoutRecovery=True
)
except Exception:
pass

def test_connection(self, test_driver, conn_utils, create_secret, props):
"""Test basic connection using AWS Secrets Manager."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
secret_name, _ = create_secret
region = TestEnvironment.get_current().get_info().get_region()

props.update({
"secrets_manager_secret_id": secret_name,
"secrets_manager_region": region
})

self.validate_connection(
target_driver_connect,
**conn_utils.get_connect_params(user="IncorrectUser", password="IncorrectPassword"),
**props
)

def test_connect_with_arn(self, test_driver, conn_utils, create_secret, props):
"""Test connection using secret ARN."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
_, secret_arn = create_secret

props.update({
"secrets_manager_secret_id": secret_arn
})

self.validate_connection(
target_driver_connect,
**conn_utils.get_connect_params(user="IncorrectUser", password="IncorrectPassword"),
**props
)

def test_incorrect_secret_id(self, test_driver, conn_utils, props):
"""Test connection with incorrect secret ID should fail."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
region = TestEnvironment.get_current().get_info().get_region()

props.update({
"secrets_manager_secret_id": "incorrectSecretId",
"secrets_manager_region": region
})

with pytest.raises(AwsWrapperError):
with AwsWrapperConnection.connect(
target_driver_connect,
**conn_utils.get_connect_params(),
**props
) as conn:
conn.cursor()

def test_missing_secret_id(self, test_driver, conn_utils, props):
"""Test connection with missing secret ID should fail."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
region = TestEnvironment.get_current().get_info().get_region()

props.update({
"secrets_manager_region": region
})

with pytest.raises(AwsWrapperError):
with AwsWrapperConnection.connect(
target_driver_connect,
**conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"),
**props
) as conn:
conn.cursor()

def test_invalid_region(self, test_driver, conn_utils, create_secret, props):
"""Test connection with invalid region should fail."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
secret_name, _ = create_secret

props.update({
"secrets_manager_secret_id": secret_name,
"secrets_manager_region": "invalidRegion"
})

with pytest.raises(AwsWrapperError):
with AwsWrapperConnection.connect(
target_driver_connect,
**conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"),
**props
) as conn:
conn.cursor()

def test_missing_region(self, test_driver, conn_utils, create_secret, props):
"""Test connection with missing region should fail."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
secret_name, _ = create_secret

props.update({
"secrets_manager_secret_id": secret_name
})

with pytest.raises(AwsWrapperError):
with AwsWrapperConnection.connect(
target_driver_connect,
**conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"),
**props
) as conn:
conn.cursor()

def test_incorrect_region(self, test_driver, conn_utils, create_secret, props):
"""Test connection with incorrect region should fail."""
target_driver_connect = DriverHelper.get_connect_func(test_driver)
secret_name, _ = create_secret

props.update({
"secrets_manager_secret_id": secret_name,
"secrets_manager_region": "ca-central-1"
})

with pytest.raises(AwsWrapperError):
with AwsWrapperConnection.connect(
target_driver_connect,
**conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"),
**props
) as conn:
conn.cursor()

@enable_on_num_instances(min_instances=2)
@disable_on_features([TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY,
TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT,
TestEnvironmentFeatures.PERFORMANCE])
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.IAM])
def test_failover_with_secrets_manager(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move this test to the failover file - esp since the iam test is there instead of in the iam file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to keep this one here since we create the secrets during the test once during the test class and remove it. If we moved it to the failover file then we would have to create a secret there.

I could move the IAM one though to the IAM tests with failover to make it more consistent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could work, will require adding the timeout props into that test when moving it over

self, test_driver, props, conn_utils, create_secret):
region = TestEnvironment.get_current().get_info().get_region()
aurora_utility = RdsTestUtility(region)
target_driver_connect = DriverHelper.get_connect_func(test_driver)
initial_writer_id = aurora_utility.get_cluster_writer_instance_id()
secret_name, _ = create_secret

props.update({
"plugins": "failover,aws_secrets_manager",
"secrets_manager_secret_id": secret_name,
"secrets_manager_region": region
})

with AwsWrapperConnection.connect(
target_driver_connect, **conn_utils.get_connect_params(user="incorrectUser", password="incorrectPassword"), **props) as aws_conn:
aurora_utility.failover_cluster_and_wait_until_writer_changed()

aurora_utility.assert_first_query_throws(aws_conn, FailoverSuccessError)

current_connection_id = aurora_utility.query_instance_id(aws_conn)
assert aurora_utility.is_db_instance_writer(current_connection_id) is True
assert current_connection_id != initial_writer_id

def validate_connection(self, target_driver_connect: Callable, **connect_params):
with AwsWrapperConnection.connect(target_driver_connect, **connect_params) as conn, \
conn.cursor() as cursor:
cursor.execute("SELECT 1")
records = cursor.fetchall()
assert len(records) == 1
47 changes: 44 additions & 3 deletions tests/integration/container/test_iam_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
import pytest

from aws_advanced_python_wrapper import AwsWrapperConnection
from aws_advanced_python_wrapper.errors import AwsWrapperError
from tests.integration.container.utils.conditions import (disable_on_features,
enable_on_features)
from aws_advanced_python_wrapper.errors import (AwsWrapperError,
FailoverSuccessError)
from tests.integration.container.utils.conditions import (
disable_on_features, enable_on_deployments, enable_on_features,
enable_on_num_instances)
from tests.integration.container.utils.database_engine_deployment import \
DatabaseEngineDeployment
from tests.integration.container.utils.driver_helper import DriverHelper
from tests.integration.container.utils.rds_test_utility import RdsTestUtility
from tests.integration.container.utils.test_environment import TestEnvironment


Expand Down Expand Up @@ -125,6 +130,42 @@ def test_iam_valid_connection_properties_no_password(

self.validate_connection(target_driver_connect, **params, **props)

@enable_on_num_instances(min_instances=2)
@enable_on_deployments([DatabaseEngineDeployment.AURORA, DatabaseEngineDeployment.RDS_MULTI_AZ_CLUSTER])
@disable_on_features([TestEnvironmentFeatures.RUN_AUTOSCALING_TESTS_ONLY,
TestEnvironmentFeatures.BLUE_GREEN_DEPLOYMENT,
TestEnvironmentFeatures.PERFORMANCE])
@enable_on_features([TestEnvironmentFeatures.FAILOVER_SUPPORTED, TestEnvironmentFeatures.IAM])
def test_failover_with_iam(
self, test_environment: TestEnvironment, test_driver: TestDriver, props, conn_utils):
target_driver_connect = DriverHelper.get_connect_func(test_driver)
region = TestEnvironment.get_current().get_info().get_region()
aurora_utility = RdsTestUtility(region)
initial_writer_id = aurora_utility.get_cluster_writer_instance_id()

props.update({
"plugins": "failover,iam",
"socket_timeout": 10,
"connect_timeout": 10,
"monitoring-connect_timeout": 5,
"monitoring-socket_timeout": 5,
"topology_refresh_ms": 10,
"autocommit": True
})

with AwsWrapperConnection.connect(
target_driver_connect, **conn_utils.get_connect_params(user=conn_utils.iam_user), **props) as aws_conn:
# crash instance1 and nominate a new writer
aurora_utility.failover_cluster_and_wait_until_writer_changed()

# failure occurs on Cursor invocation
aurora_utility.assert_first_query_throws(aws_conn, FailoverSuccessError)

# assert that we are connected to the new writer after failover happens and we can reuse the cursor
current_connection_id = aurora_utility.query_instance_id(aws_conn)
assert aurora_utility.is_db_instance_writer(current_connection_id) is True
assert current_connection_id != initial_writer_id

def get_ip_address(self, hostname: str):
return gethostbyname(hostname)

Expand Down
Loading