Flink app hosted in AWS trying to publish to GCP and throwing NPE at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource #1538

Open
Ghilherme opened this issue Oct 16, 2024 · 5 comments
Labels
priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@Ghilherme

Environment details

We are using Flink 1.19 with Java 11, hosted on an EC2 instance in AWS, and trying to publish messages to GCP Pub/Sub. We are using Workload Identity Federation to exchange tokens between AWS and GCP.

We are using the PubSubSink connector from the Flink docs: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/pubsub/#pubsub-sink

Our code only generates some mock data and lets the connector publish it, without any complex logic. It exists purely for validation.

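import com.google.auth.oauth2.ExternalAccountCredentials;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

public class FlinkToPubSub {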
    private static final Logger log = LoggerFactory.getLogger(FlinkToPubSub.class.getName());



    public static void main(String[] args) throws Exception {
        System.setProperty("java.net.preferIPv4Stack" , "true");
        log.info("Initializing application...");
        S3Client s3 = S3Client.create();
        String bucketName = "bucketCompanyName";
        String key = "key-aws-gcp-pp.json";
        String localPath = "/opt/response_test_" + LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + ".json";

        s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(key).build(), Paths.get(localPath));

        ExternalAccountCredentials credentials = ExternalAccountCredentials.fromStream(new FileInputStream(localPath));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1000L);

        DataStream<String> testDataStream = env.addSource(new TestDataGenerator());

        testDataStream.addSink(createPubSink(credentials));

        env.execute("Flink to GCP Pub/Sub Test");
    }

    private static PubSubSink<String> createPubSink(ExternalAccountCredentials credentials) throws IOException {
        String projectName = "project-non-prod";
        String topicName = "topicName";

        return PubSubSink.newBuilder()
                .withSerializationSchema(new SimpleStringSchema())
                .withProjectName(projectName)
                .withTopicName(topicName)
                .withCredentials(credentials).build();
    }

    private static class TestDataGenerator implements SourceFunction<String> {
        // volatile so that cancel(), called from another thread, is seen by run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {
      
            while (running) {
                var time = LocalDateTime.now();
                ctx.collect(time + "somemockdatamessage");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

Stack trace

Caused by: java.lang.RuntimeException: Failed trying to publish message
	at org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink$FailureHandler.onFailure(PubSubSink.java:342)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at org.apache.flink.util.concurrent.Executors$DirectExecutor.execute(Executors.java:60)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92)
	at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74)
	at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:51)
	at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.onFailure(Publisher.java:571)
	at com.google.cloud.pubsub.v1.Publisher$OutstandingBatch.access$1900(Publisher.java:538)
	at com.google.cloud.pubsub.v1.Publisher$3.onFailure(Publisher.java:514)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
	at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:200)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
	at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92)
	at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74)
	at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1130)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1298)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1059)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:809)
	at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
	at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
	at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
	at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
	at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
	at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:489)
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:453)
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:486)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: com.google.api.gax.rpc.UnauthenticatedException: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Failed computing credential metadata
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:116)
	at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:41)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:86)
	at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66)
	... 28 more
Caused by: io.grpc.StatusRuntimeException: UNAUTHENTICATED: Failed computing credential metadata
	at io.grpc.Status.asRuntimeException(Status.java:533)
	... 20 more
Caused by: java.lang.NullPointerException
	at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource(InternalAwsSecurityCredentialsSupplier.java:204)
	at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource(InternalAwsSecurityCredentialsSupplier.java:193)
	at com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.getRegion(InternalAwsSecurityCredentialsSupplier.java:151)
	at com.google.auth.oauth2.AwsCredentials.retrieveSubjectToken(AwsCredentials.java:138)
	at com.google.auth.oauth2.AwsCredentials.refreshAccessToken(AwsCredentials.java:121)
	at com.google.auth.oauth2.OAuth2Credentials$1.call(OAuth2Credentials.java:270)
	at com.google.auth.oauth2.OAuth2Credentials$1.call(OAuth2Credentials.java:267)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at com.google.auth.oauth2.OAuth2Credentials$RefreshTask.run(OAuth2Credentials.java:635)
	... 3 more

The error appears to originate at this line of code in this repo:
com.google.auth.oauth2.InternalAwsSecurityCredentialsSupplier.retrieveResource(InternalAwsSecurityCredentialsSupplier.java:204)

How can we solve this?

Thanks!

@Ghilherme
Author

pom.xml:

`

<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>gcp-pub-sub</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.19.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-gcp-pubsub</artifactId>
        <version>3.1.0-1.19</version>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>s3</artifactId>
        <version>2.20.68</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty-shaded</artifactId>
        <version>1.62.2</version> <!-- Use the appropriate version -->
    </dependency>

    <dependency>
        <groupId>com.google.http-client</groupId>
        <artifactId>google-http-client</artifactId>
        <version>1.44.1</version>
    </dependency>

    <dependency>
        <groupId>com.google.http-client</groupId>
        <artifactId>google-http-client-apache-v2</artifactId>
        <version>1.44.1</version>
    </dependency>
</dependencies>



<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>11</source>
                <target>11</target>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>org.example.FlinkToPubSub</mainClass>
                            </transformer>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

`

@stankiewicz

I wonder whether this is related to #1408.

@lqiu96 lqiu96 added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. labels Nov 20, 2024
@lqiu96
Contributor

lqiu96 commented Nov 20, 2024

@lsirac This is probably something that you are more familiar with. Potentially dealing with Workload Identity Federation.

@stankiewicz

Hi, we were able to diagnose the issue.
The root cause is that some libraries, such as the Flink Pub/Sub connector, serialize the credentials object. That is not a great idea, although it may work for GoogleCredentials.
In this case, AwsCredentials extends ExternalAccountCredentials, which has a transient transportFactory field. After deserialization the field is null, so the credentials are unusable: they throw an NPE as soon as they access transportFactory.
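
The failure mode described here can be reproduced with plain Java serialization. The following is a minimal sketch, not code from google-auth-library-java or Flink: `FakeCredentials`, `FakeTransportFactory`, and `retrieveRegion` are hypothetical stand-ins for a credentials class that holds a transient collaborator.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    // Hypothetical stand-in for a non-serializable HTTP transport factory.
    static class FakeTransportFactory {
        String fetch(String url) {
            return "response from " + url;
        }
    }

    // Hypothetical stand-in for a credentials class with a transient field.
    static class FakeCredentials implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String regionUrl;
        // transient: skipped by Java serialization, so it is null after readObject()
        private transient FakeTransportFactory transportFactory;

        FakeCredentials(String regionUrl) {
            this.regionUrl = regionUrl;
            this.transportFactory = new FakeTransportFactory();
        }

        String retrieveRegion() {
            // Throws NullPointerException when transportFactory is null.
            return transportFactory.fetch(regionUrl);
        }
    }

    // Serializes and deserializes the object in memory, as a framework might
    // do when shipping a sink to a remote worker.
    static FakeCredentials roundTrip(FakeCredentials original)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (FakeCredentials) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeCredentials original = new FakeCredentials("http://example.invalid/region");
        System.out.println(original.retrieveRegion()); // works on the original object

        FakeCredentials copy = roundTrip(original);
        try {
            copy.retrieveRegion();
        } catch (NullPointerException e) {
            System.out.println("NPE after deserialization: transient field is null");
        }
    }
}
```

The original object works, while the deserialized copy fails on first use. That would be consistent with the stack trace above, where the NPE only appears once the sink tries to refresh the token at publish time.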

I haven't found a way to fix or patch transportFactory on an existing ExternalAccountCredentials object.

The solution is to use a different Flink connector that sets up credentials correctly from local ADC (Application Default Credentials) via a provider function.
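
The idea behind a provider function can be sketched in plain Java: serialize only a small recipe for building the credentials, and build the real object on the worker after deserialization. This is an illustration of the pattern under stated assumptions, not the API of any Flink connector. `SerializableSupplier`, `FakeSink`, `FakeCredentials`, and `sinkFor` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class LazyCredentialsDemo {

    // A Supplier that is also Serializable, so the recipe can be shipped.
    interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical stand-in for credentials that must not be serialized.
    static class FakeCredentials {
        private final String configPath;

        FakeCredentials(String configPath) {
            this.configPath = configPath;
        }

        String describe() {
            return "credentials loaded from " + configPath;
        }
    }

    // Hypothetical sink: it carries the provider, never the credentials.
    static class FakeSink implements Serializable {
        private static final long serialVersionUID = 1L;

        private final SerializableSupplier<FakeCredentials> credentialsProvider;
        private transient FakeCredentials credentials; // built in open()

        FakeSink(SerializableSupplier<FakeCredentials> credentialsProvider) {
            this.credentialsProvider = credentialsProvider;
        }

        // Called on the worker after deserialization.
        void open() {
            credentials = credentialsProvider.get();
        }

        String publish(String message) {
            return message + " sent with " + credentials.describe();
        }
    }

    // Builds a sink whose provider captures only a String path.
    static FakeSink sinkFor(String configPath) {
        return new FakeSink(() -> new FakeCredentials(configPath));
    }

    // In-memory serialization round trip, standing in for shipping to a worker.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSink shipped = roundTrip(sinkFor("/opt/creds.json"));
        shipped.open(); // credentials are created here, on the receiving side
        System.out.println(shipped.publish("hello"));
    }
}
```

Because the credentials object is constructed after the round trip, its non-serializable internals are always freshly initialized on the machine that uses them.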

@lqiu96
Contributor

lqiu96 commented Dec 4, 2024

Thanks for the context and explanation.

It seems like using a different Flink connector (PubSubSinkV2) is probably the suggested workaround for this, unless that brings a different set of issues.

I'm not sure about the context for serializing the Credentials via Serializable, or why transportFactory is marked as transient.


3 participants