Skip to content

[S-security] Add Connection Pool Limits to Prevent Resource Exhaustion #99

Description

@adolago

Problem

The connection pool implementation in src/connection/mod.rs:778-828 has no configurable limits on:

  1. Maximum connections per host: Unlimited connections can be opened to the same host
  2. Maximum total connections: No global limit on total pool size
  3. Idle connection timeout: Connections are never closed for inactivity
  4. Connection timeout: Per-host connection timeout but no pool-wide timeout

From src/connection/mod.rs:600-607:

/// Builds `Self` from `config`, wrapping the configuration in an `Arc` and
/// creating a lock-guarded pool.
///
/// The pool is constructed with a hard-coded size of 10; the caller has no
/// way to override it here.
pub fn new(config: ConnectionConfig) -> Self {
    Self {
        config: Arc::new(config),
        pool: Arc::new(RwLock::new(ConnectionPool::new(10))), // Default pool size of 10
    }
}

The pool size parameter exists but:

  • It's a fixed size for all hosts combined, not per-host
  • There's no per-host limit enforcement
  • No idle timeout to release unused connections
  • No connection lifetime limits

Impact

  • Resource Exhaustion: Can exhaust file descriptors and network connections
  • Memory Leaks: Idle connections accumulate without cleanup
  • Denial of Service: Large playbooks could overwhelm network capacity
  • Instability: System may fail under high concurrency

Proposed Solution

Add comprehensive connection pool configuration:

/// Limits and timeouts that bound the size and lifetime of a connection pool.
///
/// Start from [`Default::default`] and override individual fields with
/// struct-update syntax:
/// `ConnectionPoolConfig { max_connections_per_host: 2, ..Default::default() }`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectionPoolConfig {
    /// Maximum number of pooled connections per host (default: 5).
    pub max_connections_per_host: usize,

    /// Maximum number of pooled connections across all hosts (default: 100).
    pub max_total_connections: usize,

    /// How long a connection may go unused before it expires
    /// (default: 300 seconds). `None` disables idle expiry.
    pub idle_timeout: Option<Duration>,

    /// Maximum age of a connection, measured from when it was pooled
    /// (default: 3600 seconds). `None` disables the lifetime limit.
    pub max_connection_lifetime: Option<Duration>,

    /// Connection acquisition timeout (default: 30 seconds).
    pub acquire_timeout: Duration,
}

impl Default for ConnectionPoolConfig {
    /// Returns the documented defaults: 5 connections per host, 100 in total,
    /// a 5-minute idle timeout, a 1-hour lifetime and a 30-second acquire
    /// timeout.
    fn default() -> Self {
        Self {
            max_connections_per_host: 5,
            max_total_connections: 100,
            idle_timeout: Some(Duration::from_secs(300)),
            max_connection_lifetime: Some(Duration::from_secs(3600)),
            acquire_timeout: Duration::from_secs(30),
        }
    }
}

/// Connection pool that stores one connection per pool key and keeps
/// per-host and total counters alongside the stored connections.
pub struct ConnectionPool {
    /// Limits and timeouts applied to this pool
    config: ConnectionPoolConfig,
    /// Active connections by pool key
    connections: HashMap<String, PooledConnection>,
    /// Per-host connection counts, keyed by each entry's host key
    host_counts: HashMap<String, usize>,
    /// Total connection count
    total_count: usize,
}

/// A pooled connection together with the bookkeeping used for expiry checks
/// and per-host accounting.
struct PooledConnection {
    /// Shared handle to the underlying connection
    connection: Arc<dyn Connection + Send + Sync>,
    /// When the entry was added to the pool; compared against the
    /// configured maximum connection lifetime
    created_at: Instant,
    /// When the entry was added or last handed out; compared against the
    /// configured idle timeout
    last_used: Instant,
    /// Host key under which this entry is counted in the per-host counts
    host_key: String,
}

Update ConnectionPool methods:

impl ConnectionPool {
    /// Creates an empty pool governed by `config`.
    pub fn new(config: ConnectionPoolConfig) -> Self {
        Self {
            config,
            connections: HashMap::new(),
            host_counts: HashMap::new(),
            total_count: 0,
        }
    }

    /// Returns the connection pooled under `key`, refreshing its last-used
    /// time.
    ///
    /// Expired entries are purged before the lookup. If the stored connection
    /// reports that it is no longer alive, it is removed from the pool and
    /// `None` is returned, exactly as if the key had never been pooled.
    pub fn get(&mut self, key: &str) -> Option<Arc<dyn Connection + Send + Sync>> {
        let now = Instant::now();

        // Purge first so an idle or over-age connection is never handed out.
        self.cleanup_expired(now);

        let pooled = self.connections.get_mut(key)?;

        // NOTE(review): `now_or` is not a std or `futures` method that I can
        // find; presumably `FutureExt::now_or_never().unwrap_or(false)` (or a
        // project extension trait) is intended — confirm before merging.
        if !pooled.connection.is_alive().now_or(false) {
            self.remove(key);
            return None;
        }

        pooled.last_used = now;
        Some(Arc::clone(&pooled.connection))
    }

    /// Stores `conn` under `key`, enforcing the per-host and total limits.
    ///
    /// Expired entries are purged before capacity is checked, so slots held
    /// by dead entries are reclaimed. Putting under a key that is already
    /// pooled replaces the previous entry without consuming an extra slot.
    ///
    /// # Errors
    ///
    /// Propagates the error from `extract_host_key` when the host key cannot
    /// be derived from `key`, and returns `ConnectionError::PoolExhausted`
    /// when either the per-host or the total limit has been reached.
    pub fn put(
        &mut self,
        key: String,
        conn: Arc<dyn Connection + Send + Sync>,
    ) -> ConnectionResult<()> {
        let host_key = Self::extract_host_key(&key)?;
        let now = Instant::now();

        // Reclaim slots held by expired entries before judging capacity.
        self.cleanup_expired(now);

        // The map holds one entry per key, so a put under an existing key is
        // a replacement. Release the old entry's slot first; otherwise the
        // counters would grow while the map does not and drift permanently.
        self.remove(&key);

        let host_count = self.host_counts.get(&host_key).copied().unwrap_or(0);
        if host_count >= self.config.max_connections_per_host
            || self.total_count >= self.config.max_total_connections
        {
            return Err(ConnectionError::PoolExhausted);
        }

        self.host_counts.insert(host_key.clone(), host_count + 1);
        self.total_count += 1;
        self.connections.insert(
            key,
            PooledConnection {
                connection: conn,
                created_at: now,
                last_used: now,
                host_key,
            },
        );

        Ok(())
    }

    /// Removes the entry pooled under `key`, if any, and releases its slot in
    /// the per-host and total counters. Unknown keys are ignored.
    pub fn remove(&mut self, key: &str) {
        if let Some(pooled) = self.connections.remove(key) {
            // Drop the per-host counter entirely once it reaches zero so the
            // map does not accumulate stale hosts.
            if let std::collections::hash_map::Entry::Occupied(mut slot) =
                self.host_counts.entry(pooled.host_key)
            {
                if *slot.get() <= 1 {
                    slot.remove();
                } else {
                    *slot.get_mut() -= 1;
                }
            }
            // Saturate rather than panic if the counter was ever out of sync.
            self.total_count = self.total_count.saturating_sub(1);
        }
    }

    /// Removes every entry that has exceeded the idle timeout or the maximum
    /// connection lifetime as of `now`. A limit set to `None` is not applied.
    fn cleanup_expired(&mut self, now: Instant) {
        let idle_timeout = self.config.idle_timeout;
        let max_lifetime = self.config.max_connection_lifetime;

        // Collect keys first: entries cannot be removed while iterating the map.
        let expired_keys: Vec<String> = self
            .connections
            .iter()
            .filter(|(_, pooled)| {
                let idle_too_long = idle_timeout.map_or(false, |limit| {
                    now.saturating_duration_since(pooled.last_used) > limit
                });
                let too_old = max_lifetime.map_or(false, |limit| {
                    now.saturating_duration_since(pooled.created_at) > limit
                });
                idle_too_long || too_old
            })
            .map(|(key, _)| key.clone())
            .collect();

        for key in expired_keys {
            self.remove(&key);
            trace!("Expired connection: {}", key);
        }
    }
}

Update ConnectionFactory to use new config:

