diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/retry/src/compat.rs index 1294319496..79698d0797 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/retry/src/compat.rs @@ -40,6 +40,12 @@ impl ForwardCompatibleBody { pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> { combinators::Frame(self) } + + /// Returns `true` when the end of stream has been reached. + #[cfg(test)] + pub(crate) fn is_end_stream(&self) -> bool { + self.inner.is_end_stream() + } } /// Future that resolves to the next frame from a `Body`. diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index e1d7c94725..11eb53aa41 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -513,11 +513,16 @@ mod tests { tx.send_data("hello world").await; drop(tx); - let initial = body_to_string(initial).await; - assert_eq!(initial, "hello world"); - - let replay = body_to_string(replay).await; - assert_eq!(replay, "hello world"); + { + let (data, trailers) = body_to_string(initial).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } + { + let (data, trailers) = body_to_string(replay).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers, None); + } } #[tokio::test] @@ -536,19 +541,21 @@ mod tests { tx.send_data(" of fun!").await; }); - let initial = body_to_string(initial).await; + let (initial, trailers) = body_to_string(initial).await; assert_eq!(initial, "hello world, have lots of fun!"); + assert!(trailers.is_none()); - let replay = body_to_string(replay).await; + let (replay, trailers) = body_to_string(replay).await; assert_eq!(replay, "hello world, have lots of fun!"); + assert!(trailers.is_none()); } #[tokio::test] async fn replays_trailers() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -560,30 +567,43 @@ mod tests { tx.send_trailers(tlrs.clone()).await; drop(tx); - while initial.data().await.is_some() { - // do nothing - } - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + let read_trailers = |body: ReplayBody<_>| async move { + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let _ = body + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_data() + .expect("should yield data"); + let trls = body + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert!(body.frame().await.is_none()); + trls + }; - // drop the initial body to send the data to the replay - drop(initial); + let initial_tlrs = read_trailers(initial).await; + assert_eq!(&initial_tlrs, &tlrs); - while replay.data().await.is_some() { - // do nothing - } - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + let replay_tlrs = read_trailers(replay).await; + assert_eq!(&replay_tlrs, &tlrs); } #[tokio::test] async fn trailers_only() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); let mut tlrs = HeaderMap::new(); tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); @@ -593,16 +613,26 @@ mod tests { drop(tx); - assert!(dbg!(initial.data().await).is_none(), "no data in body"); - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); + let initial_tlrs = initial + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert_eq!(&initial_tlrs, &tlrs); // drop the initial body to send the data to the replay drop(initial); - assert!(dbg!(replay.data().await).is_none(), "no data in body"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + let replay_tlrs = replay + .frame() + .await + .expect("should yield a result") + .expect("should yield a frame") + .into_trailers() + .expect("should yield trailers"); + assert_eq!(&replay_tlrs, &tlrs); } #[tokio::test(flavor = "current_thread")] @@ -631,18 +661,17 @@ mod tests { tx.send_trailers(HeaderMap::new()).await; }); - assert_eq!( - body_to_string(&mut replay).await, - "hello world, have lots of fun" - ); + let (data, trailers) = body_to_string(&mut replay).await; + assert_eq!(data, "hello world, have lots of fun"); + assert!(trailers.is_some()); } #[tokio::test(flavor = "current_thread")] async fn multiple_replays() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -657,27 +686,18 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!(body_to_string(&mut initial).await, "hello world"); - - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); - - let mut replay2 = replay.clone(); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(replay); + let read = |body| async { + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + }; - assert_eq!(body_to_string(&mut replay2).await, "hello world"); + read(initial).await; - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + // Replay the body twice. + let replay2 = replay.clone(); + read(replay).await; + read(replay2).await; } #[tokio::test(flavor = "current_thread")] @@ -700,7 +720,7 @@ mod tests { drop(initial); tracing::info!("dropped initial body"); - let mut replay2 = replay.clone(); + let replay2 = replay.clone(); tx.send_data(" world").await; assert_eq!(chunk(&mut replay).await.unwrap(), "hello"); @@ -717,21 +737,17 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!( - body_to_string(&mut replay2).await, - "hello world, have lots of fun!" - ); - - let replay2_tlrs = replay2.trailers().await.expect("trailers should not error"); - assert_eq!(replay2_tlrs.as_ref(), Some(&tlrs)); + let (data, replay2_trailers) = body_to_string(replay2).await; + assert_eq!(data, "hello world, have lots of fun!"); + assert_eq!(replay2_trailers.as_ref(), Some(&tlrs)); } #[tokio::test(flavor = "current_thread")] async fn drop_clone_early() { let Test { mut tx, - mut initial, - mut replay, + initial, + replay, _trace, } = Test::new(); @@ -746,21 +762,23 @@ mod tests { tx.send_trailers(tlrs2).await; }); - assert_eq!(body_to_string(&mut initial).await, "hello world"); - - let initial_tlrs = initial.trailers().await.expect("trailers should not error"); - assert_eq!(initial_tlrs.as_ref(), Some(&tlrs)); - - // drop the initial body to send the data to the replay - drop(initial); + { + let body = initial; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } - // clone the body again and then drop it + // Clone the body, and then drop it before polling it. let replay2 = replay.clone(); drop(replay2); - assert_eq!(body_to_string(&mut replay).await, "hello world"); - let replay_tlrs = replay.trailers().await.expect("trailers should not error"); - assert_eq!(replay_tlrs.as_ref(), Some(&tlrs)); + { + let body = replay; + let (data, trailers) = body_to_string(body).await; + assert_eq!(data, "hello world"); + assert_eq!(trailers.as_ref(), Some(&tlrs)); + } } // This test is specifically for behavior across clones, so the clippy lint @@ -785,40 +803,58 @@ mod tests { async fn eos_only_when_fully_replayed() { // Test that each clone of a body is not EOS until the data has been // fully replayed. - let mut initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) + let initial = ReplayBody::try_new(BoxBody::from_static("hello world"), 64 * 1024) .expect("body must not be too large"); - let mut replay = initial.clone(); + let replay = initial.clone(); - body_to_string(&mut initial).await; - assert!(!replay.is_end_stream()); + let mut initial = crate::compat::ForwardCompatibleBody::new(initial); + let mut replay = crate::compat::ForwardCompatibleBody::new(replay); - initial.trailers().await.expect("trailers should not error"); + // Read the initial body, show that the replay does not consider itself to have reached the + // end-of-stream. Then drop the initial body, show that the replay is still not done. + assert!(!initial.is_end_stream()); + initial + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); assert!(initial.is_end_stream()); assert!(!replay.is_end_stream()); - - // drop the initial body to send the data to the replay drop(initial); - assert!(!replay.is_end_stream()); - body_to_string(&mut replay).await; + // Read the replay body. assert!(!replay.is_end_stream()); - - replay.trailers().await.expect("trailers should not error"); + replay + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay.frame().await.is_none()); assert!(replay.is_end_stream()); - // Even if we clone a body _after_ it has been driven to EOS, the clone - // must not be EOS. - let mut replay2 = replay.clone(); + // Even if we clone a body _after_ it has been driven to EOS, the clone must not be EOS. + let replay = replay.into_inner(); + let replay2 = replay.clone(); assert!(!replay2.is_end_stream()); - // drop the initial body to send the data to the replay + // Drop the first replay body to send the data to the second replay. drop(replay); - body_to_string(&mut replay2).await; - assert!(!replay2.is_end_stream()); - - replay2.trailers().await.expect("trailers should not error"); + // Read the second replay body. + let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + replay2 + .frame() + .await + .expect("yields a result") + .expect("yields a frame") + .into_data() + .expect("yields a data frame"); + assert!(replay2.frame().await.is_none()); assert!(replay2.is_end_stream()); } @@ -980,26 +1016,49 @@ mod tests { T: http_body::Body + Unpin, { tracing::trace!("waiting for a body chunk..."); - let chunk = body - .data() + let chunk = crate::compat::ForwardCompatibleBody::new(body) + .frame() .await - .map(|res| res.map_err(|_| ()).unwrap()) + .expect("yields a result") + .ok() + .expect("yields a frame") + .into_data() + .ok() .map(string); tracing::info!(?chunk); chunk } - async fn body_to_string(mut body: T) -> String + async fn body_to_string(body: B) -> (String, Option) where - T: http_body::Body + Unpin, - T::Error: std::fmt::Debug, + B: http_body::Body + Unpin, + B::Error: std::fmt::Debug, { - let mut s = String::new(); - while let Some(chunk) = chunk(&mut body).await { - s.push_str(&chunk[..]); + let mut body = crate::compat::ForwardCompatibleBody::new(body); + let mut data = String::new(); + let mut trailers = None; + + // Continue reading frames from the body until it is finished. + while let Some(frame) = body + .frame() + .await + .transpose() + .expect("reading a frame succeeds") + { + match frame.into_data().map(string) { + Ok(ref s) => data.push_str(s), + Err(frame) => { + let trls = frame + .into_trailers() + .map_err(drop) + .expect("test frame is either data or trailers"); + trailers = Some(trls); + } + } } - tracing::info!(body = ?s, "no more data"); - s + + tracing::info!(?data, ?trailers, "finished reading body"); + (data, trailers) } fn string(mut data: impl Buf) -> String {