From 28c80b8a936b7a9c72d819991de8b17a46ac936a Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 1/5] refactor(http/retry): `replays_trailers()` uses `Frame` see https://github.com/linkerd/linkerd2/issues/8733. this commit upgrades a test that uses defunct `data()` and `trailers()` futures. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay.rs | 39 ++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index e1d7c94725..f183f8ac65 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -547,8 +547,8 @@ mod tests { async fn replays_trailers() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -560,20 +560,31 @@ mod tests { tx.send_trailers(tlrs.clone()).await; drop(tx); - while initial.data().await.is_some() { - // do nothing - } - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + let read_trailers = |body: ReplayBody<_>| async move { + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let _ = body + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_data() + .expect("should yield data"); + let trls = body + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert!(body.frame().await.is_none()); + trls + }; - // drop the initial body to send the data to the replay - drop(initial); + let initial_tlrs = read_trailers(initial).await; + assert_eq!(&initial_tlrs, &tlrs); - while replay.data().await.is_some() { - // do nothing - } - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + let replay_tlrs = read_trailers(replay).await; + assert_eq!(&replay_tlrs, &tlrs); } #[tokio::test] From cc7adcb530e8e892a48f55b280a72ca641b56540 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 2/5] refactor(http/retry): `trailers_only()` uses `Frame` see https://github.com/linkerd/linkerd2/issues/8733. this commit upgrades a test that uses defunct `data()` and `trailers()` futures. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay.rs | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index f183f8ac65..5003d6c1d0 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -591,10 +591,12 @@ mod tests { async fn trailers_only() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); let mut tlrs = HeaderMap::new(); tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); @@ -604,16 +606,26 @@ mod tests { drop(tx); - assert!(dbg!(initial.data().await).is_none(), "no data in body"); - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + let initial_tlrs = initial + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert_eq!(&initial_tlrs, &tlrs); // drop the initial body to send the data to the replay drop(initial); - assert!(dbg!(replay.data().await).is_none(), "no data in body"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + let replay_tlrs = replay + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert_eq!(&replay_tlrs, &tlrs); } #[tokio::test(flavor = "current_thread")] From 29aa70ea1798b722941893c10272bffb787b2249 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 3/5] feat(http/retry): `ForwardCompatibleBody::is_end_stream()` this commit adds a method that exposes the inner `B`-typed body's `is_end_stream()` trait method, gated for use in tests. Signed-off-by: katelyn martin --- linkerd/http/retry/src/compat.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs index 1294319496..79698d0797 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/retry/src/compat.rs @@ -40,6 +40,12 @@ impl ForwardCompatibleBody { pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> { combinators::Frame(self) } + + /// Returns `true` when the end of stream has been reached. + #[cfg(test)] + pub(crate) fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } } /// Future that resolves to the next frame from a `Body`. From af035aa4bbd01b5fac962568eb25e322c4435934 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 4/5] refactor(http/retry): `body_to_string()` helper uses `Frame` this is a refactoring commit, upgrading more of the replay body test to work in terms of `Frame`. this updates the `body_to_string()` helper in particular. Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay.rs | 192 ++++++++++++++++++------------- 1 file changed, 112 insertions(+), 80 deletions(-) diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 5003d6c1d0..97452bc8b7 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -513,11 +513,16 @@ mod tests { tx.send_data("hello world").await; drop(tx); - let initial = body_to_string(initial).await; - assert_eq!(initial, "hello world"); - - let replay = body_to_string(replay).await; - assert_eq!(replay, "hello world"); + { + let (data, trailers) = body_to_string(initial).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } + { + let (data, trailers) = body_to_string(replay).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } } #[tokio::test] @@ -536,11 +541,13 @@ mod tests { tx.send_data(" of fun!").await; }); - let initial = body_to_string(initial).await; + let (initial, trailers) = body_to_string(initial).await; assert_eq!(initial, "hello world, have lots of fun!"); + assert!(trailers.is_none()); - let replay = body_to_string(replay).await; + let (replay, trailers) = body_to_string(replay).await; assert_eq!(replay, "hello world, have lots of fun!"); + assert!(trailers.is_none()); } #[tokio::test] @@ -654,18 +661,17 @@ mod tests { tx.send_trailers(HeaderMap::new()).await; }); - assert_eq!( - body_to_string(&mut replay).await, - "hello world, have lots of fun" - ); + let (data, trailers) = body_to_string(&mut replay).await; + assert_eq!(data, "hello world, have lots of fun"); + assert!(trailers.is_some()); } #[tokio::test(flavor = "current_thread")] async fn multiple_replays() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -680,27 +686,18 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!(body_to_string(&mut initial).await, "hello world"); - - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - let mut replay2 = replay.clone(); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(replay); + let read = |body| async { + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + }; - assert_eq!(body_to_string(&mut replay2).await, "hello world"); + read(initial).await; - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + // Replay the body twice. + let replay2 = replay.clone(); + read(replay).await; + read(replay2).await; } #[tokio::test(flavor = "current_thread")] @@ -723,7 +720,7 @@ mod tests { drop(initial); tracing::info!("dropped initial body"); - let mut replay2 = replay.clone(); + let replay2 = replay.clone(); tx.send_data(" world").await; assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); @@ -740,21 +737,17 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!( - body_to_string(&mut replay2).await, - "hello world, have lots of fun!" - ); - - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + let (data, replay2_trailers) = body_to_string(replay2).await; + assert_eq!(data, "hello world, have lots of fun!"); + assert_eq!(replay2_trailers.as_ref(), Some(&tlrs)); } #[tokio::test(flavor = "current_thread")] async fn drop_clone_early() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -769,21 +762,23 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!(body_to_string(&mut initial).await, "hello world"); - - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); + { + let body = initial; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } - // clone the body again and then drop it + // Clone the body, and then drop it before polling it. let replay2 = replay.clone(); drop(replay2); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + { + let body = replay; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } } // This test is specifically for behavior across clones, so the clippy lint @@ -808,40 +803,58 @@ mod tests { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. - let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) + let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) .expect("body must not be too large"); - let mut replay = initial.clone(); + let replay = initial.clone(); - body_to_string(&mut initial).await; - assert!(!replay.is_end_stream()); + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); - initial.trailers().await.expect("trailers should not error"); + // Read the initial body, show that the replay does not consider itself to have reached the + // end-of-stream. Then drop the initial body, show that the replay is still not done. + assert!(!initial.is_end_stream()); + initial + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); assert!(initial.is_end_stream()); assert!(!replay.is_end_stream()); - - // drop the initial body to send the data to the replay drop(initial); - assert!(!replay.is_end_stream()); - body_to_string(&mut replay).await; + // Read the replay body. assert!(!replay.is_end_stream()); - - replay.trailers().await.expect("trailers should not error"); + replay + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay.frame().await.is_none()); assert!(replay.is_end_stream()); - // Even if we clone a body _after_ it has been driven to EOS, the clone - // must not be EOS. - let mut replay2 = replay.clone(); + // Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS. + let replay = replay.into_inner(); + let replay2 = replay.clone(); assert!(!replay2.is_end_stream()); - // drop the initial body to send the data to the replay + // Drop the first replay body to send the data to the second replay. drop(replay); - body_to_string(&mut replay2).await; - assert!(!replay2.is_end_stream()); - - replay2.trailers().await.expect("trailers should not error"); + // Read the second replay body. + let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + replay2 + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay2.frame().await.is_none()); assert!(replay2.is_end_stream()); } @@ -1012,17 +1025,36 @@ mod tests { chunk } - async fn body_to_string(mut body: T) -> String + async fn body_to_string(body: B) -> (String, Option) where - T: http_body::Body + Unpin, - T::Error: std::fmt::Debug, + B: http_body::Body + Unpin, + B::Error: std::fmt::Debug, { - let mut s = String::new(); - while let Some(chunk) = chunk(&mut body).await { - s.push_str(&chunk[..]); + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let mut data = String::new(); + let mut trailers = None; + + // Continue reading frames from the body until it is finished. + while let Some(frame) = body + .frame() + .await + .transpose() + .expect("reading a frame succeeds") + { + match frame.into_data().map(string) { + Ok(ref s) => data.push_str(s), + Err(frame) => { + let trls = frame + .into_trailers() + .map_err(drop) + .expect("test frame is either data or trailers"); + trailers = Some(trls); + } + } } - tracing::info!(body = ?s, "no more data"); - s + + tracing::info!(?data, ?trailers, "finished reading body"); + (data, trailers) } fn string(mut data: impl Buf) -> String { From 0b83e90006404ae3ec296d32ad8e7c5121be1ffd Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH 5/5] refactor(http/retry): `chunk()` helper uses `Frame` Signed-off-by: katelyn martin --- linkerd/http/retry/src/replay.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 97452bc8b7..11eb53aa41 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -1016,10 +1016,14 @@ mod tests { T: http_body::Body + Unpin, { tracing::trace!("waiting for a body chunk..."); - let chunk = body - .data() + let chunk = crate::compat::ForwardCompatibleBody::new(body) + .frame() .await - .map(|res| res.map_err(|_| ()).unwrap()) + .expect("yields a result") + .ok() + .expect("yields a frame") + .into_data() + .ok() .map(string); tracing::info!(?chunk); chunk