Skip to content
Merged
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,7 @@ dependencies = [
"linkerd-io",
"linkerd-stack",
"pin-project",
"thiserror 2.0.11",
"tokio",
"tower",
"tracing",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/upgrade/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, default-features = false, features = ["deprecated", "client"] }
pin-project = "1"
thiserror = "2"
tokio = { version = "1", default-features = false }
tower = { version = "0.4", default-features = false }
tracing = "0.1"
Expand Down
7 changes: 6 additions & 1 deletion linkerd/http/upgrade/src/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ impl<B> PinnedDrop for UpgradeBody<B> {
let this = self.project();
// If an HTTP/1 upgrade was wanted, send the upgrade future.
if let Some((upgrade, on_upgrade)) = this.upgrade.take() {
upgrade.insert_half(on_upgrade);
if let Err(error) = upgrade.insert_half(on_upgrade) {
tracing::warn!(
?error,
"upgrade body could not send upgrade future upon completion"
);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http/upgrade/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub mod upgrade;
/// > fields' semantics. This includes but is not limited to:
/// >
/// > - `Proxy-Connection` (Appendix C.2.2 of [HTTP/1.1])
/// > - `Keep-Alive` (Section 19.7.1 of [RFC2068])
/// > - `Keep-Alive` (Section 19.7.1 of \[RFC2068\])
/// > - `TE` (Section 10.1.4)
/// > - `Transfer-Encoding` (Section 6.1 of [HTTP/1.1])
/// > - `Upgrade` (Section 7.8)
Expand Down
59 changes: 45 additions & 14 deletions linkerd/http/upgrade/src/upgrade.rs
Comment thread
cratelyn marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ use try_lock::TryLock;
/// inserted into the `Request::extensions()`. If the HTTP1 client service
/// also detects an upgrade, the two `OnUpgrade` futures will be joined
/// together with the glue in this type.
// Note: this relies on there only having been 2 Inner clones, so don't
// implement `Clone` for this type.
pub struct Http11Upgrade {
half: Half,
inner: Arc<Inner>,
inner: Option<Arc<Inner>>,
}

/// A named "tuple" returned by [`Http11Upgade::halves()`] of the two halves of
Expand All @@ -50,7 +48,7 @@ struct Inner {
upgrade_drain_signal: Option<drain::Watch>,
}

#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
enum Half {
Server,
Client,
Expand All @@ -63,6 +61,13 @@ pub struct Service<S> {
upgrade_drain_signal: drain::Watch,
}

#[derive(Debug, thiserror::Error)]
#[error("OnUpgrade future has already been inserted: half={half:?}")]
pub struct AlreadyInserted {
half: Half,
pub upgrade: OnUpgrade,
}

// === impl Http11Upgrade ===

impl Http11Upgrade {
Expand All @@ -80,35 +85,42 @@ impl Http11Upgrade {
Http11UpgradeHalves {
server: Http11Upgrade {
half: Half::Server,
inner: inner.clone(),
inner: Some(inner.clone()),
},
client: Http11Upgrade {
half: Half::Client,
inner,
inner: Some(inner.clone()),
},
}
}

pub fn insert_half(self, upgrade: OnUpgrade) {
match self.half {
Half::Server => {
let mut lock = self
.inner
pub fn insert_half(self, upgrade: OnUpgrade) -> Result<(), AlreadyInserted> {
match self {
Self {
inner: Some(inner),
half: Half::Server,
} => {
let mut lock = inner
.server
.try_lock()
.expect("only Half::Server touches server TryLock");
debug_assert!(lock.is_none());
*lock = Some(upgrade);
Ok(())
}
Half::Client => {
let mut lock = self
.inner
Self {
inner: Some(inner),
half: Half::Client,
} => {
let mut lock = inner
.client
.try_lock()
.expect("only Half::Client touches client TryLock");
debug_assert!(lock.is_none());
*lock = Some(upgrade);
Ok(())
}
Self { inner: None, half } => Err(AlreadyInserted { half, upgrade }),
}
}
}
Expand All @@ -121,6 +133,25 @@ impl fmt::Debug for Http11Upgrade {
}
}

/// An [`Http11Upgrade`] can be cloned.
///
/// NB: Only the original copy of this extension may insert an [`OnUpgrade`] future into its half
/// of the channel. Calling [`insert_half()`][Http11Upgrade::insert_half] on any clones of an
/// upgrade extension will result in an error.
// See the [`Drop`] implementation provided by `Inner` for more information.
impl Clone for Http11Upgrade {
fn clone(&self) -> Self {
Self {
half: self.half,
// We do *NOT* deeply clone our reference to `Inner`.
//
// `Http11Upgrade::insert_half()` and the `Inner` type's `Drop` glue rely on there only
// being one copy of the client and sender halves of the upgrade channel.
inner: None,
}
}
}

/// When both halves have dropped, check if both sides are inserted,
/// and if so, spawn the upgrade task.
impl Drop for Inner {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ where
client.as_ref().unwrap().request(req)
};

Box::pin(rsp_fut.err_into().map_ok(move |mut rsp| {
Box::pin(async move {
let mut rsp = rsp_fut.await?;
if is_http_connect {
// Add an extension to indicate that this a response to a CONNECT request.
debug_assert!(
Expand All @@ -161,14 +162,14 @@ where
if is_upgrade(&rsp) {
trace!("Client response is HTTP/1.1 upgrade");
if let Some(upgrade) = upgrade {
upgrade.insert_half(hyper::upgrade::on(&mut rsp));
upgrade.insert_half(hyper::upgrade::on(&mut rsp))?;
}
} else {
linkerd_http_upgrade::strip_connection_headers(rsp.headers_mut());
}

rsp.map(BoxBody::new)
}))
Ok(rsp.map(BoxBody::new))
})
}
}

Expand Down