Extract a shared instrumentation emitter from the sync and async steps #26

Description

@OmarAlJarrah

Problem

The sync and async instrumentation steps duplicate ~200 lines of emit/redact/preview/metrics logic (DefaultInstrumentationStep.kt, DefaultAsyncInstrumentationStep.kt); there's a standing TODO to dedupe.

Proposed change

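Extract a shared emitter (named InstrumentationEmitters in the TODO, InstrumentationEmitter in the proposal below) used by both steps. Preserve the non-truncating over-cap live-tail behaviour (PrefixThenTailSource): a body larger than the preview cap must still stream to the caller in full, with only the preview prefix buffered. The emitter is also the natural home for the charset-aware preview work.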

Acceptance

  • Single shared emitter; both steps delegate
  • Over-cap tail behaviour unchanged (test still green)

Dependencies


Priority: medium · Effort: medium


Proposed implementation

A concrete, behavior-preserving shape for the extraction — the current duplicated surface and the proposed shared InstrumentationEmitter(options, clock, logger). The single sync/async difference (whether an unknown-length response body is skipped to avoid blocking a completion thread) becomes a skipUnknownLength parameter on wrapResponseForLogging, which is also the natural seam for the sync-step fix tracked in #107.

sdk-core/src/main/kotlin/org/dexpace/sdk/core/http/pipeline/steps/DefaultAsyncInstrumentationStep.kt:78-91,235-442 — extract a shared InstrumentationEmitter

DefaultAsyncInstrumentationStep and DefaultInstrumentationStep carry an identical set
of metric instruments and emit / redact / header / metric helpers. The async file even
flags it in a TODO. Pull the shared surface into one internal class,
InstrumentationEmitter(options, clock, logger), that both steps own and delegate to.
The single behavioural difference (the async step skips draining an unknown-length
response body so it never blocks a completion thread) becomes a
skipUnknownLength: Boolean parameter on wrapResponseForLogging.
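
Reduced to a pure function, the decision that skipUnknownLength controls can be sketched as below. This is an illustrative model only: shouldWrapForPreview and its parameters are hypothetical names, and a nullable Long stands in for the response body's contentLength(). It is not the SDK's code.

```kotlin
/**
 * Hypothetical model of the early-return checks in wrapResponseForLogging.
 *
 * @param captureBody stands in for shouldCaptureBody()
 * @param contentLength null models a response without a body; a negative value
 *   models an unknown-length (streaming) body
 * @param skipUnknownLength true for the async step, false for the sync step
 * @return true when the body would be wrapped and drained for a preview
 */
fun shouldWrapForPreview(
    captureBody: Boolean,
    contentLength: Long?,
    skipUnknownLength: Boolean,
): Boolean {
    // No capture requested, or nothing to capture: leave the response untouched.
    if (!captureBody || contentLength == null) return false
    // Async only: never drain a body whose length is unknown.
    if (skipUnknownLength && contentLength < 0L) return false
    return true
}
```

With captureBody = true and contentLength = -1, the function returns false when skipUnknownLength is true (async) and true when it is false (sync). Known-length bodies are wrapped in both cases.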

Old code:

        // lines 78-91 — the two lazily-registered metric instruments (identical in both steps)
        private val requestCounter: LongCounter by lazy(LazyThreadSafetyMode.PUBLICATION) {
            options.meter.counter(
                name = "http.client.request.count",
                description = "Total HTTP requests sent through this pipeline.",
                unit = "{request}",
            )
        }
        private val latencyHistogram: DoubleHistogram by lazy(LazyThreadSafetyMode.PUBLICATION) {
            options.meter.histogram(
                name = "http.client.request.duration",
                description = "End-to-end duration of the downstream pipeline per request.",
                unit = "ms",
            )
        }

        // lines 235-442 — the duplicated helper block + companion
        // ===== Helpers duplicated from DefaultInstrumentationStep =====
        // Duplication is intentional for this commit.
        // TODO(omar 2026-08-01): extract InstrumentationEmitters shared helper used by both DefaultInstrumentationStep and DefaultAsyncInstrumentationStep

        private fun shouldCaptureBody(): Boolean = options.logLevel == HttpLogLevel.BODY_AND_HEADERS

        private fun wrapResponseForLogging(response: Response): Response {
            val responseBody = response.body
            if (!shouldCaptureBody() || responseBody == null) return response
            // The bounded drain below runs on the future-completion thread. For an unknown-length
            // (streaming) body the read could block on a slow/idle producer and stall the
            // completion thread, so we skip body capture entirely for contentLength() < 0 — the
            // body streams to the caller unwrapped with no preview. Known-length bodies are safe
            // to drain up to the bounded preview size.
            if (responseBody.contentLength() < 0L) return response
            // Bound the in-memory capture to the preview size. The full body still streams to the
            // caller via the wrapper's live tail; only a preview prefix is buffered.
            val wrapped = LoggableResponseBody.bounded(responseBody, Io.provider, options.bodyPreviewMaxBytes.toLong())
            // Force drain so we have bytes to log. Done here (not in the event emit) so a logging
            // failure can't mask the drain error from the caller — they still get the wrapped body
            // with the cached exception surfaced on source().
            try {
                wrapped.snapshot(options.bodyPreviewMaxBytes)
            } catch (t: Throwable) {
                // Drain itself records the exception on the wrapper; capture for emit, but don't
                // let it propagate — the caller will see it on next source() call.
                logger.atWarning()
                    .event("http.instrumentation.response_drain_failed")
                    .field("error.type", t.javaClass.simpleName ?: "Throwable")
                    .cause(t)
                    .log()
            }
            return response.newBuilder().body(wrapped).build()
        }

        private fun emitRequestEvent(
            request: Request,
            redactedUrl: String,
        ) {
            if (options.logLevel == HttpLogLevel.NONE) return
            try {
                val ev =
                    logger.atInfo()
                        .event("http.request")
                        .field("http.request.method", request.method.name)
                        .field("url.full", redactedUrl)
                        .field("request.content.length", request.body?.contentLength() ?: -1L)
                appendHeadersFields(ev, request.headers, prefix = "http.request.header.")
                ev.log()
            } catch (t: Throwable) {
                emitInstrumentationError("http.instrumentation.emit_request_failed", "request_event", t)
            }
        }

        // emitResponseEvent / emitFailureEvent / appendHeadersFields / joinHeaderValues /
        // safeRedact / spanAttributes / recordMetrics / elapsedMillis / emitInstrumentationError
        // and the NANOS_PER_MILLI_DOUBLE companion follow here, all byte-for-byte identical to
        // the same helpers in DefaultInstrumentationStep.kt — shown in full in the New emitter below.

DefaultInstrumentationStep.kt holds the same instruments at lines 78-91 and the same
helpers at lines 149-345 (its wrapResponseForLogging is the only one that differs — it
omits the contentLength() < 0 early-return).
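
Both steps register these instruments through by lazy(LazyThreadSafetyMode.PUBLICATION). A minimal sketch of the deferral this gives, using a hypothetical CountingMeter in place of options.meter (the real meter API is only assumed to resemble the calls quoted above):

```kotlin
// Hypothetical stand-in for the meter; it only counts registrations.
class CountingMeter {
    var registrations = 0
        private set

    fun counter(name: String): String {
        registrations++
        return name
    }
}

// Hypothetical holder mirroring how the steps declare their instruments.
class InstrumentHolder(meter: CountingMeter) {
    // PUBLICATION mode does not synchronize the initializer: concurrent first
    // reads may each run it, but only the first published value is kept.
    val requestCounter: String by lazy(LazyThreadSafetyMode.PUBLICATION) {
        meter.counter("http.client.request.count")
    }
}
```

Constructing an InstrumentHolder leaves registrations at 0. The first read of requestCounter registers once, and later reads on the same thread reuse the stored value.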

New code:

/*
 * Copyright (c) 2026 dexpace and Omar Aljarrah
 *
 * Licensed under the MIT License. See LICENSE in the project root.
 * SPDX-License-Identifier: MIT
 */

package org.dexpace.sdk.core.http.pipeline.steps

import org.dexpace.sdk.core.http.common.Headers
import org.dexpace.sdk.core.http.common.HttpHeaderName
import org.dexpace.sdk.core.http.request.LoggableRequestBody
import org.dexpace.sdk.core.http.request.Request
import org.dexpace.sdk.core.http.response.LoggableResponseBody
import org.dexpace.sdk.core.http.response.Response
import org.dexpace.sdk.core.instrumentation.ClientLogger
import org.dexpace.sdk.core.instrumentation.LoggingEvent
import org.dexpace.sdk.core.instrumentation.UrlRedactor
import org.dexpace.sdk.core.instrumentation.metrics.DoubleHistogram
import org.dexpace.sdk.core.instrumentation.metrics.LongCounter
import org.dexpace.sdk.core.io.Io
import org.dexpace.sdk.core.util.Clock

/**
 * Shared instrumentation emitter for [DefaultInstrumentationStep] and
 * [DefaultAsyncInstrumentationStep]. Owns the request-count / latency instruments and every
 * `http.request` / `http.response` emit, header-redaction, metric-recording, and URL-redaction
 * helper the two steps previously duplicated byte-for-byte. Each step constructs one emitter and
 * delegates; the sole sync/async difference — whether an unknown-length response body is skipped
 * to avoid blocking a completion thread — is the [skipUnknownLength] parameter on
 * [wrapResponseForLogging].
 *
 * Stateless after construction (the [ClientLogger], counter, and histogram are reused). Safe to
 * share across concurrent requests.
 */
internal class InstrumentationEmitter(
    private val options: HttpInstrumentationOptions,
    private val clock: Clock,
    private val logger: ClientLogger,
) {
    // Lazily constructed so a step instance never installed in a pipeline doesn't pay
    // the counter/histogram registration cost. `PUBLICATION` mode avoids the synchronized
    // block that the default `SYNCHRONIZED` mode uses for first-read coordination — that
    // monitor would pin a virtual-thread carrier during init (see CLAUDE.md "ReentrantLock
    // over synchronized"). With OTel meter implementations being idempotent on register-by-
    // name, racing initializers cost at most one redundant registration that is discarded.
    private val requestCounter: LongCounter by lazy(LazyThreadSafetyMode.PUBLICATION) {
        options.meter.counter(
            name = "http.client.request.count",
            description = "Total HTTP requests sent through this pipeline.",
            unit = "{request}",
        )
    }
    private val latencyHistogram: DoubleHistogram by lazy(LazyThreadSafetyMode.PUBLICATION) {
        options.meter.histogram(
            name = "http.client.request.duration",
            description = "End-to-end duration of the downstream pipeline per request.",
            unit = "ms",
        )
    }

    internal fun shouldCaptureBody(): Boolean = options.logLevel == HttpLogLevel.BODY_AND_HEADERS

    /**
     * Wraps [response]'s body for preview capture. When [skipUnknownLength] is true an
     * unknown-length (`contentLength() < 0`) body is left unwrapped — the async step passes true
     * so a streaming body never blocks the future-completion thread; the sync step passes false.
     */
    internal fun wrapResponseForLogging(
        response: Response,
        skipUnknownLength: Boolean,
    ): Response {
        val responseBody = response.body
        if (!shouldCaptureBody() || responseBody == null) return response
        if (skipUnknownLength && responseBody.contentLength() < 0L) return response
        // Bound the in-memory capture to the preview size. The full body still streams to the
        // caller via the wrapper's live tail; only a preview prefix is buffered.
        val wrapped = LoggableResponseBody.bounded(responseBody, Io.provider, options.bodyPreviewMaxBytes.toLong())
        // Force drain so we have bytes to log. Done here (not in the event emit) so a logging
        // failure can't mask the drain error from the caller — they still get the wrapped body
        // with the cached exception surfaced on source().
        try {
            wrapped.snapshot(options.bodyPreviewMaxBytes)
        } catch (t: Throwable) {
            // Drain itself records the exception on the wrapper; capture for emit, but don't
            // let it propagate — the caller will see it on next source() call.
            logger.atWarning()
                .event("http.instrumentation.response_drain_failed")
                .field("error.type", t.javaClass.simpleName ?: "Throwable")
                .cause(t)
                .log()
        }
        return response.newBuilder().body(wrapped).build()
    }

    internal fun emitRequestEvent(
        request: Request,
        redactedUrl: String,
    ) {
        if (options.logLevel == HttpLogLevel.NONE) return
        try {
            val ev =
                logger.atInfo()
                    .event("http.request")
                    .field("http.request.method", request.method.name)
                    .field("url.full", redactedUrl)
                    .field("request.content.length", request.body?.contentLength() ?: -1L)
            appendHeadersFields(ev, request.headers, prefix = "http.request.header.")
            ev.log()
        } catch (t: Throwable) {
            emitInstrumentationError("http.instrumentation.emit_request_failed", "request_event", t)
        }
    }

    internal fun emitResponseEvent(
        request: Request,
        response: Response,
        redactedUrl: String,
        elapsedMs: Double,
        requestBody: LoggableRequestBody?,
    ) {
        if (options.logLevel == HttpLogLevel.NONE) return
        try {
            val ev =
                logger.atInfo()
                    .event("http.response")
                    .field("http.request.method", request.method.name)
                    .field("url.full", redactedUrl)
                    .field("http.response.status_code", response.status.code.toLong())
                    .field("http.response.duration_ms", elapsedMs)
                    .field("response.content.length", response.body?.contentLength() ?: -1L)
            appendHeadersFields(ev, response.headers, prefix = "http.response.header.")
            if (shouldCaptureBody()) {
                if (requestBody != null) {
                    val preview = requestBody.snapshot(options.bodyPreviewMaxBytes)
                    ev.field("request.body.size", preview.size.toLong())
                        .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType()))
                }
                val responseBody = response.body
                if (responseBody is LoggableResponseBody) {
                    val preview = responseBody.snapshot(options.bodyPreviewMaxBytes)
                    ev.field("response.body.size", preview.size.toLong())
                        .field("response.body.preview", BodyPreview.render(preview, responseBody.mediaType()))
                    responseBody.captureException?.let {
                        ev.field("response.body.drain_error", it.javaClass.simpleName ?: "Throwable")
                    }
                }
            }
            ev.log()
        } catch (t: Throwable) {
            emitInstrumentationError("http.instrumentation.emit_response_failed", "response_event", t)
        }
    }

    internal fun emitFailureEvent(
        request: Request,
        redactedUrl: String,
        cause: Throwable,
        elapsedMs: Double,
        requestBody: LoggableRequestBody?,
    ) {
        if (options.logLevel == HttpLogLevel.NONE) return
        try {
            val ev =
                logger.atWarning()
                    .event("http.response")
                    .field("http.request.method", request.method.name)
                    .field("url.full", redactedUrl)
                    .field("error.type", cause.javaClass.simpleName ?: "Throwable")
                    .field("http.response.duration_ms", elapsedMs)
                    .cause(cause)
            if (shouldCaptureBody() && requestBody != null) {
                val preview = requestBody.snapshot(options.bodyPreviewMaxBytes)
                ev.field("request.body.size", preview.size.toLong())
                    .field("request.body.preview", BodyPreview.render(preview, requestBody.mediaType()))
            }
            ev.log()
        } catch (t: Throwable) {
            emitInstrumentationError("http.instrumentation.emit_failure_failed", "failure_event", t)
        }
    }

    private fun appendHeadersFields(
        ev: LoggingEvent,
        headers: Headers,
        prefix: String,
    ) {
        // Iterate the headers actually present rather than the allow-list — the allow-list is
        // usually larger than the headers on any one request.
        for ((nameLower, values) in headers.entries()) {
            val typed = HttpHeaderName.fromString(nameLower)
            when {
                options.allowedHeaderNames.contains(typed) ->
                    ev.field(prefix + nameLower, joinHeaderValues(values))
                options.isRedactedHeaderNamesLoggingEnabled ->
                    ev.field(prefix + nameLower, "REDACTED")
                // else: silently omit
            }
        }
    }

    private fun joinHeaderValues(values: List<String>): String =
        if (values.size == 1) values[0] else values.joinToString(", ")

    internal fun safeRedact(request: Request): String =
        try {
            UrlRedactor.redact(request.url, options.allowedQueryParamNames)
        } catch (t: Throwable) {
            "[malformed url]"
        }

    internal fun spanAttributes(
        request: Request,
        redactedUrl: String,
    ): Map<String, Any> =
        mapOf(
            "http.request.method" to request.method.name,
            "url.full" to redactedUrl,
        )

    internal fun recordMetrics(
        request: Request,
        statusCode: Int,
        elapsedMs: Double,
        errorType: String?,
    ) {
        val attrs: Map<String, Any> =
            if (errorType != null) {
                mapOf(
                    "http.request.method" to request.method.name,
                    "error.type" to errorType,
                )
            } else {
                mapOf(
                    "http.request.method" to request.method.name,
                    "http.response.status_code" to statusCode,
                )
            }
        requestCounter.add(1L, attrs)
        latencyHistogram.record(elapsedMs, attrs)
    }

    internal fun elapsedMillis(startNanos: Long): Double = (clock.monotonic() - startNanos) / NANOS_PER_MILLI_DOUBLE

    private fun emitInstrumentationError(
        event: String,
        phase: String,
        t: Throwable,
    ) {
        // Best-effort secondary log. If this also throws, swallow — we're inside the logging
        // path already, and a thrown exception would corrupt the outer caller's flow.
        try {
            logger.atWarning()
                .event(event)
                .field("error.phase", phase)
                .field("error.type", t.javaClass.simpleName ?: "Throwable")
                .cause(t)
                .log()
        } catch (_: Throwable) {
            // Intentionally swallowed — logging is best-effort.
        }
    }

    private companion object {
        // Nanoseconds in one millisecond, expressed as Double so the division returns
        // millisecond fractions (e.g. 1.234 ms) for high-resolution latency histograms.
        private const val NANOS_PER_MILLI_DOUBLE = 1_000_000.0
    }
}
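
The Double divisor in elapsedMillis is what keeps sub-millisecond resolution. A small sketch with a hypothetical FakeClock (the SDK's Clock is only assumed to expose a monotonic nanosecond reading, as the call clock.monotonic() suggests):

```kotlin
// Hypothetical fixed clock; monotonic() returns nanoseconds.
class FakeClock(private val nowNanos: Long) {
    fun monotonic(): Long = nowNanos
}

// Same arithmetic as the emitter's elapsedMillis, written against the fake clock.
fun elapsedMillis(clock: FakeClock, startNanos: Long): Double {
    val nanosPerMilli = 1_000_000.0 // Double, so the division keeps fractions
    return (clock.monotonic() - startNanos) / nanosPerMilli
}

// Integer division for comparison: the fractional milliseconds are lost.
fun elapsedMillisTruncated(clock: FakeClock, startNanos: Long): Long =
    (clock.monotonic() - startNanos) / 1_000_000L
```

For a 1,234,000 ns interval the Double version yields 1.234 ms, while the integer version yields 1.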

Usage — before → after: (the async step's handleCompletion success branch is
representative; the sync process body is the same shape)

// DefaultAsyncInstrumentationStep — fields + handleCompletion success branch, before
private val requestCounter: LongCounter by lazy(LazyThreadSafetyMode.PUBLICATION) { /* ... */ }
private val latencyHistogram: DoubleHistogram by lazy(LazyThreadSafetyMode.PUBLICATION) { /* ... */ }
// ...
return mdc.withMdc {
    val wrapped = wrapResponseForLogging(raw!!)
    emitResponseEvent(outgoing, wrapped, redactedUrl, elapsedMs, wrappedRequestBody)
    recordMetrics(request, statusCode = wrapped.status.code, elapsedMs, errorType = null)
    span.end()
    wrapped
}
// DefaultAsyncInstrumentationStep — one emitter field + delegating calls, after
private val emitter = InstrumentationEmitter(options, clock, logger)
// ...
return mdc.withMdc {
    val wrapped = emitter.wrapResponseForLogging(raw!!, skipUnknownLength = true)
    emitter.emitResponseEvent(outgoing, wrapped, redactedUrl, elapsedMs, wrappedRequestBody)
    emitter.recordMetrics(request, statusCode = wrapped.status.code, elapsedMs, errorType = null)
    span.end()
    wrapped
}
// DefaultInstrumentationStep delegates identically but calls
// emitter.wrapResponseForLogging(raw, skipUnknownLength = false)
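
One way to check the "both steps delegate" acceptance item is a test double that records the flag each step passes. The sketch below assumes a hypothetical ResponseWrapper interface and heavily simplified step models. The proposal itself uses the concrete InstrumentationEmitter class, so a real test would need its own seam or would assert on observable behaviour instead.

```kotlin
// Hypothetical seam so a test double can stand in for the emitter.
interface ResponseWrapper {
    fun wrap(body: String, skipUnknownLength: Boolean): String
}

// Records every skipUnknownLength value it is called with.
class RecordingWrapper : ResponseWrapper {
    val flags = mutableListOf<Boolean>()

    override fun wrap(body: String, skipUnknownLength: Boolean): String {
        flags += skipUnknownLength
        return body
    }
}

// Simplified models of the two steps; only the delegating call is kept.
class SyncStepModel(private val emitter: ResponseWrapper) {
    fun process(body: String): String =
        emitter.wrap(body, skipUnknownLength = false)
}

class AsyncStepModel(private val emitter: ResponseWrapper) {
    fun handleCompletion(body: String): String =
        emitter.wrap(body, skipUnknownLength = true)
}
```

Running the sync model and then the async model against one RecordingWrapper should leave flags equal to [false, true], which pins down the one intended difference between the steps.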

Why: removes ~200 lines of byte-for-byte copy-paste, so the next change to header
redaction, body-preview capture, or metric attributes is made once instead of twice; the
one real sync/async difference is now an explicit named argument instead of a silent
divergence between two files.

Build: new file needs the standard MIT header. The helper-only imports
(Headers, HttpHeaderName, LoggableResponseBody, LoggingEvent, UrlRedactor,
DoubleHistogram, LongCounter) move from both step files into the emitter; the step
files must drop them or allWarningsAsErrors will fail on the now-unused imports. Each
step keeps Io and LoggableRequestBody (still used for request-body wrapping). Internal
only — no apiCheck impact.


Labels: sdk-core (sdk-core toolkit), tech-debt (simplification / cleanup)
