Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing OpenTelemetry Context #5455

Open
vromaniks opened this issue Jan 21, 2025 · 0 comments
Open

Missing OpenTelemetry Context #5455

vromaniks opened this issue Jan 21, 2025 · 0 comments
Labels

Comments

@vromaniks
Copy link

Version

4.5.11

Context

We notice that the log messages from the Postgres ConnectionFactory do not include the correct opentelemetry trace_id and span_id parameters. Please see the reproducer below.

Do you have a reproducer?

package reproducer

import io.opentelemetry.api.common.Attributes
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.api.trace.StatusCode
import io.opentelemetry.api.trace.Tracer
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator
import io.opentelemetry.context.ContextStorage
import io.opentelemetry.context.Scope
import io.opentelemetry.context.propagation.ContextPropagators
import io.opentelemetry.sdk.OpenTelemetrySdk
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor
import io.vertx.core.Context
import io.vertx.core.Future
import io.vertx.core.Promise
import io.vertx.core.Vertx
import io.vertx.core.impl.ContextInternal
import io.vertx.kotlin.coroutines.dispatcher
import io.vertx.pgclient.PgConnectOptions
import io.vertx.pgclient.PgConnection
import io.vertx.pgclient.impl.PgConnectionFactory
import io.vertx.pgclient.spi.PgDriver
import io.vertx.sqlclient.ClientBuilder
import io.vertx.sqlclient.Pool
import io.vertx.sqlclient.Row
import io.vertx.sqlclient.RowSet
import io.vertx.sqlclient.SqlConnectOptions
import io.vertx.sqlclient.SqlConnection
import io.vertx.sqlclient.spi.ConnectionFactory
import io.vertx.tracing.opentelemetry.OpenTelemetryTracingFactory
import io.vertx.tracing.opentelemetry.VertxContextStorageProvider
import java.net.ConnectException
import java.util.function.Supplier
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows

class PgClientLoggerTest() {
    val spanExporter: InMemorySpanExporter = InMemorySpanExporter.create()
    val tracerProvider: SdkTracerProvider =
        SdkTracerProvider.builder().addSpanProcessor(SimpleSpanProcessor.create(spanExporter)).build()
    val openTelemetry: OpenTelemetrySdk =
        OpenTelemetrySdk.builder()
            .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
            .setTracerProvider(tracerProvider)
            .build()
    val tracer: Tracer = openTelemetry.tracerProvider.get("extended-tracer")
    val vertx: Vertx = Vertx.builder().withTracer(OpenTelemetryTracingFactory(openTelemetry)).build()

    @Test
    fun `invalid password test`(): Unit = runBlocking {
        val hostOptions = PgConnectOptions().apply {
            user = "user"
            password = "pass"
            host = "localhost"
            port = 5512
            database = "db"
        }

        val driver = object : PgDriver() {
            override fun createConnectionFactory(
                vertx: Vertx,
                ignored: Supplier<out Future<out SqlConnectOptions>>?,
            ): ConnectionFactory {
                val delegate = super.createConnectionFactory(vertx, ignored) as PgConnectionFactory
                return PgConnectionFactory(delegate, hostOptions)
            }
        }
        trace(tracer, "test", SpanKind.INTERNAL) {
            val pool = ClientBuilder.pool(driver)
                .using(vertx)
                .build()
            val pgClient = PgClient(vertx, pool, tracer)

            assertThrows<ConnectException> {
                pgClient.withConnection<RowSet<Row>> { conn -> conn.query("SELECT * FROM table").execute() }
            }
        }
    }
}

fun validateTraceContext(message: String) {
    var traceId: String? = null
    var spanId: String? = null
    var currContext = Vertx.currentContext()
    if (currContext!= null) {
        val otelContext = vertxContextProvider.get().current()
        require(otelContext!= null) { "Missing OpenTelemetry context: [$message]" }

        var spanContext = Span.fromContext(otelContext).spanContext
        traceId = spanContext.traceId
        spanId = spanContext.spanId
    }
    println("traceId: $traceId, spanId: $spanId, message: $message")
}

class PgClient(private val vertx: Vertx, private val pool: Pool, private val tracer: Tracer) {
    suspend fun <T> withConnection(block: (PgConnection) -> Future<T>): T =
        spanOnVertx(tracer, vertx.getOrCreateContext(), "SQL connection", SpanKind.CLIENT) {
            pool.connection
                .compose { conn ->
                    conn as PgConnection
                    block(conn).onComplete {
                        conn.close()
                    }
                }
                .onFailure { e ->
                    validateTraceContext("Error during database request")
                }
        }
}

