Skip to content

feat: emit metrics from CRT HTTP engine #1017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/e7c3c7ab-749e-4371-8b25-42ea76aa870d.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "e7c3c7ab-749e-4371-8b25-42ea76aa870d",
"type": "feature",
"description": "Emit metrics from CRT HTTP engine",
"issues": [
"https://github.com/awslabs/smithy-kotlin/issues/893"
]
}
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ okio-version = "3.6.0"
otel-version = "1.32.0"
slf4j-version = "2.0.9"
slf4j-v1x-version = "1.7.36"
crt-kotlin-version = "0.8.2"
crt-kotlin-version = "0.8.4"

# codegen
smithy-version = "1.42.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import aws.smithy.kotlin.runtime.crt.SdkDefaultIO
import aws.smithy.kotlin.runtime.http.HttpErrorCode
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.ProxyConfig
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.Closeable
import aws.smithy.kotlin.runtime.net.TlsVersion
import aws.smithy.kotlin.runtime.telemetry.metrics.measureSeconds
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
Expand All @@ -27,6 +29,7 @@ import aws.smithy.kotlin.runtime.net.TlsVersion as SdkTlsVersion

internal class ConnectionManager(
private val config: CrtHttpEngineConfig,
private val metrics: HttpClientMetrics,
) : Closeable {
private val leases = Semaphore(config.maxConnections.toInt())

Expand Down Expand Up @@ -67,7 +70,9 @@ internal class ConnectionManager(
// get a permit to acquire a connection (limits overall connections since managers are per/host)
leases.acquire()
leaseAcquired = true
manager.acquireConnection()
metrics.connectionAcquireDuration.measureSeconds {
manager.acquireConnection()
}
}

LeasedConnection(conn)
Expand All @@ -82,8 +87,21 @@ internal class ConnectionManager(
}

throw httpEx
} finally {
emitMetrics()
}
}

private fun emitMetrics() {
val (acquiredConnections, idleConnections) = connManagers
.values
.map { it.managerMetrics }
.fold(0L to 0L) { (a, i), m -> a + m.leasedConcurrency to i + m.availableConcurrency }

metrics.acquiredConnections = acquiredConnections
metrics.idleConnections = idleConnections
}

private suspend fun getManagerForUri(uri: Uri, proxyConfig: ProxyConfig): HttpClientConnectionManager = mutex.withLock {
connManagers.getOrPut(uri.authority) {
val connOpts = options.apply {
Expand All @@ -105,6 +123,7 @@ internal class ConnectionManager(
HttpClientConnectionManager(connOpts)
}
}

override fun close() {
connManagers.forEach { entry -> entry.value.close() }
crtTlsContext.close()
Expand All @@ -117,6 +136,7 @@ internal class ConnectionManager(
delegate.close()
} finally {
leases.release()
emitMetrics()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,23 @@ import aws.smithy.kotlin.runtime.http.config.EngineFactory
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngine
import aws.smithy.kotlin.runtime.http.engine.HttpClientEngineBase
import aws.smithy.kotlin.runtime.http.engine.callContext
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.internal.SdkDispatchers
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds
import aws.smithy.kotlin.runtime.time.Instant
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlin.time.TimeSource

internal const val DEFAULT_WINDOW_SIZE_BYTES: Int = 16 * 1024
internal const val CHUNK_BUFFER_SIZE: Long = 64 * 1024

private const val TELEMETRY_SCOPE = "aws.smithy.kotlin.runtime.http.engine.crt"

/**
* [HttpClientEngine] based on the AWS Common Runtime HTTP client
*/
Expand Down Expand Up @@ -51,43 +56,54 @@ public class CrtHttpEngine(public override val config: CrtHttpEngineConfig) : Ht
// }

private val requestLimiter = Semaphore(config.maxConcurrency.toInt())
private val connectionManager = ConnectionManager(config)

override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall = requestLimiter.withPermit {
val callContext = callContext()
val logger = callContext.logger<CrtHttpEngine>()

// LIFETIME: connection will be released back to the pool/manager when
// the response completes OR on exception (both handled by the completion handler registered on the stream
// handler)
val conn = connectionManager.acquire(request)
logger.trace { "Acquired connection ${conn.id}" }

val respHandler = SdkStreamResponseHandler(conn, callContext)
callContext.job.invokeOnCompletion {
logger.trace { "completing handler; cause=$it" }
// ensures the stream is driven to completion regardless of what the downstream consumer does
respHandler.complete()
}
private val metrics = HttpClientMetrics(TELEMETRY_SCOPE, config.telemetryProvider).apply {
connectionsLimit = config.maxConnections.toLong()
requestConcurrencyLimit = config.maxConcurrency.toLong()
}
private val connectionManager = ConnectionManager(config, metrics)

override suspend fun roundTrip(context: ExecutionContext, request: HttpRequest): HttpCall {
metrics.incrementQueuedRequests()
val enqueued = TimeSource.Monotonic.markNow()
return requestLimiter.withPermit {
metrics.requestsQueuedDuration.recordSeconds(enqueued.elapsedNow())
metrics.decrementQueuedRequests()

val callContext = callContext()
val logger = callContext.logger<CrtHttpEngine>()

// LIFETIME: connection will be released back to the pool/manager when
// the response completes OR on exception (both handled by the completion handler registered on the stream
// handler)
val conn = connectionManager.acquire(request)
logger.trace { "Acquired connection ${conn.id}" }

val respHandler = SdkStreamResponseHandler(conn, callContext, context, metrics)
callContext.job.invokeOnCompletion {
logger.trace { "completing handler; cause=$it" }
// ensures the stream is driven to completion regardless of what the downstream consumer does
respHandler.complete()
}

val reqTime = Instant.now()
val engineRequest = request.toCrtRequest(callContext)
val reqTime = Instant.now()
val engineRequest = request.toCrtRequest(callContext, metrics)

val stream = mapCrtException {
conn.makeRequest(engineRequest, respHandler).also { stream ->
stream.activate()
val stream = mapCrtException {
conn.makeRequest(engineRequest, respHandler).also { stream ->
stream.activate()
}
}
}

if (request.isChunked) {
withContext(SdkDispatchers.IO) {
stream.sendChunkedBody(request.body)
if (request.isChunked) {
withContext(SdkDispatchers.IO) {
stream.sendChunkedBody(request.body, metrics)
}
}
}

val resp = respHandler.waitForResponse()
val resp = respHandler.waitForResponse()

return HttpCall(request, resp, reqTime, Instant.now(), callContext)
HttpCall(request, resp, reqTime, Instant.now(), callContext)
}
}

override fun shutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package aws.smithy.kotlin.runtime.http.engine.crt
import aws.sdk.kotlin.crt.CRT
import aws.sdk.kotlin.crt.CrtRuntimeException
import aws.sdk.kotlin.crt.http.HeadersBuilder
import aws.sdk.kotlin.crt.http.HttpRequestBodyStream
import aws.sdk.kotlin.crt.http.HttpStream
import aws.sdk.kotlin.crt.io.Protocol
import aws.sdk.kotlin.crt.io.Uri
Expand All @@ -18,10 +17,13 @@ import aws.smithy.kotlin.runtime.crt.SdkSourceBodyStream
import aws.smithy.kotlin.runtime.http.HttpBody
import aws.smithy.kotlin.runtime.http.HttpErrorCode
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.request.HttpRequest
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.buffer
import aws.smithy.kotlin.runtime.io.readToByteArray
import aws.smithy.kotlin.runtime.io.source
import kotlinx.coroutines.job
import kotlin.coroutines.CoroutineContext

Expand All @@ -42,18 +44,27 @@ internal val HttpRequest.uri: Uri
}
}

internal fun HttpRequest.toCrtRequest(callContext: CoroutineContext): aws.sdk.kotlin.crt.http.HttpRequest {
internal fun HttpRequest.toCrtRequest(
callContext: CoroutineContext,
metrics: HttpClientMetrics,
): aws.sdk.kotlin.crt.http.HttpRequest {
val body = this.body
check(!body.isDuplex) { "CrtHttpEngine does not yet support full duplex streams" }
val bodyStream = if (isChunked) {
null
} else {
when (body) {
is HttpBody.Empty -> null
is HttpBody.Bytes -> HttpRequestBodyStream.fromByteArray(body.bytes())
is HttpBody.ChannelContent -> ReadChannelBodyStream(body.readFrom(), callContext)
is HttpBody.Bytes -> {
val source = body.bytes().source().reportingTo(metrics.bytesSent)
SdkSourceBodyStream(source)
}
is HttpBody.ChannelContent -> {
val source = body.readFrom().reportingTo(metrics.bytesSent)
ReadChannelBodyStream(source, callContext)
}
is HttpBody.SourceContent -> {
val source = body.readFrom()
val source = body.readFrom().reportingTo(metrics.bytesSent)
callContext.job.invokeOnCompletion {
source.close()
}
Expand Down Expand Up @@ -85,10 +96,10 @@ internal val HttpRequest.isChunked: Boolean get() = (this.body is HttpBody.Sourc
* Send a chunked body using the CRT writeChunk bindings.
* @param body an HTTP body that has a chunked content encoding. Must be [HttpBody.SourceContent] or [HttpBody.ChannelContent]
*/
internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
internal suspend fun HttpStream.sendChunkedBody(body: HttpBody, metrics: HttpClientMetrics) {
when (body) {
is HttpBody.SourceContent -> {
val source = body.readFrom()
val source = body.readFrom().reportingTo(metrics.bytesSent)
val bufferedSource = source.buffer()

while (!bufferedSource.exhausted()) {
Expand All @@ -97,7 +108,7 @@ internal suspend fun HttpStream.sendChunkedBody(body: HttpBody) {
}
}
is HttpBody.ChannelContent -> {
val chan = body.readFrom()
val chan = body.readFrom().reportingTo(metrics.bytesSent)
var buffer = SdkBuffer()
val nextBuffer = SdkBuffer()
var sentFirstChunk = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@ import aws.sdk.kotlin.crt.io.Buffer
import aws.smithy.kotlin.runtime.http.*
import aws.smithy.kotlin.runtime.http.HeadersBuilder
import aws.smithy.kotlin.runtime.http.HttpException
import aws.smithy.kotlin.runtime.http.engine.EngineAttributes
import aws.smithy.kotlin.runtime.http.engine.crt.io.reportingTo
import aws.smithy.kotlin.runtime.http.engine.internal.HttpClientMetrics
import aws.smithy.kotlin.runtime.http.response.HttpResponse
import aws.smithy.kotlin.runtime.http.response.copy
import aws.smithy.kotlin.runtime.io.SdkBuffer
import aws.smithy.kotlin.runtime.io.SdkByteChannel
import aws.smithy.kotlin.runtime.io.SdkByteReadChannel
import aws.smithy.kotlin.runtime.io.source
import aws.smithy.kotlin.runtime.operation.ExecutionContext
import aws.smithy.kotlin.runtime.telemetry.logging.logger
import aws.smithy.kotlin.runtime.telemetry.metrics.recordSeconds
import aws.smithy.kotlin.runtime.time.Instant
import aws.smithy.kotlin.runtime.time.fromEpochNanoseconds
import aws.smithy.kotlin.runtime.util.derivedName
import kotlinx.atomicfu.locks.reentrantLock
import kotlinx.atomicfu.locks.withLock
import kotlinx.coroutines.*
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -30,6 +41,8 @@ import kotlin.coroutines.CoroutineContext
internal class SdkStreamResponseHandler(
private val conn: HttpClientConnection,
private val callContext: CoroutineContext,
private val execContext: ExecutionContext,
private val clientMetrics: HttpClientMetrics,
) : HttpStreamResponseHandler {
// TODO - need to cancel the stream when the body is closed from the caller side early.
// There is no great way to do that currently without either (1) closing the connection or (2) throwing an
Expand Down Expand Up @@ -57,6 +70,10 @@ internal class SdkStreamResponseHandler(

private var streamCompleted = false

init {
clientMetrics.incrementInflightRequests()
}

/**
* Called by the response read channel as data is consumed
* @param size the number of bytes consumed
Expand Down Expand Up @@ -184,7 +201,30 @@ internal class SdkStreamResponseHandler(
}

internal suspend fun waitForResponse(): HttpResponse =
responseReady.receive()
responseReady.receive().wrapBody()

private fun HttpResponse.wrapBody(): HttpResponse {
val wrappedBody = when (val originalBody = body) {
is HttpBody.Empty -> return this // Don't need an object copy since we're not wrapping the body
is HttpBody.Bytes ->
originalBody
.bytes()
.source()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
is HttpBody.SourceContent ->
originalBody
.readFrom()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
is HttpBody.ChannelContent ->
originalBody
.readFrom()
.reportingTo(clientMetrics.bytesReceived)
.toHttpBody(originalBody.contentLength)
}
return copy(body = wrappedBody)
}

/**
* Invoked only after the consumer is finished with the response and it is safe to cleanup resources
Expand All @@ -197,6 +237,8 @@ internal class SdkStreamResponseHandler(
// and more data is pending arrival). It can also happen if the coroutine for this request is cancelled
// before onResponseComplete fires.
lock.withLock {
clientMetrics.decrementInflightRequests()

val forceClose = !streamCompleted

if (forceClose) {
Expand All @@ -210,4 +252,19 @@ internal class SdkStreamResponseHandler(
conn.close()
}
}

override fun onMetrics(stream: HttpStream, metrics: HttpStreamMetrics) {
val sendEnd = positiveInstantOrNull(metrics.sendEndTimestampNs)
val receiveStart = positiveInstantOrNull(metrics.receiveStartTimestampNs)

if (sendEnd != null && receiveStart != null) {
val ttfb = receiveStart - sendEnd
if (ttfb.isPositive()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: have you seen any instances where this is negative?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly but event streams may begin receiving traffic before sending has concluded since the communication is bidirectional and duplexed. It seemed prudent in that situation to not report TTFB.

clientMetrics.timeToFirstByteDuration.recordSeconds(ttfb)
execContext[EngineAttributes.TimeToFirstByte] = ttfb
}
}
}
}

private fun positiveInstantOrNull(ns: Long): Instant? = if (ns > 0) Instant.fromEpochNanoseconds(ns) else null
Loading