Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public class JdkHttpTransport private constructor(
/**
* Asynchronously executes [request]. Returns a [CompletableFuture] that completes with
* the [Response] on success or completes exceptionally with the transport failure on
* error.
* error — including a request-adaptation failure, which runs on the calling thread but is
* delivered through the future rather than thrown synchronously.
*
* Cancelling the returned future cancels the underlying JDK exchange, and a response that
* arrives in the cancellation race window is closed so its connection is not leaked (see
Expand All @@ -137,7 +138,19 @@ public class JdkHttpTransport private constructor(
* completions to release the body's connection back to the pool.
*/
override fun executeAsync(request: Request): CompletableFuture<Response> {
val jdkRequest = requestAdapter.adapt(request, responseTimeout)
val jdkRequest =
try {
requestAdapter.adapt(request, responseTimeout)
} catch (e: Exception) {
// The async contract is that errors arrive through the returned future. Request
// adaptation runs on the caller's thread and can throw (e.g. a CONNECT request the
// JDK client rejects), so route the failure into a failed future instead of
// throwing synchronously where a future-composing caller's .exceptionally/.handle
// would never observe it. Errors (OOM and other JVM-fatal conditions) are left to
// propagate up the caller's stack rather than be packaged into a future that may
// never be awaited.
return CompletableFuture.failedFuture<Response>(e)
}
val inFlight = client.sendAsync(jdkRequest, HttpResponse.BodyHandlers.ofInputStream())
return bridgeAsyncResponse(inFlight) { jdkResponse -> responseAdapter.adapt(request, jdkResponse) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import java.time.Duration
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletionException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Flow
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
Expand All @@ -44,6 +45,7 @@ import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue

Expand Down Expand Up @@ -144,6 +146,29 @@ class JdkHttpTransportTest {
assertEquals(payload, recorded.body?.utf8())
}

// -------- async adaptation failures --------

@Test
fun `executeAsyncDeliversAdaptationFailureThroughFuture`() {
// A CONNECT request makes request adaptation throw synchronously inside executeAsync
// (the JDK client reserves CONNECT for internal tunnelling). The contract is that
// executeAsync completes exceptionally on error, so the failure must arrive through the
// returned future — a future-composing caller's .exceptionally/.handle would never
// observe a synchronous throw.
val request =
Request.builder()
.method(Method.CONNECT)
.url(server.url("/async-adapt-fail").toUrl())
.build()
// Must return a future rather than throwing on the caller's thread.
val future = transport.executeAsync(request)
val ex = assertFailsWith<ExecutionException> { future.get(5, TimeUnit.SECONDS) }
assertTrue(
ex.cause is IllegalArgumentException,
"adaptation failure must surface as the future's cause, was: ${ex.cause?.let { it::class }}",
)
}

// -------- headers round-trip --------

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,24 @@ public class OkHttpTransport private constructor(
/**
* Asynchronously executes [request]. Returns a [CompletableFuture] that completes with
* the [Response] on success or completes exceptionally with the transport failure on
* error. Cancelling the future cancels the underlying OkHttp [Call].
* error — including a request-adaptation failure, which runs on the calling thread but is
* delivered through the future rather than thrown synchronously. Cancelling the future
* cancels the underlying OkHttp [Call].
*/
override fun executeAsync(request: Request): CompletableFuture<Response> {
val okRequest = requestAdapter.adapt(request)
val okRequest =
try {
requestAdapter.adapt(request)
} catch (e: Exception) {
// The async contract is that errors arrive through the returned future. Request
// adaptation runs on the caller's thread and can throw (e.g. a method/body
// mismatch OkHttp rejects), so route the failure into a completed-exceptionally
// future instead of throwing synchronously where a future-composing caller's
// .exceptionally/.handle would never observe it. Errors (OOM and other JVM-fatal
// conditions) are left to propagate up the caller's stack rather than be packaged
// into a future that may never be awaited.
return failedFuture(e)
}
val call = client.newCall(okRequest)
val future = CompletableFuture<Response>()
call.enqueue(
Expand Down Expand Up @@ -148,6 +162,14 @@ public class OkHttpTransport private constructor(
return future
}

/**
* A future already completed exceptionally with [t]. Routes a synchronous request-adaptation
* failure through the async contract's single error channel rather than throwing on the
* caller's thread. ([CompletableFuture.failedFuture] is JDK 9+; this module targets Java 8.)
*/
private fun failedFuture(t: Throwable): CompletableFuture<Response> =
CompletableFuture<Response>().apply { completeExceptionally(t) }

/**
* Best-effort close on a discard path. Used when an adapted [Response] loses the race to a
* cancelled future: the response is already being discarded, so a failure to close it has
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import java.time.Duration
import java.util.concurrent.CancellationException
import java.util.concurrent.CompletionException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -46,6 +47,7 @@ import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertFails
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
Expand Down Expand Up @@ -134,6 +136,29 @@ class OkHttpTransportTest {
assertEquals(payload, recorded.body?.utf8())
}

// -------- async adaptation failures --------

@Test
fun executeAsyncDeliversAdaptationFailureThroughFuture() {
// A body on a method OkHttp forbids one for (GET) makes request adaptation throw
// synchronously inside executeAsync. The contract is that executeAsync completes
// exceptionally on error, so the failure must arrive through the returned future — a
// future-composing caller's .exceptionally/.handle would never observe a synchronous throw.
val request =
Request.builder()
.method(Method.GET)
.url(server.url("/async-adapt-fail").toUrl())
.body(RequestBody.create("x".toByteArray()))
.build()
// Must return a future rather than throwing on the caller's thread.
val future = transport.executeAsync(request)
val ex = assertFailsWith<ExecutionException> { future.get(5, TimeUnit.SECONDS) }
assertTrue(
ex.cause is IllegalArgumentException,
"adaptation failure must surface as the future's cause, was: ${ex.cause?.let { it::class }}",
)
}

// -------- headers round-trip --------

@Test
Expand Down