class PgConnectionFactory(
    private val delegate: PgConnectionFactory,
    private val options: PgConnectOptions
) : ConnectionFactory {

    override fun connect(context: Context): Future<SqlConnection> = Future.future { promise ->
        validateTraceContext("Connecting to a new Postgres host...")
        delegate
            .connect(context, options)
            .onFailure { cause ->
                try {
                    validateTraceContext("connection error: ${cause.cause}")
                    promise.fail(cause)
                } catch (e: Throwable) {
                    promise.fail(e)
                }
            }
    }

    override fun close(completion: Promise<Void>) = delegate.close(completion)
    override fun connect(context: Context, options: SqlConnectOptions): Future<SqlConnection> = TODO()
}

// -------------------------------------------------
val vertxContextProvider = VertxContextStorageProvider()

val CoroutineContext.tracingContext: io.opentelemetry.context.Context
    get() = get(OpenTelemetryContextElement)?.context ?: error("Tracing context is missing from coroutine context")

internal val coroutinesOtelContextStorage: ContextStorage = ContextStorage.defaultStorage()

class OpenTelemetryContextElement(val context: io.opentelemetry.context.Context) :
    AbstractCoroutineContextElement(OpenTelemetryContextElement), ThreadContextElement<Scope> {
    // use thread-local storage to avoid vertx storage
    override fun updateThreadContext(context: CoroutineContext): Scope =
        coroutinesOtelContextStorage.attach(this.context)

    override fun restoreThreadContext(context: CoroutineContext, oldState: Scope) {
        oldState.close()
    }

    companion object Key : CoroutineContext.Key<OpenTelemetryContextElement>
}

/** Creates a new trace (no parent) trace */
suspend fun <T> trace(
    tracer: Tracer,
    name: String,
    kind: SpanKind,
    attrs: Attributes = Attributes.empty(),
    block: suspend CoroutineScope.() -> T,
): T {
    val span = tracer.spanBuilder(name).setSpanKind(kind).setNoParent().setAllAttributes(attrs).startSpan()
    val context = OpenTelemetryContextElement(coroutinesOtelContextStorage.root().with(span))

    return withContext(context) {
        try {
            val result = block()
            span.setStatus(StatusCode.OK)
            result
        } catch (expected: Throwable) {
            span.setStatus(StatusCode.ERROR)
            span.recordException(expected)
            throw expected
        } finally {
            span.end()
        }
    }
}

/** Creates a new span (requires parent) trace and switches to Vert.x dispatcher */
suspend fun <T> spanOnVertx(
    tracer: Tracer,
    context: Context,
    name: String,
    kind: SpanKind,
    attrs: Attributes = Attributes.empty(),
    block: () -> Future<T>,
): T {
    // extract context from the coroutine
    val tracingContext = coroutineContext.tracingContext
    val vertxContext = context

    val span = tracer.spanBuilder(name).setSpanKind(kind).setParent(tracingContext).setAllAttributes(attrs).startSpan()

    // create a new span and put it in the context
    val newContext = tracingContext.with(span)
    val duplicatedContext = (vertxContext as ContextInternal).duplicate()

    // use withContext to bridge to Vert.x dispatcher
    return withContext(duplicatedContext.dispatcher()) {
        // register new context on Vert.x thread
        val contextStorage = vertxContextProvider.get()
        val scope = contextStorage.attach(newContext)

        // run async operation and close scope once the operation is completed
        scope.use { spanOnVertx(span, block).await() }
    }
}

private fun <T> spanOnVertx(
    span: Span,
    block: () -> Future<T>,
): Deferred<T> {
    val deferred = CompletableDeferred<T>()
    val future = block()
    future.onComplete { asyncResult ->
        if (asyncResult.succeeded()) {
            val result: T = asyncResult.result()
            span.setStatus(StatusCode.OK)

            deferred.complete(result)
        } else {
            val cause: Throwable = asyncResult.cause()
            span.setStatus(StatusCode.ERROR)
            span.recordException(cause)

            deferred.completeExceptionally(cause)
        }

        span.end()
    }

    return deferred
}

Extra

Oracle JDK23
Kotlin 2.1.0

@vromaniks vromaniks added the bug label Jan 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant