Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ where
chain: self.beacon_chain.clone(),
db_path: self.db_path.clone(),
freezer_db_path: self.freezer_db_path.clone(),
data_dir: Some(self.http_api_config.data_dir.clone()),
gossipsub_registry: self.libp2p_registry.take().map(std::sync::Mutex::new),
});

Expand Down
6 changes: 3 additions & 3 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ use eth2::types::{
ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorId,
};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use health_metrics::observe::Observe;
use lighthouse_network::Enr;
use lighthouse_network::NetworkGlobals;
use lighthouse_network::PeerId;
Expand Down Expand Up @@ -2754,9 +2753,10 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("health"))
.and(warp::path::end())
.and(task_spawner_filter.clone())
.then(|task_spawner: TaskSpawner<T::EthSpec>| {
.and(data_dir_filter.clone())
.then(|task_spawner: TaskSpawner<T::EthSpec>, data_dir: PathBuf| {
task_spawner.blocking_json_task(Priority::P0, move || {
eth2::lighthouse::Health::observe()
health_metrics::observe::observe_health_with_data_dir(&data_dir)
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::custom_bad_request)
})
Expand Down
1 change: 1 addition & 0 deletions beacon_node/http_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct Context<T: BeaconChainTypes> {
pub chain: Option<Arc<BeaconChain<T>>>,
pub db_path: Option<PathBuf>,
pub freezer_db_path: Option<PathBuf>,
pub data_dir: Option<PathBuf>,
pub gossipsub_registry: Option<std::sync::Mutex<Registry>>,
}

Expand Down
5 changes: 4 additions & 1 deletion beacon_node/http_metrics/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ pub fn gather_prometheus_metrics<T: BeaconChainTypes>(

network_utils::discovery_metrics::scrape_discovery_metrics();

health_metrics::metrics::scrape_health_metrics();
match ctx.data_dir.as_ref() {
Some(data_dir) => health_metrics::metrics::scrape_health_metrics_for_data_dir(data_dir),
None => health_metrics::metrics::scrape_health_metrics(),
};

// It's important to ensure these metrics are explicitly enabled in the case that users aren't
// using glibc and this function causes panics.
Expand Down
1 change: 1 addition & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ pub fn get_config<E: EthSpec>(
client_config.monitoring_api = Some(monitoring_api::Config {
db_path: None,
freezer_db_path: None,
data_dir: Some(client_config.data_dir().clone()),
update_period_secs,
monitoring_endpoint: monitoring_endpoint.to_string(),
});
Expand Down
18 changes: 14 additions & 4 deletions common/health_metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::observe::Observe;
use crate::observe::{Observe, observe_system_health_with_data_dir};
use eth2::lighthouse::{ProcessHealth, SystemHealth};
use metrics::*;
use std::path::Path;
use std::sync::LazyLock;

pub static PROCESS_NUM_THREADS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
Expand Down Expand Up @@ -124,7 +125,12 @@ pub static BOOT_TIME: LazyLock<Result<IntGauge>> = LazyLock::new(|| {

pub fn scrape_health_metrics() {
scrape_process_health_metrics();
scrape_system_health_metrics();
scrape_system_health_metrics(None);
}

/// Scrapes process health metrics, then system health metrics with `data_dir` passed
/// through as the directory to use for the system health observation.
pub fn scrape_health_metrics_for_data_dir(data_dir: &Path) {
    scrape_process_health_metrics();

    let observed_dir = Some(data_dir);
    scrape_system_health_metrics(observed_dir);
}

pub fn scrape_process_health_metrics() {
Expand All @@ -139,10 +145,14 @@ pub fn scrape_process_health_metrics() {
}
}

pub fn scrape_system_health_metrics() {
pub fn scrape_system_health_metrics(data_dir: Option<&Path>) {
// This will silently fail if we are unable to observe the health. This is desired behaviour
// since we don't support `Health` for all platforms.
if let Ok(health) = SystemHealth::observe() {
let health_result = match data_dir {
Some(dir) => observe_system_health_with_data_dir(dir),
None => SystemHealth::observe(),
};
if let Ok(health) = health_result {
set_gauge(&SYSTEM_VIRT_MEM_TOTAL, health.sys_virt_mem_total as i64);
set_gauge(
&SYSTEM_VIRT_MEM_AVAILABLE,
Expand Down
135 changes: 79 additions & 56 deletions common/health_metrics/src/observe.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use eth2::lighthouse::{Health, ProcessHealth, SystemHealth};
use std::path::Path;

#[cfg(target_os = "linux")]
use {
Expand All @@ -10,6 +11,21 @@ pub trait Observe: Sized {
fn observe() -> Result<Self, String>;
}

/// Observes overall health, measuring disk usage on the filesystem that holds `data_dir`
/// rather than on the root filesystem.
///
/// Non-Linux builds have no implementation: the argument is ignored and an error is
/// always returned.
#[cfg(not(target_os = "linux"))]
pub fn observe_health_with_data_dir(_data_dir: &Path) -> Result<Health, String> {
    Err(String::from("Health is only available on Linux"))
}

/// Linux implementation: observes process health first, then system health with disk
/// usage taken at `data_dir`, and bundles both into a [`Health`].
///
/// # Errors
///
/// Propagates the first error returned by either observation.
#[cfg(target_os = "linux")]
pub fn observe_health_with_data_dir(data_dir: &Path) -> Result<Health, String> {
    let process = ProcessHealth::observe()?;
    let system = observe_system_health_with_data_dir(data_dir)?;
    Ok(Health { process, system })
}

impl Observe for Health {
#[cfg(not(target_os = "linux"))]
fn observe() -> Result<Self, String> {
Expand All @@ -18,13 +34,71 @@ impl Observe for Health {

#[cfg(target_os = "linux")]
fn observe() -> Result<Self, String> {
Ok(Self {
process: ProcessHealth::observe()?,
system: SystemHealth::observe()?,
})
observe_health_with_data_dir(Path::new("/"))
}
}

/// Observes system health, measuring disk usage on the filesystem that holds `data_dir`
/// rather than on the root filesystem.
///
/// Non-Linux builds have no implementation: the argument is ignored and an error is
/// always returned.
#[cfg(not(target_os = "linux"))]
pub fn observe_system_health_with_data_dir(_data_dir: &Path) -> Result<SystemHealth, String> {
    Err(String::from("Health is only available on Linux"))
}

/// Collects a [`SystemHealth`] snapshot on Linux, measuring disk usage at `data_dir`.
///
/// Memory, load average, CPU times, disk I/O counters, network I/O counters and boot time
/// are read through `psutil`. Only the disk *usage* figures (`disk_node_bytes_total` and
/// `disk_node_bytes_free`) are taken from `data_dir`; the disk I/O counters come from the
/// default collector and do not take the path as input.
///
/// # Errors
///
/// Returns a descriptive `String` as soon as any individual reading fails. The disk usage
/// error names the offending path, since the path is supplied by the caller and may not
/// exist or be readable.
#[cfg(target_os = "linux")]
pub fn observe_system_health_with_data_dir(data_dir: &Path) -> Result<SystemHealth, String> {
    let vm = psutil::memory::virtual_memory()
        .map_err(|e| format!("Unable to get virtual memory: {:?}", e))?;
    let loadavg = psutil::host::loadavg().map_err(|e| format!("Unable to get loadavg: {:?}", e))?;

    let cpu = psutil::cpu::cpu_times().map_err(|e| format!("Unable to get cpu times: {:?}", e))?;

    // Include the path in the message: unlike the fixed "/" used previously, a
    // caller-provided directory is a realistic source of failure.
    let disk_usage = psutil::disk::disk_usage(data_dir).map_err(|e| {
        format!(
            "Unable to get disk usage info for {}: {:?}",
            data_dir.display(),
            e
        )
    })?;

    let disk = psutil::disk::DiskIoCountersCollector::default()
        .disk_io_counters()
        .map_err(|e| format!("Unable to get disk counters: {:?}", e))?;

    let net = psutil::network::NetIoCountersCollector::default()
        .net_io_counters()
        .map_err(|e| format!("Unable to get network io counters: {:?}", e))?;

    // Boot time is reported as whole seconds since the Unix epoch.
    let boot_time = psutil::host::boot_time()
        .map_err(|e| format!("Unable to get system boot time: {:?}", e))?
        .duration_since(std::time::UNIX_EPOCH)
        .map_err(|e| format!("Boot time is lower than unix epoch: {}", e))?
        .as_secs();

    Ok(SystemHealth {
        sys_virt_mem_total: vm.total(),
        sys_virt_mem_available: vm.available(),
        sys_virt_mem_used: vm.used(),
        sys_virt_mem_free: vm.free(),
        sys_virt_mem_cached: vm.cached(),
        sys_virt_mem_buffers: vm.buffers(),
        sys_virt_mem_percent: vm.percent(),
        sys_loadavg_1: loadavg.one,
        sys_loadavg_5: loadavg.five,
        sys_loadavg_15: loadavg.fifteen,
        cpu_cores: psutil::cpu::cpu_count_physical(),
        cpu_threads: psutil::cpu::cpu_count(),
        system_seconds_total: cpu.system().as_secs(),
        cpu_time_total: cpu.total().as_secs(),
        user_seconds_total: cpu.user().as_secs(),
        iowait_seconds_total: cpu.iowait().as_secs(),
        idle_seconds_total: cpu.idle().as_secs(),
        disk_node_bytes_total: disk_usage.total(),
        disk_node_bytes_free: disk_usage.free(),
        disk_node_reads_total: disk.read_count(),
        disk_node_writes_total: disk.write_count(),
        network_node_bytes_total_received: net.bytes_recv(),
        network_node_bytes_total_transmit: net.bytes_sent(),
        misc_node_boot_ts_seconds: boot_time,
        misc_os: std::env::consts::OS.to_string(),
    })
}

impl Observe for SystemHealth {
#[cfg(not(target_os = "linux"))]
fn observe() -> Result<Self, String> {
Expand All @@ -33,58 +107,7 @@ impl Observe for SystemHealth {

#[cfg(target_os = "linux")]
fn observe() -> Result<Self, String> {
let vm = psutil::memory::virtual_memory()
.map_err(|e| format!("Unable to get virtual memory: {:?}", e))?;
let loadavg =
psutil::host::loadavg().map_err(|e| format!("Unable to get loadavg: {:?}", e))?;

let cpu =
psutil::cpu::cpu_times().map_err(|e| format!("Unable to get cpu times: {:?}", e))?;

let disk_usage = psutil::disk::disk_usage("/")
.map_err(|e| format!("Unable to disk usage info: {:?}", e))?;

let disk = psutil::disk::DiskIoCountersCollector::default()
.disk_io_counters()
.map_err(|e| format!("Unable to get disk counters: {:?}", e))?;

let net = psutil::network::NetIoCountersCollector::default()
.net_io_counters()
.map_err(|e| format!("Unable to get network io counters: {:?}", e))?;

let boot_time = psutil::host::boot_time()
.map_err(|e| format!("Unable to get system boot time: {:?}", e))?
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| format!("Boot time is lower than unix epoch: {}", e))?
.as_secs();

Ok(Self {
sys_virt_mem_total: vm.total(),
sys_virt_mem_available: vm.available(),
sys_virt_mem_used: vm.used(),
sys_virt_mem_free: vm.free(),
sys_virt_mem_cached: vm.cached(),
sys_virt_mem_buffers: vm.buffers(),
sys_virt_mem_percent: vm.percent(),
sys_loadavg_1: loadavg.one,
sys_loadavg_5: loadavg.five,
sys_loadavg_15: loadavg.fifteen,
cpu_cores: psutil::cpu::cpu_count_physical(),
cpu_threads: psutil::cpu::cpu_count(),
system_seconds_total: cpu.system().as_secs(),
cpu_time_total: cpu.total().as_secs(),
user_seconds_total: cpu.user().as_secs(),
iowait_seconds_total: cpu.iowait().as_secs(),
idle_seconds_total: cpu.idle().as_secs(),
disk_node_bytes_total: disk_usage.total(),
disk_node_bytes_free: disk_usage.free(),
disk_node_reads_total: disk.read_count(),
disk_node_writes_total: disk.write_count(),
network_node_bytes_total_received: net.bytes_recv(),
network_node_bytes_total_transmit: net.bytes_sent(),
misc_node_boot_ts_seconds: boot_time,
misc_os: std::env::consts::OS.to_string(),
})
observe_system_health_with_data_dir(Path::new("/"))
}
}

Expand Down
14 changes: 12 additions & 2 deletions common/monitoring_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{path::PathBuf, time::Duration};

use eth2::lighthouse::SystemHealth;
use gather::{gather_beacon_metrics, gather_validator_metrics};
use health_metrics::observe::Observe;
use health_metrics::observe::{Observe, observe_system_health_with_data_dir};
use reqwest::{IntoUrl, Response};
pub use reqwest::{StatusCode, Url};
use sensitive_url::SensitiveUrl;
Expand Down Expand Up @@ -56,6 +56,8 @@ pub struct Config {
/// Path for the cold database required for fetching beacon db size metrics.
/// Note: not relevant for validator and system metrics.
pub freezer_db_path: Option<PathBuf>,
/// Data directory path used for reporting disk usage of the correct filesystem.
pub data_dir: Option<PathBuf>,
/// User-defined update period in seconds.
pub update_period_secs: Option<u64>,
}
Expand All @@ -67,6 +69,8 @@ pub struct MonitoringHttpClient {
db_path: Option<PathBuf>,
/// Path to the freezer database.
freezer_db_path: Option<PathBuf>,
/// Data directory path used for reporting disk usage of the correct filesystem.
data_dir: Option<PathBuf>,
update_period: Duration,
monitoring_endpoint: SensitiveUrl,
}
Expand All @@ -77,6 +81,7 @@ impl MonitoringHttpClient {
client: reqwest::Client::new(),
db_path: config.db_path.clone(),
freezer_db_path: config.freezer_db_path.clone(),
data_dir: config.data_dir.clone(),
update_period: Duration::from_secs(
config.update_period_secs.unwrap_or(DEFAULT_UPDATE_DURATION),
),
Expand Down Expand Up @@ -158,8 +163,13 @@ impl MonitoringHttpClient {
}

/// Gets system metrics by capturing the `SystemHealth` metrics.
/// Reports disk usage for the datadir filesystem when available.
pub fn get_system_metrics(&self) -> Result<MonitoringMetrics, Error> {
let system_health = SystemHealth::observe().map_err(Error::SystemMetricsFailed)?;
let system_health = match &self.data_dir {
Some(dir) => observe_system_health_with_data_dir(dir),
None => SystemHealth::observe(),
}
.map_err(Error::SystemMetricsFailed)?;
Ok(MonitoringMetrics {
metadata: Metadata::new(ProcessType::System),
process_metrics: Process::System(system_health.into()),
Expand Down
14 changes: 12 additions & 2 deletions validator_client/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
})
});

// Optional data_dir filter for health endpoints that should not reject when validator_dir
// is unavailable. Falls back to root filesystem disk reporting.
let inner_data_dir = ctx.validator_dir.clone();
let data_dir_filter = warp::any().map(move || inner_data_dir.clone());

let inner_secrets_dir = ctx.secrets_dir.clone();
let secrets_dir_filter = warp::any().map(move || inner_secrets_dir.clone()).and_then(
|secrets_dir: Option<_>| async move {
Expand Down Expand Up @@ -301,9 +306,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_lighthouse_health = warp::path("lighthouse")
.and(warp::path("health"))
.and(warp::path::end())
.then(|| {
.and(data_dir_filter.clone())
.then(|data_dir: Option<PathBuf>| {
blocking_json_task(move || {
eth2::lighthouse::Health::observe()
let health = match data_dir {
Some(ref dir) => health_metrics::observe::observe_health_with_data_dir(dir),
None => eth2::lighthouse::Health::observe(),
};
health
.map(api_types::GenericResponse::from)
.map_err(warp_utils::reject::custom_bad_request)
})
Expand Down
7 changes: 6 additions & 1 deletion validator_client/http_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::info;
Expand Down Expand Up @@ -51,6 +52,7 @@ pub struct Shared<E> {
pub struct Context<E> {
/// Configuration for the HTTP metrics server.
pub config: Config,
/// State held behind a read-write lock.
pub shared: RwLock<Shared<E>>,
/// Optional data directory. When `Some`, `gather_prometheus_metrics` scrapes health
/// metrics for this directory; when `None`, it falls back to `scrape_health_metrics`.
pub data_dir: Option<PathBuf>,
}

/// Configuration for the HTTP server.
Expand Down Expand Up @@ -206,7 +208,10 @@ pub fn gather_prometheus_metrics<E: EthSpec>(
scrape_allocator_metrics();
}

health_metrics::metrics::scrape_health_metrics();
match ctx.data_dir.as_ref() {
Some(data_dir) => health_metrics::metrics::scrape_health_metrics_for_data_dir(data_dir),
None => health_metrics::metrics::scrape_health_metrics(),
};

encoder
.encode(&metrics::gather(), &mut buffer)
Expand Down
1 change: 1 addition & 0 deletions validator_client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ impl Config {
config.monitoring_api = Some(monitoring_api::Config {
db_path: None,
freezer_db_path: None,
data_dir: Some(config.validator_dir.clone()),
update_period_secs,
monitoring_endpoint: monitoring_endpoint.to_string(),
});
Expand Down
1 change: 1 addition & 0 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
Arc::new(validator_http_metrics::Context {
config: config.http_metrics.clone(),
shared: RwLock::new(shared),
data_dir: Some(config.validator_dir.clone()),
});

let exit = context.executor.exit();
Expand Down