Skip to content

fix: Use OkHttp EventLister instead of ConnectionListener for idle connection monitoring #1312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
52d84fd
Reimplement idle connection monitoring using `okhttp3.EventListener`
lauzadis Jul 2, 2025
e0e9680
Add changelog
lauzadis Jul 2, 2025
57fda03
Upgrade to latest version of OkHttp
lauzadis Jul 2, 2025
8c81ac9
Changelog
lauzadis Jul 2, 2025
e75ab3b
EventListenerChain
lauzadis Jul 3, 2025
5355ea3
Create a single connection monitor, reuse it
lauzadis Jul 3, 2025
d5655eb
update comment
lauzadis Jul 3, 2025
0480652
monospace
lauzadis Jul 3, 2025
20d96f5
clenaup
lauzadis Jul 3, 2025
44aa0ff
fix build
lauzadis Jul 3, 2025
c35e62d
ktlint
lauzadis Jul 3, 2025
b575879
Simplify connection monitoring listener registration
lauzadis Jul 3, 2025
6dcb6af
add EventListenerChain tests
lauzadis Jul 3, 2025
5ca0d1c
move test class to the bottom
lauzadis Jul 3, 2025
d5a8ce7
make EventListenerChain internal
lauzadis Jul 3, 2025
4aa6b51
update cache events
lauzadis Jul 3, 2025
02a051d
remove `poolOverride`
lauzadis Jul 3, 2025
d5a4b28
remove `poolOverride` apiDump
lauzadis Jul 3, 2025
1e0dc1f
Use `connection` instead of `connId` in user-facing logging
lauzadis Jul 3, 2025
9c42e30
Make ConnectionMonitoringEventListener `internal`
lauzadis Jul 3, 2025
1f9b907
Revert "Upgrade to latest version of OkHttp"
lauzadis Jul 3, 2025
a23bef9
Revert "Changelog"
lauzadis Jul 3, 2025
ff9e771
Revert how `soTimeout` is updated
lauzadis Jul 3, 2025
d79cc34
Add missing letter
lauzadis Jul 3, 2025
d695ac5
Upgrade to OkHttp 5.1.0 and Okio 3.15.0
lauzadis Jul 8, 2025
0457ba7
Merge branch 'v1.5-main' of github.com:smithy-lang/smithy-kotlin into…
lauzadis Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .brazil.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

