From 0aaf7c058788a760910d82971a9e79db2426b596 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Wed, 11 Dec 2024 01:15:25 -0500 Subject: [PATCH] chore(app/inbound): address hyper deprecations in http/1 tests this is a follow-up commit related to 24dc5d8a (#3445). see for more information on upgrading to hyper 1.0. --- this addresses hyper deprecations in the http/1 tests for the inbound proxy. prior, we made use of `tower::ServiceExt::oneshot`, which consumes a service and drops it after sending a request and polling the response future to completion. tower is not a 1.0 library yet, so `SendRequest` does not provide an implementation of `tower::Service` in hyper's 1.0 interface: - - consequentially, we must drop the sender ourselves after receiving a response now. --- this commit *also* addresses hyper deprecations in the http/1 downgrade tests for the inbound proxy. because these tests involve a http/2 client and an http/1 server, we take the choice of inlining the body of `http_util::connect_and_accept()` rather than introducing a new, third `http_util::connect_and_accept_http_downgrade()` function. we will refactor these helper functions in follow-on commits. NB: because `ContextError` is internal to the `linkerd-app-test` crate, we do not wrap the errors. these are allegedly used by the fuzzing tests (_see f.ex #986 and #989_), but for our purposes with respect to the inbound proxy we can elide them rather than making `ctx()` a public method. --- Signed-off-by: katelyn martin --- linkerd/app/inbound/src/http/tests.rs | 113 +++++++++++++++++++------- linkerd/app/test/src/http_util.rs | 45 ++++++++++ 2 files changed, 129 insertions(+), 29 deletions(-) diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index 524971713d..3a86df8fde 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -48,8 +48,7 @@ where #[tokio::test(flavor = "current_thread")] async fn unmeshed_http1_hello_world() { let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -64,7 +63,7 @@ async fn unmeshed_http1_hello_world() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; let req = Request::builder() .method(http::Method::GET) @@ -72,7 +71,7 @@ async fn unmeshed_http1_hello_world() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -81,6 +80,7 @@ async fn unmeshed_http1_hello_world() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -92,9 +92,7 @@ async fn unmeshed_http1_hello_world() { async fn downgrade_origin_form() { // Reproduces https://github.com/linkerd/linkerd2/issues/5298 let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); + let client = hyper::client::conn::http2::Builder::new(TracingExecutor); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -109,7 +107,35 @@ async fn downgrade_origin_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = { + tracing::info!(settings = ?client, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server.oneshot(server_io).await?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) + }; let req = Request::builder() .method(http::Method::GET) @@ -119,7 +145,7 @@ async fn downgrade_origin_form() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -128,6 +154,7 @@ async fn downgrade_origin_form() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -137,10 +164,8 @@ async fn downgrade_origin_form() { #[tokio::test(flavor = "current_thread")] async fn downgrade_absolute_form() { + let client = hyper::client::conn::http2::Builder::new(TracingExecutor); let server = hyper::server::conn::http1::Builder::new(); - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); - client.http2_only(true); let _trace = trace_init(); // Build a mock "connector" that returns the upstream "server" IO. @@ -155,7 +180,36 @@ async fn downgrade_absolute_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + + let (mut client, bg) = { + tracing::info!(settings = ?client, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server.oneshot(server_io).await?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) + }; let req = Request::builder() .method(http::Method::GET) @@ -165,7 +219,7 @@ async fn downgrade_absolute_form() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -174,6 +228,7 @@ async fn downgrade_absolute_form() { assert_eq!(body, "Hello world!"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -190,8 +245,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { // Build a client using the connect that always errors so that responses // are BAD_GATEWAY. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -199,7 +253,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -221,6 +275,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { check_error_header(rsp.headers(), "server is not listening"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -237,8 +292,7 @@ async fn http1_bad_gateway_unmeshed_response() { // Build a client using the connect that always errors so that responses // are BAD_GATEWAY. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -246,7 +300,7 @@ async fn http1_bad_gateway_unmeshed_response() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -256,7 +310,7 @@ async fn http1_bad_gateway_unmeshed_response() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -267,6 +321,7 @@ async fn http1_bad_gateway_unmeshed_response() { ); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -285,8 +340,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -294,7 +348,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -304,7 +358,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -317,6 +371,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { check_error_header(rsp.headers(), "connect timed out after 1s"); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() @@ -335,8 +390,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { // Build a client using the connect that always sleeps so that responses // are GATEWAY_TIMEOUT. - #[allow(deprecated)] // linkerd/linkerd2#8733 - let mut client = hyper::client::conn::Builder::new(); + let mut client = hyper::client::conn::http1::Builder::new(); let profiles = profile::resolver(); let profile_tx = profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap()); @@ -344,7 +398,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (mut client, bg) = http_util::connect_and_accept_http1(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -354,7 +408,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { .body(Body::default()) .unwrap(); let rsp = client - .oneshot(req) + .send_request(req) .await .expect("HTTP client request failed"); tracing::info!(?rsp); @@ -365,6 +419,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { ); // Wait for all of the background tasks to complete, panicking if any returned an error. + drop(client); bg.join_all() .await .into_iter() diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index f675263b27..b4be313603 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -55,6 +55,51 @@ pub async fn connect_and_accept( (client, bg) } +/// Connects a client and server, running a proxy between them. +/// +/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and +/// await a response, and (2) a [`JoinSet`] running background tasks. +pub async fn connect_and_accept_http1( + client_settings: &mut hyper::client::conn::http1::Builder, + server: BoxServer, +) -> ( + hyper::client::conn::http1::SendRequest, + JoinSet>, +) { + tracing::info!(settings = ?client_settings, "connecting client with"); + let (client_io, server_io) = io::duplex(4096); + + let (client, conn) = client_settings + .handshake(client_io) + .await + .expect("Client must connect"); + + let mut bg = tokio::task::JoinSet::new(); + bg.spawn( + async move { + server + .oneshot(server_io) + .await + .map_err(ContextError::ctx("proxy background task failed"))?; + tracing::info!("proxy serve task complete"); + Ok(()) + } + .instrument(tracing::info_span!("proxy")), + ); + bg.spawn( + async move { + conn.await + .map_err(ContextError::ctx("client background task failed")) + .map_err(Error::from)?; + tracing::info!("client background complete"); + Ok(()) + } + .instrument(tracing::info_span!("client_bg")), + ); + + (client, bg) +} + /// Connects a client and server, running a proxy between them. /// /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and