diff --git a/go.mod b/go.mod index 8eff473c..8f7aa56c 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.7 require ( github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0 + github.com/dustin/go-humanize v1.0.1 github.com/ethereum/go-ethereum v1.17.3 github.com/ethpandaops/ethwallclock v0.4.0 github.com/ethpandaops/go-eth2-client v0.1.2 @@ -23,6 +24,8 @@ require ( github.com/mashingan/smapping v0.1.19 github.com/pressly/goose/v3 v3.27.1 github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 + github.com/prometheus/common v0.66.1 github.com/protolambda/zrnt v0.34.1 github.com/protolambda/ztyp v0.2.2 github.com/prysmaticlabs/go-bitfield v0.0.0-20240618144021-706c95b2dd15 @@ -38,6 +41,7 @@ require ( github.com/wealdtech/go-eth2-types/v2 v2.8.2 github.com/wealdtech/go-eth2-util v1.8.2 golang.org/x/text v0.37.0 + google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -58,7 +62,6 @@ require ( github.com/crate-crypto/go-eth-kzg v1.5.0 // indirect github.com/deckarep/golang-set/v2 v2.6.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/dot v1.6.4 // indirect github.com/ethereum/c-kzg-4844/v2 v2.1.6 // indirect github.com/ferranbt/fastssz v0.1.4 // indirect @@ -90,8 +93,6 @@ require ( github.com/pk910/dynamic-ssz v1.3.2-0.20260505131440-111bcb265c8f // indirect github.com/pk910/hashtree-bindings v0.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_model v0.6.2 // indirect - github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.20.1 // indirect github.com/protolambda/bls12-381-util v0.1.0 // indirect github.com/r3labs/sse/v2 v2.10.0 // indirect @@ -118,7 +119,6 @@ require ( golang.org/x/sys v0.43.0 // indirect golang.org/x/time v0.15.0 // indirect golang.org/x/tools v0.44.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect modernc.org/libc v1.72.1 // indirect modernc.org/mathutil v1.7.1 // indirect diff --git a/pkg/tasks/check_http_metrics/README.md b/pkg/tasks/check_http_metrics/README.md new file mode 100644 index 00000000..54526471 --- /dev/null +++ b/pkg/tasks/check_http_metrics/README.md @@ -0,0 +1,131 @@ +## `check_http_metrics` Task + +### Description +The `check_http_metrics` task fetches metrics from an HTTP Prometheus endpoint and evaluates assertions against metric values. + +#### Task Behavior +- The task polls the metrics endpoint at regular intervals. +- By default, the task returns immediately when all assertions pass. +- Use `continueOnPass: true` to keep monitoring even after success. +- Use `failOnCheckMiss: true` to fail immediately when assertions are not met. + +### Configuration Parameters + +- **`url`**:\ + HTTP URL of the Prometheus metrics endpoint. Required. + +- **`headers`**:\ + Optional HTTP request headers (e.g., for authentication). Default: `{}`. + +- **`pollInterval`**:\ + Interval between metric scrapes. Default: `10s`. + +- **`requestTimeout`**:\ + Timeout for a single HTTP request. Default: `5s`. + +- **`maxResponseSize`**:\ + Maximum response body size. Must be positive. Default: `10MB`. + +- **`failOnCheckMiss`**:\ + If `true`, fail immediately when assertions are not met. If `false`, keep polling until timeout or success. Default: `false`. + +- **`continueOnPass`**:\ + If `true`, continue checking after all assertions pass. Default: `false`. + +- **`missingMetric`**:\ + Behavior when a metric family is missing: `wait`, `fail`, or `pass`. Default: `wait`. + +- **`missingSeries`**:\ + Behavior when no time series matches the label selector: `wait`, `fail`, or `pass`. Default: `wait`. + +- **`resetBehavior`**:\ + Behavior when a COUNTER metric's value drops below baseline (indicating restart): `fail`, `rebaseline`, or `ignore`. Only applies to COUNTER type metrics. Default: `fail`. + +- **`assertions`**:\ + List of metric assertions. At least one required. + +#### Assertion Configuration + +- **`name`**: Unique assertion name. Required. +- **`metric`**: Prometheus metric name. Required. +- **`labels`**: Label selector (subset matching). Must match exactly one series. +- **`mode`**: `value` (current value) or `delta` (change since baseline). Default: `value`. +- **`operator`**: Comparison operator: `eq`, `neq`, `gt`, `gte`, `lt`, `lte`. Required. +- **`value`**: Expected numeric value. Required. +- **`missingMetric`**: Per-assertion override for global `missingMetric`. +- **`missingSeries`**: Per-assertion override for global `missingSeries`. + +#### Delta Mode + +In `delta` mode, the task tracks changes over time: +1. First scrape: records the current value as baseline (waits, does not evaluate) +2. Subsequent scrapes: computes `delta = current - baseline` and evaluates + +Negative deltas are valid for GAUGE and UNTYPED metrics. For COUNTER metrics, a decrease triggers `resetBehavior`. + +#### Examples + +```yaml +assertions: + # Check counter increased by at least 1 + - name: counter_increased + metric: my_counter + labels: + env: prod + mode: delta + operator: gte + value: 1 + + # Check gauge decreased (negative delta) + - name: gauge_dropped + metric: my_gauge + mode: delta + operator: lte + value: -1 + + # Check current value is above threshold + - name: value_above_threshold + metric: my_metric + operator: gt + value: 100 +``` + +#### Metric Type Handling + +| Type | Value Extracted | +|------|-----------------| +| COUNTER | Counter value | +| GAUGE | Gauge value | +| UNTYPED | Untyped value | +| SUMMARY | Sample sum | +| HISTOGRAM | Sample sum | + +Counter reset detection only applies to COUNTER type. SUMMARY and HISTOGRAM use sample sum; bucket/quantile helpers are not supported. + +### Outputs + +- **`passedAssertions`**: Array of assertion names that passed. +- **`failedAssertions`**: Array of assertion names that failed. +- **`values`**: Map of assertion name to latest observed value. +- **`deltas`**: Map of assertion name to computed delta (for `delta` mode). +- **`baselines`**: Map of assertion name to baseline value (for `delta` mode). +- **`scrapeErrors`**: Number of HTTP/parsing errors. +- **`assertionErrors`**: Number of assertion evaluation errors. + +### Defaults + +```yaml +- name: check_http_metrics + config: + url: "" + headers: {} + pollInterval: 10s + requestTimeout: 5s + maxResponseSize: 10MB + failOnCheckMiss: false + continueOnPass: false + missingMetric: wait + missingSeries: wait + resetBehavior: fail + assertions: [] +``` diff --git a/pkg/tasks/check_http_metrics/config.go b/pkg/tasks/check_http_metrics/config.go new file mode 100644 index 00000000..af67d4e8 --- /dev/null +++ b/pkg/tasks/check_http_metrics/config.go @@ -0,0 +1,233 @@ +package checkhttpmetrics + +import ( + "fmt" + "math" + "time" + + "github.com/dustin/go-humanize" + "github.com/ethpandaops/assertoor/pkg/helper" +) + +// MissingBehavior controls what happens when a metric or series cannot be found. +// "wait" keeps polling, "fail" marks the assertion as failed, "pass" treats it as passed. +type MissingBehavior string + +const ( + MissingBehaviorWait MissingBehavior = "wait" + MissingBehaviorFail MissingBehavior = "fail" + MissingBehaviorPass MissingBehavior = "pass" +) + +// DefaultMaxResponseSize is the default maximum response body size for metrics scraping. +const DefaultMaxResponseSize = "10MB" + +// ResetBehavior controls what happens when a counter value drops below its baseline, +// which typically indicates a service restart or counter reset. +type ResetBehavior string + +const ( + ResetBehaviorFail ResetBehavior = "fail" + ResetBehaviorRebaseline ResetBehavior = "rebaseline" + ResetBehaviorIgnore ResetBehavior = "ignore" +) + +// AssertionMode determines whether to compare the raw metric value or the change since baseline. +// Delta mode requires at least two scrapes: one to record baseline, one to evaluate. +type AssertionMode string + +const ( + AssertionModeValue AssertionMode = "value" + AssertionModeDelta AssertionMode = "delta" +) + +// Operator specifies the comparison operation between actual and expected values. +type Operator string + +const ( + OperatorEq Operator = "eq" + OperatorNeq Operator = "neq" + OperatorGt Operator = "gt" + OperatorGte Operator = "gte" + OperatorLt Operator = "lt" + OperatorLte Operator = "lte" +) + +// AssertionConfig defines a single metric assertion to evaluate. +// Labels are a subset selector that must match exactly one time series; +// matching zero or multiple series is an error. +type AssertionConfig struct { + Name string `yaml:"name" json:"name" desc:"Unique human-readable assertion name."` + Metric string `yaml:"metric" json:"metric" desc:"Prometheus metric name."` + Labels map[string]string `yaml:"labels" json:"labels" desc:"Label selector; all specified labels must match and select exactly one time series."` + Mode AssertionMode `yaml:"mode" json:"mode" desc:"Evaluation mode: 'value' (current value) or 'delta' (change since baseline)."` + Operator Operator `yaml:"operator" json:"operator" desc:"Comparison operator: eq, neq, gt, gte, lt, lte."` + Value float64 `yaml:"value" json:"value" desc:"Expected value for comparison."` + MissingMetric *MissingBehavior `yaml:"missingMetric" json:"missingMetric,omitempty" desc:"Override global missingMetric behavior for this assertion."` + MissingSeries *MissingBehavior `yaml:"missingSeries" json:"missingSeries,omitempty" desc:"Override global missingSeries behavior for this assertion."` +} + +// Config holds the task configuration for scraping a Prometheus metrics endpoint +// and evaluating assertions against the scraped values. +type Config struct { + URL string `yaml:"url" json:"url" require:"A" desc:"HTTP URL of the Prometheus metrics endpoint."` + Headers map[string]string `yaml:"headers" json:"headers" desc:"Optional HTTP request headers."` + PollInterval helper.Duration `yaml:"pollInterval" json:"pollInterval" desc:"Interval between metric scrapes."` + RequestTimeout helper.Duration `yaml:"requestTimeout" json:"requestTimeout" desc:"Timeout for a single HTTP request."` + MaxResponseSize string `yaml:"maxResponseSize" json:"maxResponseSize" desc:"Maximum response body size (e.g., '10MB')."` + FailOnCheckMiss bool `yaml:"failOnCheckMiss" json:"failOnCheckMiss" desc:"If true, fail immediately when assertions are not met."` + ContinueOnPass bool `yaml:"continueOnPass" json:"continueOnPass" desc:"If true, continue checking after all assertions pass."` + MissingMetric MissingBehavior `yaml:"missingMetric" json:"missingMetric" desc:"Behavior when metric doesn't exist: wait, fail, pass."` + MissingSeries MissingBehavior `yaml:"missingSeries" json:"missingSeries" desc:"Behavior when no series matches labels: wait, fail, pass."` + ResetBehavior ResetBehavior `yaml:"resetBehavior" json:"resetBehavior" desc:"Behavior on counter reset in delta mode: fail, rebaseline, ignore."` + Assertions []AssertionConfig `yaml:"assertions" json:"assertions" require:"B" desc:"List of metric assertions to evaluate."` + + // Parsed values (not from YAML) + maxResponseSizeBytes int64 +} + +func DefaultConfig() Config { + return Config{ + PollInterval: helper.Duration{Duration: 10 * time.Second}, + RequestTimeout: helper.Duration{Duration: 5 * time.Second}, + MaxResponseSize: DefaultMaxResponseSize, + MissingMetric: MissingBehaviorWait, + MissingSeries: MissingBehaviorWait, + ResetBehavior: ResetBehaviorFail, + } +} + +func (c *Config) Validate() error { + if c.URL == "" { + return fmt.Errorf("url is required") + } + + if len(c.Assertions) == 0 { + return fmt.Errorf("at least one assertion is required") + } + + // Validate intervals to prevent tight-loops or disabled timeouts + if c.PollInterval.Duration <= 0 { + return fmt.Errorf("pollInterval must be positive") + } + + if c.RequestTimeout.Duration <= 0 { + return fmt.Errorf("requestTimeout must be positive") + } + + // Set default and parse max response size + if c.MaxResponseSize == "" { + c.MaxResponseSize = DefaultMaxResponseSize + } + + size, err := humanize.ParseBytes(c.MaxResponseSize) + if err != nil { + return fmt.Errorf("invalid maxResponseSize %q: %w", c.MaxResponseSize, err) + } + + if size == 0 { + return fmt.Errorf("maxResponseSize must be positive") + } + + if size > math.MaxInt64 { + return fmt.Errorf("maxResponseSize %q exceeds maximum allowed value", c.MaxResponseSize) + } + + c.maxResponseSizeBytes = int64(size) + + // Validate assertions and check for duplicate names + seenNames := make(map[string]bool, len(c.Assertions)) + + for i, a := range c.Assertions { + if a.Name == "" { + return fmt.Errorf("assertion[%d]: name is required", i) + } + + if seenNames[a.Name] { + return fmt.Errorf("assertion[%d]: duplicate name %q", i, a.Name) + } + + seenNames[a.Name] = true + + if a.Metric == "" { + return fmt.Errorf("assertion[%d] %q: metric is required", i, a.Name) + } + + if err := validateMode(a.Mode); err != nil { + return fmt.Errorf("assertion[%d] %q: %w", i, a.Name, err) + } + + if err := validateOperator(a.Operator); err != nil { + return fmt.Errorf("assertion[%d] %q: %w", i, a.Name, err) + } + + if a.MissingMetric != nil { + if err := validateMissingBehavior(*a.MissingMetric); err != nil { + return fmt.Errorf("assertion[%d] %q: missingMetric: %w", i, a.Name, err) + } + } + + if a.MissingSeries != nil { + if err := validateMissingBehavior(*a.MissingSeries); err != nil { + return fmt.Errorf("assertion[%d] %q: missingSeries: %w", i, a.Name, err) + } + } + } + + // Validate global enums + if err := validateMissingBehavior(c.MissingMetric); err != nil { + return fmt.Errorf("missingMetric: %w", err) + } + + if err := validateMissingBehavior(c.MissingSeries); err != nil { + return fmt.Errorf("missingSeries: %w", err) + } + + if err := validateResetBehavior(c.ResetBehavior); err != nil { + return fmt.Errorf("resetBehavior: %w", err) + } + + return nil +} + +func (c *Config) GetMaxResponseSizeBytes() int64 { + return c.maxResponseSizeBytes +} + +func validateMissingBehavior(b MissingBehavior) error { + switch b { + case MissingBehaviorWait, MissingBehaviorFail, MissingBehaviorPass, "": + return nil + default: + return fmt.Errorf("invalid value %q, must be one of: wait, fail, pass", b) + } +} + +func validateResetBehavior(b ResetBehavior) error { + switch b { + case ResetBehaviorFail, ResetBehaviorRebaseline, ResetBehaviorIgnore, "": + return nil + default: + return fmt.Errorf("invalid value %q, must be one of: fail, rebaseline, ignore", b) + } +} + +func validateMode(m AssertionMode) error { + switch m { + case AssertionModeValue, AssertionModeDelta, "": + return nil + default: + return fmt.Errorf("invalid mode %q, must be one of: value, delta", m) + } +} + +func validateOperator(o Operator) error { + switch o { + case OperatorEq, OperatorNeq, OperatorGt, OperatorGte, OperatorLt, OperatorLte: + return nil + case "": + return fmt.Errorf("operator is required") + default: + return fmt.Errorf("invalid operator %q, must be one of: eq, neq, gt, gte, lt, lte", o) + } +} diff --git a/pkg/tasks/check_http_metrics/task.go b/pkg/tasks/check_http_metrics/task.go new file mode 100644 index 00000000..2ddedbc8 --- /dev/null +++ b/pkg/tasks/check_http_metrics/task.go @@ -0,0 +1,599 @@ +package checkhttpmetrics + +import ( + "context" + "errors" + "fmt" + "io" + "math" + "net/http" + "strings" + "time" + + "github.com/ethpandaops/assertoor/pkg/types" + "github.com/ethpandaops/assertoor/pkg/vars" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + "github.com/sirupsen/logrus" +) + +const outputTypeObject = "object" + +// Compile-time interface compliance check. +var _ types.Task = (*Task)(nil) + +var ( + TaskName = "check_http_metrics" + TaskDescriptor = &types.TaskDescriptor{ + Name: TaskName, + Description: "Checks HTTP Prometheus metrics endpoint and evaluates assertions against metric values.", + Category: "utility", + Config: DefaultConfig(), + Outputs: []types.TaskOutputDefinition{ + { + Name: "passedAssertions", + Type: "array", + Description: "Array of assertion names that passed.", + }, + { + Name: "failedAssertions", + Type: "array", + Description: "Array of assertion names that failed.", + }, + { + Name: "values", + Type: outputTypeObject, + Description: "Map of assertion name to latest observed value.", + }, + { + Name: "deltas", + Type: outputTypeObject, + Description: "Map of assertion name to computed delta for delta-mode assertions.", + }, + { + Name: "baselines", + Type: outputTypeObject, + Description: "Map of assertion name to baseline value for delta-mode assertions.", + }, + { + Name: "scrapeErrors", + Type: "int", + Description: "Number of HTTP/parsing errors encountered.", + }, + { + Name: "assertionErrors", + Type: "int", + Description: "Number of assertion evaluation errors.", + }, + }, + NewTask: NewTask, + } +) + +type Task struct { + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger + httpClient *http.Client + + // State + baselines map[string]float64 + scrapeCount int + scrapeErrors int + assertionErrors int +} + +func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { + return &Task{ + ctx: ctx, + options: options, + logger: ctx.Logger.GetLogger(), + baselines: make(map[string]float64, 8), + }, nil +} + +func (t *Task) Config() interface{} { + return t.config +} + +func (t *Task) Timeout() time.Duration { + return t.options.Timeout.Duration +} + +func (t *Task) LoadConfig() error { + config := DefaultConfig() + + // parse static config + if t.options.Config != nil { + if err := t.options.Config.Unmarshal(&config); err != nil { + return fmt.Errorf("error parsing task config for %v: %w", TaskName, err) + } + } + + // load dynamic vars + if err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars); err != nil { + return err + } + + // validate config + if err := config.Validate(); err != nil { + return err + } + + t.config = config + + t.httpClient = &http.Client{ + Timeout: config.RequestTimeout.Duration, + } + + return nil +} + +func (t *Task) Execute(ctx context.Context) error { + for { + t.scrapeCount++ + + done, err := t.runCheck(ctx) + if done { + return err + } + + select { + case <-time.After(t.config.PollInterval.Duration): + case <-ctx.Done(): + return ctx.Err() + } + } +} + +type assertionResult struct { + name string + passed bool + value float64 + delta float64 + err error + waiting bool // true if we should keep waiting (missing metric/series with wait behavior) +} + +func (t *Task) runCheck(ctx context.Context) (bool, error) { + // Scrape metrics + metricFamilies, err := t.scrapeMetrics(ctx) + if err != nil { + t.scrapeErrors++ + t.logger.Warnf("Scrape error (attempt %d): %v", t.scrapeCount, err) + + if t.config.FailOnCheckMiss { + t.ctx.SetResult(types.TaskResultFailure) + t.setOutputs(nil) + + return true, fmt.Errorf("scrape failed: %w", err) + } + + t.ctx.SetResult(types.TaskResultNone) + t.ctx.ReportProgress(0, fmt.Sprintf("Scrape failed, retrying... (attempt %d)", t.scrapeCount)) + t.setOutputs(nil) // Update outputs so scrapeErrors is visible + + return false, nil + } + + // Evaluate assertions + results := t.evaluateAssertions(metricFamilies) + + // Collect results + var passedAssertions, failedAssertions []string + + var anyWaiting, anyFailed bool + + values := make(map[string]float64, len(results)) + deltas := make(map[string]float64, len(results)) + + for _, r := range results { + if !math.IsNaN(r.value) { + values[r.name] = r.value + } + + if !math.IsNaN(r.delta) { + deltas[r.name] = r.delta + } + + if r.waiting { + anyWaiting = true + + continue + } + + if r.err != nil { + t.assertionErrors++ + + failedAssertions = append(failedAssertions, r.name) + anyFailed = true + + t.logger.Warnf("Assertion %q error: %v", r.name, r.err) + + continue + } + + if r.passed { + passedAssertions = append(passedAssertions, r.name) + } else { + failedAssertions = append(failedAssertions, r.name) + anyFailed = true + } + } + + t.setOutputs(&checkOutputs{ + passedAssertions: passedAssertions, + failedAssertions: failedAssertions, + values: values, + deltas: deltas, + }) + + // Determine result + allPassed := !anyFailed && !anyWaiting + + switch { + case allPassed: + t.ctx.SetResult(types.TaskResultSuccess) + t.ctx.ReportProgress(100, fmt.Sprintf("All assertions passed (%d/%d)", len(passedAssertions), len(t.config.Assertions))) + + if !t.config.ContinueOnPass { + return true, nil + } + + case anyFailed && t.config.FailOnCheckMiss: + t.ctx.SetResult(types.TaskResultFailure) + + return true, fmt.Errorf("assertions failed: %v", failedAssertions) + + default: + t.ctx.SetResult(types.TaskResultNone) + t.ctx.ReportProgress(0, fmt.Sprintf("Waiting... passed=%d failed=%d waiting=%d (attempt %d)", + len(passedAssertions), len(failedAssertions), countWaiting(results), t.scrapeCount)) + } + + return false, nil +} + +type checkOutputs struct { + passedAssertions []string + failedAssertions []string + values map[string]float64 + deltas map[string]float64 +} + +func (t *Task) setOutputs(out *checkOutputs) { + if out == nil { + t.ctx.Outputs.SetVar("passedAssertions", []string{}) + t.ctx.Outputs.SetVar("failedAssertions", []string{}) + t.ctx.Outputs.SetVar("values", map[string]float64{}) + t.ctx.Outputs.SetVar("deltas", map[string]float64{}) + } else { + if data, err := vars.GeneralizeData(out.passedAssertions); err != nil { + t.logger.Warnf("failed to generalize passedAssertions: %v", err) + } else { + t.ctx.Outputs.SetVar("passedAssertions", data) + } + + if data, err := vars.GeneralizeData(out.failedAssertions); err != nil { + t.logger.Warnf("failed to generalize failedAssertions: %v", err) + } else { + t.ctx.Outputs.SetVar("failedAssertions", data) + } + + if data, err := vars.GeneralizeData(out.values); err != nil { + t.logger.Warnf("failed to generalize values: %v", err) + } else { + t.ctx.Outputs.SetVar("values", data) + } + + if data, err := vars.GeneralizeData(out.deltas); err != nil { + t.logger.Warnf("failed to generalize deltas: %v", err) + } else { + t.ctx.Outputs.SetVar("deltas", data) + } + } + + // Baselines + if data, err := vars.GeneralizeData(t.baselines); err != nil { + t.logger.Warnf("failed to generalize baselines: %v", err) + } else { + t.ctx.Outputs.SetVar("baselines", data) + } + + t.ctx.Outputs.SetVar("scrapeErrors", t.scrapeErrors) + t.ctx.Outputs.SetVar("assertionErrors", t.assertionErrors) +} + +func countWaiting(results []assertionResult) int { + count := 0 + + for _, r := range results { + if r.waiting { + count++ + } + } + + return count +} + +func (t *Task) scrapeMetrics(ctx context.Context) (map[string]*dto.MetricFamily, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, t.config.URL, http.NoBody) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + for k, v := range t.config.Headers { + req.Header.Set(k, v) + } + + resp, err := t.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("HTTP request failed: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + + // Limit response size + maxSize := t.config.GetMaxResponseSizeBytes() + limitedReader := io.LimitReader(resp.Body, maxSize+1) + + body, err := io.ReadAll(limitedReader) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + if int64(len(body)) > maxSize { + return nil, fmt.Errorf("response body exceeds max size of %d bytes", maxSize) + } + + // Parse Prometheus text format using UTF8 validation scheme + parser := expfmt.NewTextParser(model.UTF8Validation) + + metricFamilies, err := parser.TextToMetricFamilies(strings.NewReader(string(body))) + if err != nil { + return nil, fmt.Errorf("failed to parse metrics: %w", err) + } + + return metricFamilies, nil +} + +func (t *Task) evaluateAssertions(metricFamilies map[string]*dto.MetricFamily) []assertionResult { + results := make([]assertionResult, 0, len(t.config.Assertions)) + + for i := range t.config.Assertions { + result := t.evaluateAssertion(&t.config.Assertions[i], metricFamilies) + results = append(results, result) + } + + return results +} + +func (t *Task) evaluateAssertion(a *AssertionConfig, metricFamilies map[string]*dto.MetricFamily) assertionResult { + result := assertionResult{ + name: a.Name, + value: math.NaN(), + delta: math.NaN(), + } + + // Get effective missing behaviors + missingMetricBehavior := t.config.MissingMetric + if a.MissingMetric != nil { + missingMetricBehavior = *a.MissingMetric + } + + missingSeriesBehavior := t.config.MissingSeries + if a.MissingSeries != nil { + missingSeriesBehavior = *a.MissingSeries + } + + // Find the metric family + mf, ok := metricFamilies[a.Metric] + if !ok { + return t.handleMissing(result, missingMetricBehavior, fmt.Sprintf("metric %q not found", a.Metric)) + } + + // Find matching series + value, err := t.findMetricValue(mf, a.Labels) + if err != nil { + if strings.Contains(err.Error(), "no matching series") { + return t.handleMissing(result, missingSeriesBehavior, err.Error()) + } + + result.err = err + + return result + } + + result.value = value + + // Check for non-finite values + if math.IsNaN(value) || math.IsInf(value, 0) { + result.err = fmt.Errorf("metric value is non-finite: %v", value) + + return result + } + + // Handle delta mode + mode := a.Mode + if mode == "" { + mode = AssertionModeValue + } + + var compareValue float64 + + if mode == AssertionModeDelta { + baseline, hasBaseline := t.baselines[a.Name] + + if !hasBaseline { + // First scrape: record baseline, don't evaluate + t.baselines[a.Name] = value + result.waiting = true + + t.logger.Debugf("Assertion %q: recorded baseline %.4f", a.Name, value) + + return result + } + + // Check for counter reset (only applies to COUNTER type, not gauges) + // Gauges can legitimately decrease, so negative delta is normal for them + isCounter := mf.GetType() == dto.MetricType_COUNTER + if isCounter && value < baseline { + switch t.config.ResetBehavior { + case ResetBehaviorFail: + result.err = fmt.Errorf("counter reset detected: current %.4f < baseline %.4f", value, baseline) + + return result + case ResetBehaviorRebaseline: + t.baselines[a.Name] = value + t.logger.Warnf("Assertion %q: counter reset detected, rebaselining to %.4f", a.Name, value) + + result.waiting = true + + return result + case ResetBehaviorIgnore: + // Use old baseline, may produce negative delta + t.logger.Warnf("Assertion %q: counter reset detected, ignoring", a.Name) + } + } + + compareValue = value - baseline + result.delta = compareValue + } else { + compareValue = value + } + + // Evaluate operator + result.passed = evaluateOperator(a.Operator, compareValue, a.Value) + + if !result.passed { + t.logger.Debugf("Assertion %q failed: %.4f %s %.4f = false", a.Name, compareValue, a.Operator, a.Value) + } + + return result +} + +func (t *Task) handleMissing(result assertionResult, behavior MissingBehavior, message string) assertionResult { + switch behavior { + case MissingBehaviorFail: + result.err = errors.New(message) + case MissingBehaviorPass: + result.passed = true + case MissingBehaviorWait: + result.waiting = true + default: + result.waiting = true + } + + return result +} + +func (t *Task) findMetricValue(mf *dto.MetricFamily, labels map[string]string) (float64, error) { + var matchingMetrics []*dto.Metric + + for _, m := range mf.GetMetric() { + if matchLabels(m.GetLabel(), labels) { + matchingMetrics = append(matchingMetrics, m) + } + } + + if len(matchingMetrics) == 0 { + return 0, fmt.Errorf("no matching series for labels %v", labels) + } + + if len(matchingMetrics) > 1 { + return 0, fmt.Errorf("labels match %d series (must match exactly one)", len(matchingMetrics)) + } + + value, err := getMetricValue(matchingMetrics[0], mf.GetType()) + if err != nil { + return 0, fmt.Errorf("failed to extract metric value: %w", err) + } + + return value, nil +} + +func matchLabels(metricLabels []*dto.LabelPair, wantLabels map[string]string) bool { + if len(wantLabels) == 0 { + return true + } + + labelMap := make(map[string]string, len(metricLabels)) + + for _, lp := range metricLabels { + labelMap[lp.GetName()] = lp.GetValue() + } + + for k, v := range wantLabels { + if labelMap[k] != v { + return false + } + } + + return true +} + +var errNoMetricValue = errors.New("metric has no extractable value") + +func getMetricValue(m *dto.Metric, mType dto.MetricType) (float64, error) { + switch mType { + case dto.MetricType_COUNTER: + if c := m.GetCounter(); c != nil { + return c.GetValue(), nil + } + case dto.MetricType_GAUGE: + if g := m.GetGauge(); g != nil { + return g.GetValue(), nil + } + case dto.MetricType_UNTYPED: + if u := m.GetUntyped(); u != nil { + return u.GetValue(), nil + } + case dto.MetricType_SUMMARY: + if s := m.GetSummary(); s != nil { + return s.GetSampleSum(), nil + } + case dto.MetricType_HISTOGRAM, dto.MetricType_GAUGE_HISTOGRAM: + if h := m.GetHistogram(); h != nil { + return h.GetSampleSum(), nil + } + } + + // Fallback: try all types + if c := m.GetCounter(); c != nil { + return c.GetValue(), nil + } + + if g := m.GetGauge(); g != nil { + return g.GetValue(), nil + } + + if u := m.GetUntyped(); u != nil { + return u.GetValue(), nil + } + + return 0, errNoMetricValue +} + +func evaluateOperator(op Operator, actual, expected float64) bool { + switch op { + case OperatorEq: + return actual == expected + case OperatorNeq: + return actual != expected + case OperatorGt: + return actual > expected + case OperatorGte: + return actual >= expected + case OperatorLt: + return actual < expected + case OperatorLte: + return actual <= expected + default: + return false + } +} diff --git a/pkg/tasks/check_http_metrics/task_test.go b/pkg/tasks/check_http_metrics/task_test.go new file mode 100644 index 00000000..d52e122f --- /dev/null +++ b/pkg/tasks/check_http_metrics/task_test.go @@ -0,0 +1,1931 @@ +package checkhttpmetrics + +import ( + "context" + "fmt" + "math" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/ethpandaops/assertoor/pkg/helper" + "github.com/ethpandaops/assertoor/pkg/types" + "github.com/ethpandaops/assertoor/pkg/vars" + dto "github.com/prometheus/client_model/go" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" +) + +const ( + testMetricsURL = "http://localhost:9090/metrics" + testMetric1 = "metric1" + testLabelFoo = "foo" + testLabelBar = "bar" + testLabelEnv = "env" + testLabelRegion = "region" + testLabelHost = "host" + testValueProd = "prod" + testAssertionName = "test" + testAssertionName2 = "test_assertion" + testInvalidOpLabel = "invalid operator" + testResetName = "reset_test" + testCounterMetric = "test_counter" + testCheckCounterAssert = "check_counter" +) + +// validBaseConfig returns a config with required fields set for validation to pass intervals check. +func validBaseConfig() Config { + return Config{ + PollInterval: helper.Duration{Duration: 10 * time.Second}, + RequestTimeout: helper.Duration{Duration: 5 * time.Second}, + } +} + +func TestConfig_Validate(t *testing.T) { + tests := []struct { + name string + configFunc func() Config + wantErr string + }{ + { + name: "missing url", + configFunc: func() Config { return Config{} }, + wantErr: "url is required", + }, + { + name: "missing assertions", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + + return c + }, + wantErr: "at least one assertion is required", + }, + { + name: "missing assertion name", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.Assertions = []AssertionConfig{ + {Metric: "test_metric", Operator: OperatorGt, Value: 0}, + } + + return c + }, + wantErr: "assertion[0]: name is required", + }, + { + name: "duplicate assertion names", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.Assertions = []AssertionConfig{ + {Name: testAssertionName, Metric: testMetric1, Operator: OperatorGt, Value: 0}, + {Name: testAssertionName, Metric: "metric2", Operator: OperatorGt, Value: 0}, + } + + return c + }, + wantErr: "assertion[1]: duplicate name", + }, + { + name: testInvalidOpLabel, + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.Assertions = []AssertionConfig{ + {Name: testAssertionName, Metric: testMetric1, Operator: "invalid", Value: 0}, + } + + return c + }, + wantErr: testInvalidOpLabel, + }, + { + name: "missing operator", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.Assertions = []AssertionConfig{ + {Name: testAssertionName, Metric: testMetric1, Value: 0}, + } + + return c + }, + wantErr: "operator is required", + }, + { + name: "invalid max response size", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.MaxResponseSize = "invalid" + c.Assertions = []AssertionConfig{ + {Name: testAssertionName, Metric: testMetric1, Operator: OperatorGt, Value: 0}, + } + + return c + }, + wantErr: "invalid maxResponseSize", + }, + { + name: "zero max response size", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.MaxResponseSize = "0B" + c.Assertions = []AssertionConfig{ + {Name: testAssertionName, Metric: testMetric1, Operator: OperatorGt, Value: 0}, + } + + return c + }, + wantErr: "maxResponseSize must be positive", + }, + { + name: "valid config", + configFunc: func() Config { + c := validBaseConfig() + c.URL = testMetricsURL + c.MaxResponseSize = "10MB" + c.MissingMetric = MissingBehaviorWait + c.MissingSeries = MissingBehaviorFail + c.ResetBehavior = ResetBehaviorFail + c.Assertions = []AssertionConfig{ + {Name: testAssertionName, Metric: testMetric1, Operator: OperatorGt, Value: 0}, + } + + return c + }, + wantErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := tt.configFunc() + err := cfg.Validate() + + switch { + case tt.wantErr == "" && err != nil: + t.Fatalf("unexpected error: %v", err) + case tt.wantErr != "" && err == nil: + t.Fatalf("expected error containing %q, got nil", tt.wantErr) + case tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr): + t.Fatalf("error %q does not contain %q", err.Error(), tt.wantErr) + } + }) + } +} + +func TestMatchLabels(t *testing.T) { + tests := []struct { + name string + metricLabels []*dto.LabelPair + wantLabels map[string]string + want bool + }{ + { + name: "empty want labels matches anything", + metricLabels: []*dto.LabelPair{{Name: proto.String(testLabelFoo), Value: proto.String(testLabelBar)}}, + wantLabels: nil, + want: true, + }, + { + name: "exact match", + metricLabels: []*dto.LabelPair{{Name: proto.String(testLabelFoo), Value: proto.String(testLabelBar)}}, + wantLabels: map[string]string{testLabelFoo: testLabelBar}, + want: true, + }, + { + name: "partial match (subset of labels)", + metricLabels: []*dto.LabelPair{ + {Name: proto.String(testLabelFoo), Value: proto.String(testLabelBar)}, + {Name: proto.String("baz"), Value: proto.String("qux")}, + }, + wantLabels: map[string]string{testLabelFoo: testLabelBar}, + want: true, + }, + { + name: "no match - different value", + metricLabels: []*dto.LabelPair{{Name: proto.String(testLabelFoo), Value: proto.String(testLabelBar)}}, + wantLabels: map[string]string{testLabelFoo: "baz"}, + want: false, + }, + { + name: "no match - missing label", + metricLabels: []*dto.LabelPair{{Name: proto.String(testLabelFoo), Value: proto.String(testLabelBar)}}, + wantLabels: map[string]string{"missing": "label"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := matchLabels(tt.metricLabels, tt.wantLabels) + if got != tt.want { + t.Errorf("matchLabels() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestEvaluateOperator(t *testing.T) { + tests := []struct { + name string + op Operator + actual float64 + expected float64 + want bool + }{ + {"eq true", OperatorEq, 5, 5, true}, + {"eq false", OperatorEq, 5, 6, false}, + {"neq true", OperatorNeq, 5, 6, true}, + {"neq false", OperatorNeq, 5, 5, false}, + {"gt true", OperatorGt, 6, 5, true}, + {"gt false", OperatorGt, 5, 5, false}, + {"gte true equal", OperatorGte, 5, 5, true}, + {"gte true greater", OperatorGte, 6, 5, true}, + {"gte false", OperatorGte, 4, 5, false}, + {"lt true", OperatorLt, 4, 5, true}, + {"lt false", OperatorLt, 5, 5, false}, + {"lte true equal", OperatorLte, 5, 5, true}, + {"lte true less", OperatorLte, 4, 5, true}, + {"lte false", OperatorLte, 6, 5, false}, + {testInvalidOpLabel, Operator("invalid"), 5, 5, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := evaluateOperator(tt.op, tt.actual, tt.expected) + if got != tt.want { + t.Errorf("evaluateOperator(%v, %v, %v) = %v, want %v", tt.op, tt.actual, tt.expected, got, tt.want) + } + }) + } +} + +func TestGetMetricValue(t *testing.T) { + counterVal := 42.0 + gaugeVal := 3.14 + untypedVal := 99.9 + summarySum := 100.0 + histogramSum := 200.0 + + tests := []struct { + name string + metric *dto.Metric + mType dto.MetricType + want float64 + wantErr bool + }{ + { + name: "counter", + metric: &dto.Metric{Counter: &dto.Counter{Value: &counterVal}}, + mType: dto.MetricType_COUNTER, + want: 42.0, + }, + { + name: "gauge", + metric: &dto.Metric{Gauge: &dto.Gauge{Value: &gaugeVal}}, + mType: dto.MetricType_GAUGE, + want: 3.14, + }, + { + name: "untyped", + metric: &dto.Metric{Untyped: &dto.Untyped{Value: &untypedVal}}, + mType: dto.MetricType_UNTYPED, + want: 99.9, + }, + { + name: "summary returns sample sum", + metric: &dto.Metric{Summary: &dto.Summary{SampleSum: &summarySum}}, + mType: dto.MetricType_SUMMARY, + want: 100.0, + }, + { + name: "histogram returns sample sum", + metric: &dto.Metric{Histogram: &dto.Histogram{SampleSum: &histogramSum}}, + mType: dto.MetricType_HISTOGRAM, + want: 200.0, + }, + { + name: "fallback to counter when type unknown", + metric: &dto.Metric{Counter: &dto.Counter{Value: &counterVal}}, + mType: dto.MetricType(-1), + want: 42.0, + }, + { + name: "empty metric returns error", + metric: &dto.Metric{}, + mType: dto.MetricType_COUNTER, + wantErr: true, + }, + { + name: "type mismatch with no fallback returns error", + metric: &dto.Metric{}, + mType: dto.MetricType_GAUGE, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getMetricValue(tt.metric, tt.mType) + + if tt.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } + + return + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got != tt.want { + t.Errorf("getMetricValue() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDefaultConfig(t *testing.T) { + cfg := DefaultConfig() + + if cfg.MissingMetric != MissingBehaviorWait { + t.Errorf("MissingMetric = %v, want %v", cfg.MissingMetric, MissingBehaviorWait) + } + + if cfg.MissingSeries != MissingBehaviorWait { + t.Errorf("MissingSeries = %v, want %v", cfg.MissingSeries, MissingBehaviorWait) + } + + if cfg.ResetBehavior != ResetBehaviorFail { + t.Errorf("ResetBehavior = %v, want %v", cfg.ResetBehavior, ResetBehaviorFail) + } + + if cfg.MaxResponseSize != "10MB" { + t.Errorf("MaxResponseSize = %v, want 10MB", cfg.MaxResponseSize) + } +} + +func TestGetMaxResponseSizeBytes(t *testing.T) { + // Test that Validate() sets the default when MaxResponseSize is empty + t.Run("default set by Validate when empty", func(t *testing.T) { + cfg := Config{ + URL: "http://example.com", + PollInterval: helper.Duration{Duration: time.Second}, + RequestTimeout: helper.Duration{Duration: time.Second}, + Assertions: []AssertionConfig{{Name: testAssertionName, Metric: "m", Operator: OperatorEq}}, + } + if err := cfg.Validate(); err != nil { + t.Fatalf("Validate() failed: %v", err) + } + + got := cfg.GetMaxResponseSizeBytes() + want := int64(10 * 1000 * 1000) // 10MB = 10,000,000 bytes (decimal) + + if got != want { + t.Errorf("GetMaxResponseSizeBytes() = %v, want %v", got, want) + } + }) + + // Test that Validate() parses custom MaxResponseSize + t.Run("custom value parsed by Validate", func(t *testing.T) { + cfg := Config{ + URL: "http://example.com", + MaxResponseSize: "5MB", + PollInterval: helper.Duration{Duration: time.Second}, + RequestTimeout: helper.Duration{Duration: time.Second}, + Assertions: []AssertionConfig{{Name: testAssertionName, Metric: "m", Operator: OperatorEq}}, + } + if err := cfg.Validate(); err != nil { + t.Fatalf("Validate() failed: %v", err) + } + + got := cfg.GetMaxResponseSizeBytes() + want := int64(5 * 1000 * 1000) // 5MB = 5,000,000 bytes (decimal) + + if got != want { + t.Errorf("GetMaxResponseSizeBytes() = %v, want %v", got, want) + } + }) +} + +func TestValidateMissingBehavior(t *testing.T) { + validCases := []MissingBehavior{MissingBehaviorWait, MissingBehaviorFail, MissingBehaviorPass, ""} + for _, v := range validCases { + if err := validateMissingBehavior(v); err != nil { + t.Errorf("validateMissingBehavior(%q) returned unexpected error: %v", v, err) + } + } + + if err := validateMissingBehavior("invalid"); err == nil { + t.Error("validateMissingBehavior(\"invalid\") expected error, got nil") + } +} + +func TestValidateResetBehavior(t *testing.T) { + validCases := []ResetBehavior{ResetBehaviorFail, ResetBehaviorRebaseline, ResetBehaviorIgnore, ""} + for _, v := range validCases { + if err := validateResetBehavior(v); err != nil { + t.Errorf("validateResetBehavior(%q) returned unexpected error: %v", v, err) + } + } + + if err := validateResetBehavior("invalid"); err == nil { + t.Error("validateResetBehavior(\"invalid\") expected error, got nil") + } +} + +func TestValidateMode(t *testing.T) { + validCases := []AssertionMode{AssertionModeValue, AssertionModeDelta, ""} + for _, v := range validCases { + if err := validateMode(v); err != nil { + t.Errorf("validateMode(%q) returned unexpected error: %v", v, err) + } + } + + if err := validateMode("invalid"); err == nil { + t.Error("validateMode(\"invalid\") expected error, got nil") + } +} + +func TestValidateOperator(t *testing.T) { + validCases := []Operator{OperatorEq, OperatorNeq, OperatorGt, OperatorGte, OperatorLt, OperatorLte} + for _, v := range validCases { + if err := validateOperator(v); err != nil { + t.Errorf("validateOperator(%q) returned unexpected error: %v", v, err) + } + } + + if err := validateOperator(""); err == nil { + t.Error("validateOperator(\"\") expected error, got nil") + } + + if err := validateOperator("invalid"); err == nil { + t.Error("validateOperator(\"invalid\") expected error, got nil") + } +} + +// newTestTask creates a Task for testing with a no-op logger. +func newTestTask(cfg *Config) *Task { + logger := logrus.New() + logger.SetLevel(logrus.PanicLevel) + + c := Config{} + if cfg != nil { + c = *cfg + } + + // Ensure maxResponseSizeBytes has a default for tests that don't call Validate + // Use decimal value (10MB = 10,000,000) consistent with humanize.ParseBytes + if c.maxResponseSizeBytes == 0 { + c.maxResponseSizeBytes = 10 * 1000 * 1000 + } + + return &Task{ + config: c, + logger: logger, + baselines: make(map[string]float64), + } +} + +// makeCounterFamily creates a COUNTER metric family with the given metrics. +func makeCounterFamily(_ string, metrics ...*dto.Metric) *dto.MetricFamily { + mType := dto.MetricType_COUNTER + + return &dto.MetricFamily{ + Name: proto.String(testMetric1), + Type: &mType, + Metric: metrics, + } +} + +// makeGaugeFamily creates a GAUGE metric family with the given metrics. +func makeGaugeFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { + mType := dto.MetricType_GAUGE + + return &dto.MetricFamily{ + Name: proto.String(name), + Type: &mType, + Metric: metrics, + } +} + +// makeCounter creates a counter metric with labels and value. +func makeCounter(value float64, labels map[string]string) *dto.Metric { + m := &dto.Metric{ + Counter: &dto.Counter{Value: proto.Float64(value)}, + } + + for k, v := range labels { + m.Label = append(m.Label, &dto.LabelPair{ + Name: proto.String(k), + Value: proto.String(v), + }) + } + + return m +} + +// makeGauge creates a gauge metric with labels and value. +func makeGauge(value float64, labels map[string]string) *dto.Metric { + m := &dto.Metric{ + Gauge: &dto.Gauge{Value: proto.Float64(value)}, + } + + for k, v := range labels { + m.Label = append(m.Label, &dto.LabelPair{ + Name: proto.String(k), + Value: proto.String(v), + }) + } + + return m +} + +// makeUntypedFamily creates an UNTYPED metric family with the given metrics. +func makeUntypedFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { + mType := dto.MetricType_UNTYPED + + return &dto.MetricFamily{ + Name: proto.String(name), + Type: &mType, + Metric: metrics, + } +} + +// makeUntyped creates an untyped metric with labels and value. +func makeUntyped(value float64, labels map[string]string) *dto.Metric { + m := &dto.Metric{ + Untyped: &dto.Untyped{Value: proto.Float64(value)}, + } + + for k, v := range labels { + m.Label = append(m.Label, &dto.LabelPair{ + Name: proto.String(k), + Value: proto.String(v), + }) + } + + return m +} + +func TestFindMetricValue(t *testing.T) { + tests := []struct { + name string + mf *dto.MetricFamily + labels map[string]string + wantValue float64 + wantErr string + }{ + { + name: "single series no labels", + mf: makeCounterFamily(testMetric1, makeCounter(42, nil)), + labels: nil, + wantValue: 42, + }, + { + name: "single series with matching labels", + mf: makeCounterFamily(testMetric1, makeCounter(100, map[string]string{ + testLabelEnv: testValueProd, + })), + labels: map[string]string{testLabelEnv: testValueProd}, + wantValue: 100, + }, + { + name: "no matching series", + mf: makeCounterFamily(testMetric1, makeCounter(42, map[string]string{testLabelEnv: testValueProd})), + labels: map[string]string{testLabelEnv: "dev"}, + wantErr: "no matching series", + }, + { + name: "multiple matching series error", + mf: makeCounterFamily(testMetric1, + makeCounter(10, map[string]string{testLabelEnv: testValueProd}), + makeCounter(20, map[string]string{testLabelEnv: testValueProd}), + ), + labels: map[string]string{testLabelEnv: testValueProd}, + wantErr: "labels match 2 series", + }, + { + name: "partial label match selects correct series", + mf: makeCounterFamily(testMetric1, + makeCounter(10, map[string]string{testLabelEnv: testValueProd, testLabelRegion: "us"}), + makeCounter(20, map[string]string{testLabelEnv: testValueProd, testLabelRegion: "eu"}), + ), + labels: map[string]string{testLabelEnv: testValueProd, testLabelRegion: "eu"}, + wantValue: 20, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := newTestTask(&Config{}) + val, err := task.findMetricValue(tt.mf, tt.labels) + + switch { + case tt.wantErr != "" && err == nil: + t.Fatalf("expected error containing %q, got nil", tt.wantErr) + case tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr): + t.Fatalf("error %q does not contain %q", err.Error(), tt.wantErr) + case tt.wantErr == "" && err != nil: + t.Fatalf("unexpected error: %v", err) + case tt.wantErr == "" && val != tt.wantValue: + t.Errorf("findMetricValue() = %v, want %v", val, tt.wantValue) + } + }) + } +} + +func TestHandleMissing(t *testing.T) { + tests := []struct { + name string + behavior MissingBehavior + wantErr bool + wantPass bool + wantWait bool + }{ + {"fail behavior", MissingBehaviorFail, true, false, false}, + {"pass behavior", MissingBehaviorPass, false, true, false}, + {"wait behavior", MissingBehaviorWait, false, false, true}, + {"empty defaults to wait", "", false, false, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := newTestTask(&Config{}) + result := assertionResult{name: "test"} + result = task.handleMissing(result, tt.behavior, "metric not found") + + if tt.wantErr && result.err == nil { + t.Error("expected error, got nil") + } + + if !tt.wantErr && result.err != nil { + t.Errorf("unexpected error: %v", result.err) + } + + if result.passed != tt.wantPass { + t.Errorf("passed = %v, want %v", result.passed, tt.wantPass) + } + + if result.waiting != tt.wantWait { + t.Errorf("waiting = %v, want %v", result.waiting, tt.wantWait) + } + }) + } +} + +func TestEvaluateAssertion_MissingMetric(t *testing.T) { + tests := []struct { + name string + behavior MissingBehavior + wantErr bool + wantPass bool + wantWait bool + }{ + {"wait on missing metric", MissingBehaviorWait, false, false, true}, + {"fail on missing metric", MissingBehaviorFail, true, false, false}, + {"pass on missing metric", MissingBehaviorPass, false, true, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := newTestTask(&Config{MissingMetric: tt.behavior}) + assertion := &AssertionConfig{ + Name: testAssertionName2, + Metric: "nonexistent_metric", + Operator: OperatorGt, + Value: 0, + } + + // Empty metric families - metric doesn't exist + result := task.evaluateAssertion(assertion, map[string]*dto.MetricFamily{}) + + if tt.wantErr { + if result.err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(result.err.Error(), "not found") { + t.Errorf("error %q does not contain 'not found'", result.err.Error()) + } + } else if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.passed != tt.wantPass { + t.Errorf("passed = %v, want %v", result.passed, tt.wantPass) + } + + if result.waiting != tt.wantWait { + t.Errorf("waiting = %v, want %v", result.waiting, tt.wantWait) + } + }) + } +} + +func TestEvaluateAssertion_MissingSeries(t *testing.T) { + tests := []struct { + name string + behavior MissingBehavior + wantErr bool + wantPass bool + wantWait bool + }{ + {"wait on missing series", MissingBehaviorWait, false, false, true}, + {"fail on missing series", MissingBehaviorFail, true, false, false}, + {"pass on missing series", MissingBehaviorPass, false, true, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := newTestTask(&Config{MissingSeries: tt.behavior}) + assertion := &AssertionConfig{ + Name: testAssertionName2, + Metric: testMetric1, + Labels: map[string]string{testLabelEnv: "nonexistent"}, + Operator: OperatorGt, + Value: 0, + } + + // Metric exists but labels don't match + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(42, map[string]string{testLabelEnv: testValueProd})), + } + result := task.evaluateAssertion(assertion, families) + + if tt.wantErr { + if result.err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(result.err.Error(), "no matching series") { + t.Errorf("error %q does not contain 'no matching series'", result.err.Error()) + } + } else if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.passed != tt.wantPass { + t.Errorf("passed = %v, want %v", result.passed, tt.wantPass) + } + + if result.waiting != tt.wantWait { + t.Errorf("waiting = %v, want %v", result.waiting, tt.wantWait) + } + }) + } +} + +func TestEvaluateAssertion_AssertionOverridesBehavior(t *testing.T) { + // Global behavior is wait, but assertion overrides to fail + failBehavior := MissingBehaviorFail + task := newTestTask(&Config{MissingMetric: MissingBehaviorWait}) + assertion := &AssertionConfig{ + Name: "testAssertionName2", + Metric: "nonexistent_metric", + Operator: OperatorGt, + Value: 0, + MissingMetric: &failBehavior, + } + + result := task.evaluateAssertion(assertion, map[string]*dto.MetricFamily{}) + + if result.err == nil { + t.Fatal("expected error, got nil") + } + + if result.waiting { + t.Error("expected waiting=false when override is fail") + } +} + +func TestEvaluateAssertion_ValueMode(t *testing.T) { + tests := []struct { + name string + metricVal float64 + operator Operator + assertVal float64 + wantPassed bool + }{ + {"gt passes", 100, OperatorGt, 50, true}, + {"gt fails", 50, OperatorGt, 100, false}, + {"eq passes", 42, OperatorEq, 42, true}, + {"eq fails", 42, OperatorEq, 43, false}, + {"lte passes", 50, OperatorLte, 50, true}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := newTestTask(&Config{}) + assertion := &AssertionConfig{ + Name: testAssertionName2, + Metric: testMetric1, + Mode: AssertionModeValue, + Operator: tt.operator, + Value: tt.assertVal, + } + + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(tt.metricVal, nil)), + } + result := task.evaluateAssertion(assertion, families) + + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.waiting { + t.Error("unexpected waiting=true") + } + + if result.passed != tt.wantPassed { + t.Errorf("passed = %v, want %v", result.passed, tt.wantPassed) + } + + if result.value != tt.metricVal { + t.Errorf("value = %v, want %v", result.value, tt.metricVal) + } + + if !math.IsNaN(result.delta) { + t.Errorf("delta = %v, want NaN", result.delta) + } + }) + } +} + +func TestEvaluateAssertion_DeltaMode_FirstScrape(t *testing.T) { + task := newTestTask(&Config{}) + assertion := &AssertionConfig{ + Name: "delta_test", + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorGt, + Value: 10, + } + + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(100, nil)), + } + + // First scrape should record baseline and wait + result := task.evaluateAssertion(assertion, families) + + if !result.waiting { + t.Error("first scrape should wait") + } + + if result.passed { + t.Error("first scrape should not pass yet") + } + + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if task.baselines["delta_test"] != 100.0 { + t.Errorf("baseline = %v, want 100.0", task.baselines["delta_test"]) + } +} + +func TestEvaluateAssertion_DeltaMode_SecondScrape(t *testing.T) { + task := newTestTask(&Config{}) + task.baselines["delta_test"] = 100.0 // Simulate first scrape + + assertion := &AssertionConfig{ + Name: "delta_test", + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorGt, + Value: 10, + } + + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(150, nil)), + } + + // Second scrape should evaluate delta (150 - 100 = 50) + result := task.evaluateAssertion(assertion, families) + + if result.waiting { + t.Error("second scrape should not wait") + } + + if !result.passed { + t.Error("delta 50 > 10 should pass") + } + + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.value != 150.0 { + t.Errorf("value = %v, want 150.0", result.value) + } + + if result.delta != 50.0 { + t.Errorf("delta = %v, want 50.0", result.delta) + } +} + +func TestEvaluateAssertion_CounterReset_Fail(t *testing.T) { + task := newTestTask(&Config{ResetBehavior: ResetBehaviorFail}) + task.baselines[testResetName] = 100.0 + + assertion := &AssertionConfig{ + Name: testResetName, + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorGte, + Value: 0, + } + + // Counter dropped from 100 to 50 (reset detected) + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(50, nil)), + } + + result := task.evaluateAssertion(assertion, families) + + if result.err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(result.err.Error(), "counter reset detected") { + t.Errorf("error %q does not contain 'counter reset detected'", result.err.Error()) + } +} + +func TestEvaluateAssertion_CounterReset_Rebaseline(t *testing.T) { + task := newTestTask(&Config{ResetBehavior: ResetBehaviorRebaseline}) + task.baselines[testResetName] = 100.0 + + assertion := &AssertionConfig{ + Name: testResetName, + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorGte, + Value: 0, + } + + // Counter dropped from 100 to 50 (reset detected) + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(50, nil)), + } + + result := task.evaluateAssertion(assertion, families) + + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if !result.waiting { + t.Error("should wait after rebaseline") + } + + if task.baselines[testResetName] != 50.0 { + t.Errorf("baseline = %v, want 50.0", task.baselines[testResetName]) + } +} + +func TestEvaluateAssertion_CounterReset_Ignore(t *testing.T) { + task := newTestTask(&Config{ResetBehavior: ResetBehaviorIgnore}) + task.baselines[testResetName] = 100.0 + + assertion := &AssertionConfig{ + Name: testResetName, + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorLt, + Value: 0, + } + + // Counter dropped from 100 to 50 (reset detected but ignored) + families := map[string]*dto.MetricFamily{ + testMetric1: makeCounterFamily(testMetric1, makeCounter(50, nil)), + } + + result := task.evaluateAssertion(assertion, families) + + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.waiting { + t.Error("should not wait when ignoring resets") + } + + // Delta = 50 - 100 = -50, which is < 0 + if !result.passed { + t.Error("negative delta should be allowed when ignoring resets") + } + + if result.delta != -50.0 { + t.Errorf("delta = %v, want -50.0", result.delta) + } +} + +func TestEvaluateAssertion_GaugeDecrease_NoReset(t *testing.T) { + // Gauges can decrease normally - this should NOT trigger reset detection + task := newTestTask(&Config{ResetBehavior: ResetBehaviorFail}) + task.baselines["gauge_test"] = 100.0 + + assertion := &AssertionConfig{ + Name: "gauge_test", + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorLt, + Value: 0, + } + + // Gauge dropped from 100 to 50 - this is normal for gauges + families := map[string]*dto.MetricFamily{ + testMetric1: makeGaugeFamily(testMetric1, makeGauge(50, nil)), + } + + result := task.evaluateAssertion(assertion, families) + + // Should NOT error even though ResetBehavior is Fail - gauges don't reset + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.waiting { + t.Error("should not wait") + } + + if !result.passed { + t.Error("gauge decrease should work normally") + } + + if result.delta != -50.0 { + t.Errorf("delta = %v, want -50.0", result.delta) + } +} + +func TestEvaluateAssertion_UntypedDecrease_NoReset(t *testing.T) { + // UNTYPED metrics (metrics without a TYPE declaration) should NOT trigger reset detection + task := newTestTask(&Config{ResetBehavior: ResetBehaviorFail}) + task.baselines["untyped_test"] = 100.0 + + assertion := &AssertionConfig{ + Name: "untyped_test", + Metric: testMetric1, + Mode: AssertionModeDelta, + Operator: OperatorLt, + Value: 0, + } + + // Untyped metric dropped from 100 to 50 - should NOT trigger reset detection + families := map[string]*dto.MetricFamily{ + testMetric1: makeUntypedFamily(testMetric1, makeUntyped(50, nil)), + } + + result := task.evaluateAssertion(assertion, families) + + // Should NOT error even though ResetBehavior is Fail - only COUNTER type triggers reset + if result.err != nil { + t.Fatalf("unexpected error: %v", result.err) + } + + if result.waiting { + t.Error("should not wait") + } + + if !result.passed { + t.Error("untyped decrease should work normally without reset detection") + } + + if result.delta != -50.0 { + t.Errorf("delta = %v, want -50.0", result.delta) + } +} + +func TestEvaluateAssertion_NaN(t *testing.T) { + task := newTestTask(&Config{}) + assertion := &AssertionConfig{ + Name: "nan_test", + Metric: testMetric1, + Operator: OperatorGt, + Value: 0, + } + + families := map[string]*dto.MetricFamily{ + testMetric1: makeGaugeFamily(testMetric1, makeGauge(math.NaN(), nil)), + } + + result := task.evaluateAssertion(assertion, families) + + if result.err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(result.err.Error(), "non-finite") { + t.Errorf("error %q does not contain 'non-finite'", result.err.Error()) + } +} + +func TestEvaluateAssertion_Inf(t *testing.T) { + task := newTestTask(&Config{}) + assertion := &AssertionConfig{ + Name: "inf_test", + Metric: testMetric1, + Operator: OperatorGt, + Value: 0, + } + + families := map[string]*dto.MetricFamily{ + testMetric1: makeGaugeFamily(testMetric1, makeGauge(math.Inf(1), nil)), + } + + result := task.evaluateAssertion(assertion, families) + + if result.err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(result.err.Error(), "non-finite") { + t.Errorf("error %q does not contain 'non-finite'", result.err.Error()) + } +} + +func TestScrapeMetrics(t *testing.T) { + metricsBody := `# HELP test_counter A test counter +# TYPE test_counter counter +test_counter{env="prod"} 42 +test_counter{env="dev"} 10 +# HELP test_gauge A test gauge +# TYPE test_gauge gauge +test_gauge 3.14 +` + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(metricsBody)) + })) + defer server.Close() + + task := newTestTask(&Config{ + URL: server.URL, + RequestTimeout: helper.Duration{Duration: 5 * time.Second}, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + families, err := task.scrapeMetrics(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if _, ok := families[testCounterMetric]; !ok { + t.Error("missing test_counter metric") + } + + if _, ok := families["test_gauge"]; !ok { + t.Error("missing test_gauge metric") + } + + // Verify counter values + counterFamily := families[testCounterMetric] + if counterFamily.GetType() != dto.MetricType_COUNTER { + t.Errorf("counter type = %v, want COUNTER", counterFamily.GetType()) + } + + if len(counterFamily.GetMetric()) != 2 { + t.Errorf("counter metric count = %d, want 2", len(counterFamily.GetMetric())) + } + + // Verify gauge value + gaugeFamily := families["test_gauge"] + if gaugeFamily.GetType() != dto.MetricType_GAUGE { + t.Errorf("gauge type = %v, want GAUGE", gaugeFamily.GetType()) + } + + if len(gaugeFamily.GetMetric()) != 1 { + t.Errorf("gauge metric count = %d, want 1", len(gaugeFamily.GetMetric())) + } +} + +func TestScrapeMetrics_HTTPError(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + task := newTestTask(&Config{URL: server.URL}) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + _, err := task.scrapeMetrics(context.Background()) + if err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(err.Error(), "500") { + t.Errorf("error %q does not contain '500'", err.Error()) + } +} + +func TestScrapeMetrics_ResponseTooLarge(t *testing.T) { + // Create a response larger than the limit + largeBody := make([]byte, 100) + for i := range largeBody { + largeBody[i] = 'x' + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write(largeBody) + })) + defer server.Close() + + task := newTestTask(&Config{ + URL: server.URL, + maxResponseSizeBytes: 50, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + _, err := task.scrapeMetrics(context.Background()) + if err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(err.Error(), "exceeds max size") { + t.Errorf("error %q does not contain 'exceeds max size'", err.Error()) + } +} + +func TestScrapeMetrics_InvalidPrometheusFormat(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("this is not prometheus format {{{")) + })) + defer server.Close() + + task := newTestTask(&Config{URL: server.URL}) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + _, err := task.scrapeMetrics(context.Background()) + if err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(err.Error(), "parse") { + t.Errorf("error %q does not contain 'parse'", err.Error()) + } +} + +func TestScrapeMetrics_CustomHeaders(t *testing.T) { + var receivedAuth string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + receivedAuth = r.Header.Get("Authorization") + + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("# HELP m A metric\n# TYPE m gauge\nm 1\n")) + })) + defer server.Close() + + task := newTestTask(&Config{ + URL: server.URL, + Headers: map[string]string{"Authorization": "Bearer token123"}, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + _, err := task.scrapeMetrics(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if receivedAuth != "Bearer token123" { + t.Errorf("Authorization header = %q, want 'Bearer token123'", receivedAuth) + } +} + +func newTestTaskWithContext(cfg *Config) (*Task, types.Variables, *types.TaskResult) { + logger := logrus.New() + logger.SetLevel(logrus.PanicLevel) + + outputs := vars.NewVariables(nil) + + var lastResult types.TaskResult + + ctx := &types.TaskContext{ + Outputs: outputs, + SetResult: func(r types.TaskResult) { lastResult = r }, + ReportProgress: func(_ float64, _ string) {}, + } + + var config Config + if cfg != nil { + config = *cfg + } + + // Ensure maxResponseSizeBytes has a default for tests that don't call Validate + // Use decimal value (10MB = 10,000,000) consistent with humanize.ParseBytes + if config.maxResponseSizeBytes == 0 { + config.maxResponseSizeBytes = 10 * 1000 * 1000 + } + + task := &Task{ + ctx: ctx, + config: config, + logger: logger, + baselines: make(map[string]float64), + } + + return task, outputs, &lastResult +} + +func TestRunCheck_ScrapeError_FailOnCheckMiss(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + task, outputs, result := newTestTaskWithContext(&Config{ + URL: server.URL, + FailOnCheckMiss: true, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + done, err := task.runCheck(context.Background()) + + if !done { + t.Error("expected done=true") + } + + if err == nil { + t.Fatal("expected error, got nil") + } + + if !strings.Contains(err.Error(), "scrape failed") { + t.Errorf("error %q does not contain 'scrape failed'", err.Error()) + } + + if *result != types.TaskResultFailure { + t.Errorf("result = %v, want TaskResultFailure", *result) + } + + if outputs.GetVar("scrapeErrors") != 1 { + t.Errorf("scrapeErrors = %v, want 1", outputs.GetVar("scrapeErrors")) + } +} + +func TestRunCheck_ScrapeError_RetryOnFail(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + task, outputs, result := newTestTaskWithContext(&Config{ + URL: server.URL, + FailOnCheckMiss: false, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + done, err := task.runCheck(context.Background()) + + if done { + t.Error("expected done=false") + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if *result != types.TaskResultNone { + t.Errorf("result = %v, want TaskResultNone", *result) + } + + if outputs.GetVar("scrapeErrors") != 1 { + t.Errorf("scrapeErrors = %v, want 1", outputs.GetVar("scrapeErrors")) + } +} + +func TestRunCheck_AllAssertionsPass(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("# TYPE test_counter counter\ntest_counter 100\n")) + })) + defer server.Close() + + task, outputs, result := newTestTaskWithContext(&Config{ + URL: server.URL, + MissingMetric: MissingBehaviorFail, + MissingSeries: MissingBehaviorFail, + Assertions: []AssertionConfig{ + {Name: testCheckCounterAssert, Metric: testCounterMetric, Mode: AssertionModeValue, Operator: OperatorGte, Value: 50}, + }, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + done, err := task.runCheck(context.Background()) + + if !done { + t.Error("expected done=true") + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if *result != types.TaskResultSuccess { + t.Errorf("result = %v, want TaskResultSuccess", *result) + } + + passed, ok := outputs.GetVar("passedAssertions").([]interface{}) + if !ok { + t.Fatalf("passedAssertions type = %T, want []interface{}", outputs.GetVar("passedAssertions")) + } + + if len(passed) != 1 { + t.Errorf("passedAssertions length = %d, want 1", len(passed)) + } + + if len(passed) > 0 && passed[0] != testCheckCounterAssert { + t.Errorf("passedAssertions[0] = %v, want %q", passed[0], testCheckCounterAssert) + } +} + +func TestRunCheck_AssertionFails(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("# TYPE test_counter counter\ntest_counter 10\n")) + })) + defer server.Close() + + task, outputs, result := newTestTaskWithContext(&Config{ + URL: server.URL, + FailOnCheckMiss: true, + MissingMetric: MissingBehaviorFail, + MissingSeries: MissingBehaviorFail, + Assertions: []AssertionConfig{ + {Name: testCheckCounterAssert, Metric: testCounterMetric, Mode: AssertionModeValue, Operator: OperatorGte, Value: 50}, + }, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + done, err := task.runCheck(context.Background()) + + if !done { + t.Error("expected done=true") + } + + if err == nil { + t.Fatal("expected error, got nil") + } + + if *result != types.TaskResultFailure { + t.Errorf("result = %v, want TaskResultFailure", *result) + } + + failed, ok := outputs.GetVar("failedAssertions").([]interface{}) + if !ok { + t.Fatalf("failedAssertions type = %T, want []interface{}", outputs.GetVar("failedAssertions")) + } + + if len(failed) != 1 { + t.Errorf("failedAssertions length = %d, want 1", len(failed)) + } + + if len(failed) > 0 && failed[0] != testCheckCounterAssert { + t.Errorf("failedAssertions[0] = %v, want %q", failed[0], testCheckCounterAssert) + } +} + +func TestRunCheck_MixedAssertions(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("# TYPE metric_a counter\nmetric_a 100\n# TYPE metric_b counter\nmetric_b 5\n")) + })) + defer server.Close() + + task, outputs, result := newTestTaskWithContext(&Config{ + URL: server.URL, + FailOnCheckMiss: false, + MissingMetric: MissingBehaviorWait, + MissingSeries: MissingBehaviorWait, + Assertions: []AssertionConfig{ + {Name: "a_passes", Metric: "metric_a", Mode: AssertionModeValue, Operator: OperatorGte, Value: 50}, + {Name: "b_fails", Metric: "metric_b", Mode: AssertionModeValue, Operator: OperatorGte, Value: 50}, + {Name: "c_waits", Metric: "metric_missing", Mode: AssertionModeValue, Operator: OperatorGte, Value: 0}, + }, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + done, err := task.runCheck(context.Background()) + + // Should keep waiting because one assertion is waiting + if done { + t.Error("expected done=false") + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if *result != types.TaskResultNone { + t.Errorf("result = %v, want TaskResultNone", *result) + } + + passed, ok := outputs.GetVar("passedAssertions").([]interface{}) + if !ok { + t.Fatalf("passedAssertions type = %T, want []interface{}", outputs.GetVar("passedAssertions")) + } + + foundAPasses := false + + for _, p := range passed { + if p == "a_passes" { + foundAPasses = true + } + } + + if !foundAPasses { + t.Error("passedAssertions should contain 'a_passes'") + } + + failed, ok := outputs.GetVar("failedAssertions").([]interface{}) + if !ok { + t.Fatalf("failedAssertions type = %T, want []interface{}", outputs.GetVar("failedAssertions")) + } + + foundBFails := false + + for _, f := range failed { + if f == "b_fails" { + foundBFails = true + } + } + + if !foundBFails { + t.Error("failedAssertions should contain 'b_fails'") + } +} + +func TestRunCheck_AssertionErrorIncrement(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + // Metric with multiple matching series (causes assertion error) + _, _ = w.Write([]byte("# TYPE m counter\nm{env=\"a\"} 1\nm{env=\"b\"} 2\n")) + })) + defer server.Close() + + task, outputs, _ := newTestTaskWithContext(&Config{ + URL: server.URL, + FailOnCheckMiss: false, + MissingMetric: MissingBehaviorFail, + MissingSeries: MissingBehaviorFail, + Assertions: []AssertionConfig{ + {Name: "ambiguous", Metric: "m", Mode: AssertionModeValue, Operator: OperatorGte, Value: 0}, + }, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + _, _ = task.runCheck(context.Background()) + + if outputs.GetVar("assertionErrors") != 1 { + t.Errorf("assertionErrors = %v, want 1", outputs.GetVar("assertionErrors")) + } +} + +func TestRunCheck_ContinueOnPass(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("# TYPE my_counter counter\nmy_counter 100\n")) + })) + defer server.Close() + + task, _, result := newTestTaskWithContext(&Config{ + URL: server.URL, + ContinueOnPass: true, + MissingMetric: MissingBehaviorFail, + MissingSeries: MissingBehaviorFail, + Assertions: []AssertionConfig{ + {Name: "check", Metric: "my_counter", Mode: AssertionModeValue, Operator: OperatorGte, Value: 50}, + }, + }) + task.httpClient = &http.Client{Timeout: 5 * time.Second} + + done, err := task.runCheck(context.Background()) + + // Should continue even though all assertions pass + if done { + t.Error("expected done=false with continueOnPass") + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if *result != types.TaskResultSuccess { + t.Errorf("result = %v, want TaskResultSuccess", *result) + } +} + +// ============================================================================= +// Full-Cycle Integration Tests +// ============================================================================= +// These tests verify the complete flow from DefaultConfig() through Validate() +// to execution, mirroring how the task is used in production. + +func TestIntegration_ValueMode(t *testing.T) { + // Setup mock metrics server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(` +# HELP http_requests_total Total HTTP requests +# TYPE http_requests_total counter +http_requests_total{method="GET",status="200"} 150 +http_requests_total{method="POST",status="200"} 50 +`)) + })) + defer server.Close() + + // Start from DefaultConfig (like production) + cfg := DefaultConfig() + cfg.URL = server.URL + cfg.Assertions = []AssertionConfig{ + { + Name: "get_requests_above_100", + Metric: "http_requests_total", + Labels: map[string]string{"method": "GET", "status": "200"}, + Mode: AssertionModeValue, + Operator: OperatorGt, + Value: 100, + }, + } + + // Validate (like production) + if err := cfg.Validate(); err != nil { + t.Fatalf("Validate() failed: %v", err) + } + + // Verify maxResponseSizeBytes was set by Validate + if cfg.GetMaxResponseSizeBytes() == 0 { + t.Fatal("maxResponseSizeBytes should be set after Validate()") + } + + // Create task with context (like production) + logger := logrus.New() + logger.SetLevel(logrus.PanicLevel) + + outputs := vars.NewVariables(nil) + + var lastResult types.TaskResult + + ctx := &types.TaskContext{ + Outputs: outputs, + SetResult: func(r types.TaskResult) { lastResult = r }, + ReportProgress: func(_ float64, _ string) {}, + } + + task := &Task{ + ctx: ctx, + config: cfg, + logger: logger, + baselines: make(map[string]float64), + httpClient: &http.Client{Timeout: cfg.RequestTimeout.Duration}, + } + + // Run check + done, err := task.runCheck(context.Background()) + if err != nil { + t.Fatalf("runCheck() error: %v", err) + } + + if !done { + t.Error("expected done=true when assertion passes") + } + + if lastResult != types.TaskResultSuccess { + t.Errorf("result = %v, want TaskResultSuccess", lastResult) + } + + // Verify outputs + passed := outputs.GetVar("passedAssertions") + if passed == nil { + t.Fatal("passedAssertions output not set") + } + + passedList, ok := passed.([]any) + if !ok { + t.Fatalf("passedAssertions type = %T, want []any", passed) + } + + if len(passedList) != 1 || passedList[0] != "get_requests_above_100" { + t.Errorf("passedAssertions = %v, want [get_requests_above_100]", passedList) + } +} + +func TestIntegration_DeltaMode(t *testing.T) { + requestCount := 0 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + requestCount++ + + // First scrape: baseline of 100 + // Second scrape: value of 150 (delta = 50) + value := 100 + if requestCount > 1 { + value = 150 + } + + _, _ = fmt.Fprintf(w, ` +# HELP requests_total Total requests +# TYPE requests_total counter +requests_total %d +`, value) + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.URL = server.URL + cfg.Assertions = []AssertionConfig{ + { + Name: "delta_above_40", + Metric: "requests_total", + Mode: AssertionModeDelta, + Operator: OperatorGt, + Value: 40, + }, + } + + if err := cfg.Validate(); err != nil { + t.Fatalf("Validate() failed: %v", err) + } + + logger := logrus.New() + logger.SetLevel(logrus.PanicLevel) + + outputs := vars.NewVariables(nil) + + var lastResult types.TaskResult + + ctx := &types.TaskContext{ + Outputs: outputs, + SetResult: func(r types.TaskResult) { lastResult = r }, + ReportProgress: func(_ float64, _ string) {}, + } + + task := &Task{ + ctx: ctx, + config: cfg, + logger: logger, + baselines: make(map[string]float64), + httpClient: &http.Client{Timeout: cfg.RequestTimeout.Duration}, + } + + // First scrape - records baseline, doesn't complete + done, err := task.runCheck(context.Background()) + if err != nil { + t.Fatalf("first runCheck() error: %v", err) + } + + if done { + t.Error("expected done=false on first scrape (baseline recording)") + } + + // Second scrape - evaluates delta + done, err = task.runCheck(context.Background()) + if err != nil { + t.Fatalf("second runCheck() error: %v", err) + } + + if !done { + t.Error("expected done=true on second scrape") + } + + if lastResult != types.TaskResultSuccess { + t.Errorf("result = %v, want TaskResultSuccess", lastResult) + } + + // Verify delta output + deltas := outputs.GetVar("deltas") + if deltas == nil { + t.Fatal("deltas output not set") + } + + deltasMap, ok := deltas.(map[string]any) + if !ok { + t.Fatalf("deltas type = %T, want map[string]any", deltas) + } + + if delta, exists := deltasMap["delta_above_40"]; !exists || delta != 50.0 { + t.Errorf("deltas[delta_above_40] = %v, want 50.0", deltasMap["delta_above_40"]) + } +} + +func TestIntegration_MultipleAssertions(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(` +# HELP cpu_usage CPU usage percentage +# TYPE cpu_usage gauge +cpu_usage{host="server1"} 45.5 +cpu_usage{host="server2"} 78.2 +# HELP memory_usage Memory usage percentage +# TYPE memory_usage gauge +memory_usage{host="server1"} 60.0 +`)) + })) + defer server.Close() + + cfg := DefaultConfig() + cfg.URL = server.URL + cfg.Assertions = []AssertionConfig{ + { + Name: "server1_cpu_ok", + Metric: "cpu_usage", + Labels: map[string]string{testLabelHost: "server1"}, + Mode: AssertionModeValue, + Operator: OperatorLt, + Value: 50, + }, + { + Name: "server1_memory_ok", + Metric: "memory_usage", + Labels: map[string]string{testLabelHost: "server1"}, + Mode: AssertionModeValue, + Operator: OperatorLte, + Value: 60, + }, + { + Name: "server2_cpu_high", + Metric: "cpu_usage", + Labels: map[string]string{testLabelHost: "server2"}, + Mode: AssertionModeValue, + Operator: OperatorGt, + Value: 70, + }, + } + + if err := cfg.Validate(); err != nil { + t.Fatalf("Validate() failed: %v", err) + } + + logger := logrus.New() + logger.SetLevel(logrus.PanicLevel) + + outputs := vars.NewVariables(nil) + + var lastResult types.TaskResult + + ctx := &types.TaskContext{ + Outputs: outputs, + SetResult: func(r types.TaskResult) { lastResult = r }, + ReportProgress: func(_ float64, _ string) {}, + } + + task := &Task{ + ctx: ctx, + config: cfg, + logger: logger, + baselines: make(map[string]float64), + httpClient: &http.Client{Timeout: cfg.RequestTimeout.Duration}, + } + + done, err := task.runCheck(context.Background()) + if err != nil { + t.Fatalf("runCheck() error: %v", err) + } + + if !done { + t.Error("expected done=true when all assertions pass") + } + + if lastResult != types.TaskResultSuccess { + t.Errorf("result = %v, want TaskResultSuccess", lastResult) + } + + // Verify all three passed + passed := outputs.GetVar("passedAssertions") + passedList, ok := passed.([]any) + + if !ok { + t.Fatalf("passedAssertions type = %T, want []any", passed) + } + + if len(passedList) != 3 { + t.Errorf("passedAssertions count = %d, want 3", len(passedList)) + } + + // Verify values output + values := outputs.GetVar("values") + valuesMap, ok := values.(map[string]any) + + if !ok { + t.Fatalf("values type = %T, want map[string]any", values) + } + + if v := valuesMap["server1_cpu_ok"]; v != 45.5 { + t.Errorf("values[server1_cpu_ok] = %v, want 45.5", v) + } +} diff --git a/pkg/tasks/tasks.go b/pkg/tasks/tasks.go index 16d066de..ace4be72 100644 --- a/pkg/tasks/tasks.go +++ b/pkg/tasks/tasks.go @@ -19,6 +19,7 @@ import ( checkethcall "github.com/ethpandaops/assertoor/pkg/tasks/check_eth_call" checkethconfig "github.com/ethpandaops/assertoor/pkg/tasks/check_eth_config" checkexecutionsyncstatus "github.com/ethpandaops/assertoor/pkg/tasks/check_execution_sync_status" + checkhttpmetrics "github.com/ethpandaops/assertoor/pkg/tasks/check_http_metrics" generateattestations "github.com/ethpandaops/assertoor/pkg/tasks/generate_attestations" generatebatchdeposits "github.com/ethpandaops/assertoor/pkg/tasks/generate_batch_deposits" generateblobtransactions "github.com/ethpandaops/assertoor/pkg/tasks/generate_blob_transactions" @@ -71,6 +72,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{ checkexecutionblock.TaskDescriptor, checkethcall.TaskDescriptor, checkethconfig.TaskDescriptor, + checkhttpmetrics.TaskDescriptor, checkexecutionsyncstatus.TaskDescriptor, generateattestations.TaskDescriptor, generatebatchdeposits.TaskDescriptor,