"com.squareup.okhttp3:okhttp-coroutines:5.*": "OkHttp3Coroutines-5.x",
"com.squareup.okhttp3:okhttp:5.*": "OkHttp3-5.x",
"com.squareup.okhttp3:okhttp-jvm:5.*": "OkHttp3-5.x",
"com.squareup.okio:okio-jvm:3.*": "OkioJvm-3.x",
"io.opentelemetry:opentelemetry-api:1.*": "Maven-io-opentelemetry_opentelemetry-api-1.x",
"io.opentelemetry:opentelemetry-extension-kotlin:1.*": "Maven-io-opentelemetry_opentelemetry-extension-kotlin-1.x",
Expand Down
8 changes: 8 additions & 0 deletions .changes/db001c20-3788-4cfe-9ec2-284fd86a80bd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "db001c20-3788-4cfe-9ec2-284fd86a80bd",
"type": "bugfix",
"description": "Reimplement idle connection monitoring using `okhttp3.EventListener` instead of now-internal `okhttp3.ConnectionListener`",
"issues": [
"https://github.com/smithy-lang/smithy-kotlin/issues/1311"
]
}
4 changes: 2 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ aws-kotlin-repo-tools-version = "0.4.31"
# libs
coroutines-version = "1.10.2"
atomicfu-version = "0.29.0"
okhttp-version = "5.0.0-alpha.14"
okhttp-version = "5.1.0"
okhttp4-version = "4.12.0"
okio-version = "3.9.1"
okio-version = "3.15.0"
otel-version = "1.45.0"
slf4j-version = "2.0.16"
slf4j-v1x-version = "1.7.36"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConf
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineKt {
public static final fun buildClient (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;)Lokhttp3/OkHttpClient;
public static final fun buildClient (Laws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpEngineConfig;Laws/smithy/kotlin/runtime/http/engine/internal/HttpClientMetrics;[Lokhttp3/EventListener;)Lokhttp3/OkHttpClient;
}

public final class aws/smithy/kotlin/runtime/http/engine/okhttp/OkHttpHeadersAdapter : aws/smithy/kotlin/runtime/http/Headers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.io.Closeable
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import kotlinx.coroutines.*
import okhttp3.Call
import okhttp3.Connection
import okhttp3.ConnectionListener
import okhttp3.ExperimentalOkHttpApi
import okhttp3.*
import okhttp3.internal.closeQuietly
import okio.IOException
import okio.buffer
Expand All @@ -22,12 +20,20 @@ import kotlin.coroutines.coroutineContext
import kotlin.time.Duration
import kotlin.time.measureTime

@OptIn(ExperimentalOkHttpApi::class)
internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionListener() {
/**
* An [okhttp3.EventListener] implementation that monitors connections for remote closure.
* This replaces the functionality previously provided by the now-internal [okhttp3.ConnectionListener].
*/
internal class ConnectionMonitoringEventListener(private val pollInterval: Duration) :
EventListener(),
Closeable {
private val monitorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val monitors = ConcurrentHashMap<Connection, Job>()

fun close(): Unit = runBlocking {
/**
* Close all active connection monitors.
*/
override fun close(): Unit = runBlocking {
val monitorJob = requireNotNull(monitorScope.coroutineContext[Job]) {
"Connection idle monitor scope cannot be cancelled because it does not have a job: $this"
}
Expand All @@ -40,13 +46,16 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
?.callContext
?: Dispatchers.IO

override fun connectionAcquired(connection: Connection, call: Call) {
// Cancel monitoring when a connection is acquired
override fun connectionAcquired(call: Call, connection: Connection) {
super.connectionAcquired(call, connection)

// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionAcquired` event and will be complete before any future `connectionReleased` event could fire for
// the same connection.
monitors.remove(connection)?.let { monitor ->
val context = call.callContext()
val logger = context.logger<ConnectionIdleMonitor>()
val logger = context.logger<ConnectionMonitoringEventListener>()
logger.trace { "Cancel monitoring for $connection" }

// Use `runBlocking` because this _must_ finish before OkHttp goes to use the connection
Expand All @@ -58,13 +67,18 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
}
}

override fun connectionReleased(connection: Connection, call: Call) {
// Start monitoring when a connection is released
override fun connectionReleased(call: Call, connection: Connection) {
super.connectionReleased(call, connection)

val connId = System.identityHashCode(connection)
val callContext = call.callContext()

// Start monitoring
val monitor = monitorScope.launch(CoroutineName("okhttp-conn-monitor-for-$connId")) {
doMonitor(connection, callContext)
}
callContext.logger<ConnectionIdleMonitor>().trace { "Launched coroutine $monitor to monitor $connection" }
callContext.logger<ConnectionMonitoringEventListener>().trace { "Launched coroutine $monitor to monitor $connection" }

// Non-locking map access is okay here because this code will only execute synchronously as part of a
// `connectionReleased` event and will be complete before any future `connectionAcquired` event could fire for
Expand All @@ -73,7 +87,7 @@ internal class ConnectionIdleMonitor(val pollInterval: Duration) : ConnectionLis
}

private suspend fun doMonitor(conn: Connection, callContext: CoroutineContext) {
val logger = callContext.logger<ConnectionIdleMonitor>()
val logger = callContext.logger<ConnectionMonitoringEventListener>()

val socket = conn.socket()
val source = try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.smithy.kotlin.runtime.http.engine.okhttp

import aws.smithy.kotlin.runtime.io.closeIfCloseable
import okhttp3.*
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.Proxy

/**
* An [okhttp3.EventListener] that delegates to a chain of EventListeners.
* Start events are sent in forward order, terminal events are sent in reverse order
*/
internal class EventListenerChain(
private val listeners: List<EventListener>,
) : EventListener() {
private val reverseListeners = listeners.reversed()

fun close() {
listeners.forEach {
it.closeIfCloseable()
}
}

override fun callStart(call: Call): Unit =
listeners.forEach { it.callStart(call) }

override fun dnsStart(call: Call, domainName: String): Unit =
listeners.forEach { it.dnsStart(call, domainName) }

override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>): Unit =
reverseListeners.forEach { it.dnsEnd(call, domainName, inetAddressList) }

override fun proxySelectStart(call: Call, url: HttpUrl): Unit =
listeners.forEach { it.proxySelectStart(call, url) }

override fun proxySelectEnd(call: Call, url: HttpUrl, proxies: List<Proxy>): Unit =
reverseListeners.forEach { it.proxySelectEnd(call, url, proxies) }

override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy): Unit =
listeners.forEach { it.connectStart(call, inetSocketAddress, proxy) }

override fun secureConnectStart(call: Call): Unit =
listeners.forEach { it.secureConnectStart(call) }

override fun secureConnectEnd(call: Call, handshake: Handshake?): Unit =
reverseListeners.forEach { it.secureConnectEnd(call, handshake) }

override fun connectEnd(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy, protocol: Protocol?): Unit =
reverseListeners.forEach { it.connectEnd(call, inetSocketAddress, proxy, protocol) }

override fun connectFailed(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy, protocol: Protocol?, ioe: IOException): Unit =
reverseListeners.forEach { it.connectFailed(call, inetSocketAddress, proxy, protocol, ioe) }

override fun connectionAcquired(call: Call, connection: Connection): Unit =
listeners.forEach { it.connectionAcquired(call, connection) }

override fun connectionReleased(call: Call, connection: Connection): Unit =
reverseListeners.forEach { it.connectionReleased(call, connection) }

override fun requestHeadersStart(call: Call): Unit =
listeners.forEach { it.requestHeadersStart(call) }

override fun requestHeadersEnd(call: Call, request: Request): Unit =
reverseListeners.forEach { it.requestHeadersEnd(call, request) }

override fun requestBodyStart(call: Call): Unit =
listeners.forEach { it.requestBodyStart(call) }

override fun requestBodyEnd(call: Call, byteCount: Long): Unit =
reverseListeners.forEach { it.requestBodyEnd(call, byteCount) }

override fun requestFailed(call: Call, ioe: IOException): Unit =
reverseListeners.forEach { it.requestFailed(call, ioe) }

override fun responseHeadersStart(call: Call): Unit =
listeners.forEach { it.responseHeadersStart(call) }

override fun responseHeadersEnd(call: Call, response: Response): Unit =
reverseListeners.forEach { it.responseHeadersEnd(call, response) }

override fun responseBodyStart(call: Call): Unit =
listeners.forEach { it.responseBodyStart(call) }

override fun responseBodyEnd(call: Call, byteCount: Long): Unit =
reverseListeners.forEach { it.responseBodyEnd(call, byteCount) }

override fun responseFailed(call: Call, ioe: IOException): Unit =
reverseListeners.forEach { it.responseFailed(call, ioe) }

override fun callEnd(call: Call): Unit =
reverseListeners.forEach { it.callEnd(call) }

override fun callFailed(call: Call, ioe: IOException): Unit =
reverseListeners.forEach { it.callFailed(call, ioe) }

override fun canceled(call: Call): Unit =
reverseListeners.forEach { it.canceled(call) }

override fun satisfactionFailure(call: Call, response: Response): Unit =
reverseListeners.forEach { it.satisfactionFailure(call, response) }

override fun cacheConditionalHit(call: Call, cachedResponse: Response): Unit =
listeners.forEach { it.cacheConditionalHit(call, cachedResponse) }

override fun cacheHit(call: Call, response: Response): Unit =
listeners.forEach { it.cacheHit(call, response) }

override fun cacheMiss(call: Call): Unit =
listeners.forEach { it.cacheMiss(call) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory
import aws.smithy.kotlin.runtime.http.engine.*
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.closeIfCloseable
import aws.smithy.kotlin.runtime.net.TlsVersion
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.time.Instant
Expand Down Expand Up @@ -44,9 +45,14 @@ public class OkHttpEngine(
override val engineConstructor: (OkHttpEngineConfig.Builder.() -> Unit) -> OkHttpEngine = ::invoke
}

// Create a single shared connection monitoring listener if idle polling is enabled
private val connectionMonitoringListener: EventListener? =
config.connectionIdlePollingInterval?.let {
ConnectionMonitoringEventListener(it)
}

private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider)
private val connectionIdleMonitor = config.connectionIdlePollingInterval?.let { ConnectionIdleMonitor(it) }
private val client = config.buildClientWithConnectionListener(metrics, connectionIdleMonitor)
private val client = config.buildClient(metrics, connectionMonitoringListener)

@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
Expand All @@ -73,16 +79,20 @@ public class OkHttpEngine(
}

override fun shutdown() {
connectionIdleMonitor?.close()
connectionMonitoringListener?.closeIfCloseable()
client.connectionPool.evictAll()
client.dispatcher.executorService.shutdown()
metrics.close()
}
}

private fun OkHttpEngineConfig.buildClientFromConfig(
/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
@InternalApi
public fun OkHttpEngineConfig.buildClient(
metrics: HttpClientMetrics,
poolOverride: ConnectionPool? = null,
vararg clientScopedEventListeners: EventListener?,
): OkHttpClient {
val config = this

Expand All @@ -102,7 +112,7 @@ private fun OkHttpEngineConfig.buildClientFromConfig(
writeTimeout(config.socketWriteTimeout.toJavaDuration())

// use our own pool configured with the timeout settings taken from config
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This comment isn't accurate anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is. We are creating our own pool using timeout settings taken from config

val pool = poolOverride ?: ConnectionPool(
val pool = ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = config.connectionIdleTimeout.inWholeMilliseconds,
TimeUnit.MILLISECONDS,
Expand All @@ -116,7 +126,14 @@ private fun OkHttpEngineConfig.buildClientFromConfig(
dispatcher(dispatcher)

// Log events coming from okhttp. Allocate a new listener per-call to facilitate dedicated trace spans.
eventListenerFactory { call -> HttpEngineEventListener(pool, config.hostResolver, dispatcher, metrics, call) }
eventListenerFactory { call ->
EventListenerChain(
listOfNotNull(
HttpEngineEventListener(pool, config.hostResolver, dispatcher, metrics, call),
*clientScopedEventListeners,
),
)
}

// map protocols
if (config.tlsContext.alpn.isNotEmpty()) {
Expand All @@ -140,34 +157,6 @@ private fun OkHttpEngineConfig.buildClientFromConfig(
}.build()
}

/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
// Used by OkHttp4Engine - OkHttp4 does NOT have `connectionListener`
// TODO - Refactor in next minor version - Move this to OkHttp4Engine and make it private
@InternalApi
public fun OkHttpEngineConfig.buildClient(
metrics: HttpClientMetrics,
): OkHttpClient = this.buildClientFromConfig(metrics)

/**
* Convert SDK version of HTTP configuration to OkHttp specific configuration and return the configured client
*/
// Used by OkHttpEngine - OkHttp5 does have `connectionListener`
@OptIn(ExperimentalOkHttpApi::class)
private fun OkHttpEngineConfig.buildClientWithConnectionListener(
metrics: HttpClientMetrics,
connectionListener: ConnectionIdleMonitor?,
): OkHttpClient = this.buildClientFromConfig(
metrics,
ConnectionPool(
maxIdleConnections = 5, // The default from the no-arg ConnectionPool() constructor
keepAliveDuration = this.connectionIdleTimeout.inWholeMilliseconds,
timeUnit = TimeUnit.MILLISECONDS,
connectionListener = connectionListener ?: ConnectionListener.NONE,
),
)

private fun minTlsConnectionSpec(tlsContext: TlsContext): ConnectionSpec {
val minVersion = tlsContext.minVersion ?: TlsVersion.TLS_1_2
val okHttpTlsVersions = SdkTlsVersion
Expand Down
Loading
Loading