From 7c0c3dce98b6a0b382a4798c763b2ee5457c3c16 Mon Sep 17 00:00:00 2001 From: Aaron Date: Thu, 8 May 2025 17:04:29 -0400 Subject: [PATCH] Cleanup on connection drop --- crates/rmcp/src/transport/sse_server.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/rmcp/src/transport/sse_server.rs b/crates/rmcp/src/transport/sse_server.rs index 5389ea9da..d23f5c547 100644 --- a/crates/rmcp/src/transport/sse_server.rs +++ b/crates/rmcp/src/transport/sse_server.rs @@ -89,6 +89,8 @@ async fn sse_handler( use tokio_util::sync::PollSender; let (from_client_tx, from_client_rx) = tokio::sync::mpsc::channel(64); let (to_client_tx, to_client_rx) = tokio::sync::mpsc::channel(64); + let to_client_tx_clone = to_client_tx.clone(); + app.txs .write() .await @@ -123,6 +125,19 @@ async fn sse_handler( Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), } })); + + tokio::spawn(async move { + // Wait for connection closure + to_client_tx_clone.closed().await; + + // Clean up session + let session_id = session.clone(); + let tx_store = app.txs.clone(); + let mut txs = tx_store.write().await; + txs.remove(&session_id); + tracing::debug!(%session_id, "Closed session and cleaned up resources"); + }); + Ok(Sse::new(stream).keep_alive(KeepAlive::new().interval(ping_interval))) }