impl ConnectionFactory {
    /// Builds a factory whose pool uses `ConnectionPoolConfig::default()`.
    pub fn new(config: ConnectionConfig) -> Self {
        Self::with_pool_config(config, ConnectionPoolConfig::default())
    }

    /// Builds a factory whose pool is governed by the supplied `pool_config`.
    ///
    /// The connection configuration is shared behind an `Arc`, and the pool
    /// is shared behind an `Arc<RwLock<_>>`.
    pub fn with_pool_config(
        config: ConnectionConfig,
        pool_config: ConnectionPoolConfig,
    ) -> Self {
        let shared_config = Arc::new(config);
        let shared_pool = Arc::new(RwLock::new(ConnectionPool::new(pool_config)));
        Self {
            config: shared_config,
            pool: shared_pool,
        }
    }
}

Implementation Details

  1. Add ConnectionPoolConfig struct in src/connection/mod.rs:

    • Configurable limits for per-host and total connections
    • Idle timeout and max lifetime settings
    • Reasonable defaults (5 per-host, 100 total, 5 min idle, 1 hour lifetime)
  2. Track per-host connection counts:

    • Extract host key from pool key (e.g., ssh://user@host:port → host:port)
    • Enforce per-host limits before adding connections
    • Decrement counts on connection removal
  3. Add connection lifecycle tracking:

    • Track creation time and last-used time
    • Implement periodic cleanup of expired connections
    • Close expired connections before returning from pool
  4. Update ConnectionFactory API:

    • Accept ConnectionPoolConfig parameter
    • Provide sensible defaults
    • Make pool limits configurable via config file
  5. Add periodic cleanup task:

    • Use tokio spawn to run cleanup every 60 seconds
    • Remove idle and expired connections
    • Log cleanup statistics

Testing Strategy

  • Unit tests for connection pool limits:

    /// The per-host limit rejects a connection once a host already holds
    /// `max_connections_per_host` pooled connections.
    #[test]
    fn test_pool_per_host_limit() {
        let config = ConnectionPoolConfig {
            max_connections_per_host: 2,
            ..Default::default()
        };
        let mut pool = ConnectionPool::new(config);

        // `conn1`..`conn3` stand in for connections built by the test fixture.
        // Each put uses a distinct pool key: the pool stores one connection
        // per key, so repeating a key would replace an entry rather than add
        // a second connection to the host.
        // NOTE(review): assumes the host-key extraction maps all three keys
        // to the same host (host1:22) — confirm against its implementation.
        pool.put("ssh://alice@host1:22".into(), conn1).unwrap();
        pool.put("ssh://bob@host1:22".into(), conn2).unwrap();

        // Third connection to the same host should fail
        assert!(pool.put("ssh://carol@host1:22".into(), conn3).is_err());
    }
    
    /// The total limit rejects a connection once the pool already holds
    /// `max_total_connections` entries, even when every host is distinct.
    #[test]
    fn test_pool_total_limit() {
        let config = ConnectionPoolConfig {
            max_total_connections: 5,
            ..Default::default()
        };
        let mut pool = ConnectionPool::new(config);

        // Add connections to different hosts. `put` takes the connection by
        // value, so hand it a fresh clone of the shared handle on each
        // iteration instead of moving `conn` on the first pass.
        for i in 0..5 {
            pool.put(format!("ssh://host{}:22", i), conn.clone()).unwrap();
        }

        // 6th connection should fail
        assert!(pool.put("ssh://host5:22".into(), conn6).is_err());
    }
  • Integration tests with high-concurrency playbooks:

    • Run playbook targeting 100+ hosts
    • Verify pool limits are respected
    • Monitor resource usage (fd count, memory)
  • Load testing:

    • Simulate 1000+ simultaneous connection requests
    • Verify pool doesn't exhaust system resources
    • Test connection recycling works correctly

Alternatives Considered

  1. Use existing crates (deadpool, bb8): Good but would add dependency
  2. Use async-aware pooling crate: Better for async but may not fit existing architecture
  3. Keep simple limit: Just limit total connections without per-host tracking
  4. Use connection timeout only: Doesn't address idle connection accumulation

Chosen approach: Implement custom pool with per-host tracking and lifecycle management, designed specifically for SSH connection patterns.

Related Issues

References

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions