feat: Refactor AWS plugin with pool connection and async session #1849

Merged
jayeshp19 merged 17 commits into main from pr-1722 on Apr 6, 2025

Conversation

@jayeshp19 jayeshp19 commented Apr 1, 2025

Cleanup on top of PR #1837.


github-actions bot commented Apr 1, 2025

⚠️ Changeset Required

We detected changes in the following package(s) but no changeset file was found. Please add one for proper versioning:

  • livekit-plugins-aws

👉 Create a changeset file by clicking here.

@jayeshp19 jayeshp19 requested a review from longcw April 1, 2025 18:03

andormarkus commented Apr 1, 2025

Hi @jayeshp19

I have two PRs because LiveKit internals changed from 0.x to 1.x, for example the renaming involving _utils.py.

This PR (#1849) targets the 1.x version and is not backwards compatible.

The implemented get_aws_credentials will not work in an AWS environment and should be removed. You should simply get a session.
Depending on the execution environment, session.get_credentials() may also return an aws_session_token.

You can move the region check into utils to make the code DRY:

        if not self._session:
            self._region = (
                region
                or os.environ.get("AWS_REGION")
                or os.environ.get("AWS_DEFAULT_REGION")
                or DEFAULT_REGION
            )
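
As a rough illustration of the suggestion above, the region fallback could live in a single shared helper. This is only a sketch: the name resolve_region, the injectable env argument, and the "us-east-1" value for DEFAULT_REGION are assumptions made for the example, not the plugin's actual code.

```python
import os
from typing import Mapping, Optional

# Assumption: the plugin's real DEFAULT_REGION value is not shown in this thread.
DEFAULT_REGION = "us-east-1"


def resolve_region(
    region: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> str:
    """Return the first non-empty value among: the explicit argument,
    AWS_REGION, AWS_DEFAULT_REGION, and finally DEFAULT_REGION."""
    # Allow a mapping to be injected so the helper is easy to test.
    env = os.environ if env is None else env
    return (
        region
        or env.get("AWS_REGION")
        or env.get("AWS_DEFAULT_REGION")
        or DEFAULT_REGION
    )
```

Each class could then call resolve_region(region) instead of repeating the fallback chain.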


jayeshp19 commented Apr 1, 2025

The purpose of get_aws_credentials is just to validate credentials. We're getting the session separately from get_aws_async_session.

Are you getting any errors? If so, can you share them?

@andormarkus
Contributor

The get_aws_credentials implementation is problematic for AWS environments using temporary credentials (EC2 instance profiles, EKS pod identity, or explicit STS credentials) for two major reasons:

  1. Credential freezing: By calling session.get_credentials() at initialization time, you're extracting temporary credentials at a single point in time. These will eventually expire (typically after 1 hour), with no mechanism to refresh them.
  2. Missing session token: When temporary credentials are used in AWS, they include a session token alongside the access key and secret key. This implementation drops the session token, which will cause authentication failures.

A better approach for credential validation would be to use get_caller_identity() from the STS service:

def validate_aws_credentials(api_key=None, api_secret=None, session_token=None, region=None):
    """Validate AWS credentials by attempting to call STS get_caller_identity."""
    try:
        session = boto3.Session(
            aws_access_key_id=api_key,
            aws_secret_access_key=api_secret,
            aws_session_token=session_token,
            region_name=region or DEFAULT_REGION
        )
        sts = session.client('sts')
        identity = sts.get_caller_identity()
        return True, identity
    except Exception as e:
        return False, str(e)

This approach:

  • Validates credentials without extracting and freezing them
  • Returns the identity details which can be useful for debugging
  • Allows boto3's credential provider chain to handle credential refreshing naturally
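
The credential-freezing point can be pictured with a toy model that does not touch AWS at all. Everything in this sketch (TempCredentials, RefreshingProvider, the one-hour ttl, the injected clock) is hypothetical and only mimics the behaviour described above; it is not how boto3 is implemented.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class TempCredentials:
    access_key: str
    secret_key: str
    token: str
    expires_at: float  # seconds on the injected clock


class RefreshingProvider:
    """Toy stand-in for a credential provider chain: re-issues credentials once they expire."""

    def __init__(self, clock: Callable[[], float], ttl: float = 3600.0) -> None:
        self._clock = clock
        self._ttl = ttl
        self._issued = 0
        self._current: Optional[TempCredentials] = None

    def get(self) -> TempCredentials:
        now = self._clock()
        if self._current is None or now >= self._current.expires_at:
            self._issued += 1
            self._current = TempCredentials(
                access_key=f"ASIAEXAMPLE{self._issued}",
                secret_key="secret",
                token=f"token-{self._issued}",
                expires_at=now + self._ttl,
            )
        return self._current


def is_valid(creds: TempCredentials, now: float) -> bool:
    return now < creds.expires_at


clock = [0.0]
provider = RefreshingProvider(clock=lambda: clock[0])
frozen = provider.get()   # snapshot taken once, as at initialization time
clock[0] = 3601.0         # a little over an hour later
fresh = provider.get()    # the provider notices the expiry and re-issues
print(is_valid(frozen, clock[0]), is_valid(fresh, clock[0]))  # prints: False True
```

The snapshot taken at startup is stale after the hour passes, while asking the provider again yields usable credentials. That is the argument for holding on to the session rather than to values extracted from it.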

@andormarkus
Contributor

Here is the full test code. You can run it on EC2 or in CloudShell.

import boto3
import time
import botocore

# This simulates their current implementation
def get_aws_credentials():
    # Gets and "freezes" credentials at a point in time
    session = boto3.Session()
    creds = session.get_credentials()
    print(f"Got initial credentials: {creds.access_key[:4]}...{creds.secret_key[:4]}... (token present: {'Yes' if creds.token else 'No'})")
    return creds

# Extract credentials at the beginning (like in their __init__)
frozen_creds = get_aws_credentials()

# Function that uses the frozen credentials
def test_with_frozen_creds():
    try:
        # Create a new session with only the access key and secret key (dropping token)
        print(f"Using frozen credentials from earlier")
        session = boto3.Session(
            aws_access_key_id=frozen_creds.access_key,
            aws_secret_access_key=frozen_creds.secret_key
            # Notice: no token is passed here
        )
        sts = session.client('sts')
        identity = sts.get_caller_identity()
        print(f"Success! Identity: {identity['Arn']}")
        return True
    except Exception as e:
        print(f"Failed with error: {str(e)}")
        return False

# Function using credential provider chain properly
def test_with_provider_chain():
    try:
        print(f"Using credential provider chain")
        session = boto3.Session()
        sts = session.client('sts')
        identity = sts.get_caller_identity()
        print(f"Success! Identity: {identity['Arn']}")
        return True
    except Exception as e:
        print(f"Failed with error: {str(e)}")
        return False

print("Testing immediately after extraction:")
test_with_frozen_creds()
test_with_provider_chain()

Here are my results

~ $ python test.py 
Got initial credentials: ASIA...B6DW... (token present: Yes)
Testing immediately after extraction:
Using frozen credentials from earlier
Failed with error: An error occurred (InvalidClientTokenId) when calling the GetCallerIdentity operation: The security token included in the request is invalid.
Using credential provider chain
Success! Identity: arn:aws:sts::<MY_ACCOUNT>:assumed-role/myrole
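
In the run above the access key starts with ASIA and a token is present, which indicates temporary credentials; the frozen path fails because the token is not passed on. A minimal sketch of carrying the token along when one exists is shown here. Creds and session_kwargs are hypothetical names for illustration; the dictionary keys are the keyword arguments that boto3.Session accepts.

```python
from typing import Any, Dict, NamedTuple, Optional


class Creds(NamedTuple):
    """Hypothetical stand-in with the three fields the test script reads."""
    access_key: str
    secret_key: str
    token: Optional[str] = None


def session_kwargs(creds: Creds) -> Dict[str, Any]:
    """Build keyword arguments for boto3.Session, keeping the token when one exists."""
    kwargs: Dict[str, Any] = {
        "aws_access_key_id": creds.access_key,
        "aws_secret_access_key": creds.secret_key,
    }
    # Temporary credentials are only valid together with their session token.
    if creds.token:
        kwargs["aws_session_token"] = creds.token
    return kwargs
```

A call such as boto3.Session(**session_kwargs(frozen_creds)) would address the missing-token failure only; the extracted values would still expire, so relying on the provider chain remains the more robust option.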

@jayeshp19
Contributor Author

I've made some changes to make it less confusing:

  • validate_aws_credentials() only validates credentials; it does not return a credentials object
  • get_aws_async_session creates a new session, which is then used for all tasks

@andormarkus
Contributor

While reviewing the code, I noticed that our implementation for AWS credential handling might not work correctly with long-term AWS credentials (direct access/secret key pairs without session tokens).

In the STT class's _create_client method, we're unconditionally passing frozen_credentials.token to the StaticCredentialResolver:

return TranscribeStreamingClient(
    region=self._config.region,
    credential_resolver=StaticCredentialResolver(
        access_key_id=frozen_credentials.access_key,
        secret_access_key=frozen_credentials.secret_key,
        session_token=frozen_credentials.token,  # This could be None with long-term credentials
    ),
)

Long-term AWS credentials don't include a token component, which is only present when using temporary credentials from environment variables, IAM roles, or AWS STS.

Suggested Fix
Since I don't have the ability to test with long-term credentials right now, could we make the code more robust by conditionally including the session token only when it exists?

resolver_args = {
    "access_key_id": frozen_credentials.access_key,
    "secret_access_key": frozen_credentials.secret_key,
}
if frozen_credentials.token:
    resolver_args["session_token"] = frozen_credentials.token

return TranscribeStreamingClient(
    region=self._config.region,
    credential_resolver=StaticCredentialResolver(**resolver_args),
)

This approach would ensure compatibility with both temporary and long-term credentials without requiring us to test every credential type directly.

@jayeshp19
Contributor Author

This will already be handled by StaticCredentialResolver

@andormarkus
Contributor

LGTM for Livekit version 1.x

@jayeshp19 jayeshp19 merged commit f9df4fe into main Apr 6, 2025
6 of 7 checks passed
@jayeshp19 jayeshp19 deleted the pr-1722 branch April 6, 2025 10:38
@jayeshp19 jayeshp19 restored the pr-1722 branch April 6, 2025 10:48
davidzhao pushed a commit that referenced this pull request Apr 8, 2025
jayesh-mivi pushed a commit to mivi-dev-org/custom-livekit-agents that referenced this pull request Jun 4, 2025
5 participants