Skip to content

Commit 5869a37

Browse files
committed
add whenFailed option for error handling after retries exhausted
1 parent f26034a commit 5869a37

12 files changed

Lines changed: 817 additions & 371 deletions
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@pgflow/core': patch
3+
'@pgflow/dsl': patch
4+
---
5+
6+
Add `whenFailed` option that controls how a step is handled after its retries are exhausted (`fail`, `skip`, or `skip-cascade`)

pkgs/core/schemas/0100_function_fail_task.sql

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ as $$
1212
DECLARE
1313
v_run_failed boolean;
1414
v_step_failed boolean;
15+
v_step_skipped boolean;
16+
v_when_failed text;
17+
v_task_exhausted boolean; -- True if task has exhausted retries
1518
begin
1619

1720
-- If run is already failed, no retries allowed
@@ -62,7 +65,8 @@ flow_info AS (
6265
config AS (
6366
SELECT
6467
COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
65-
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay
68+
COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay,
69+
s.when_failed
6670
FROM pgflow.steps s
6771
JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
6872
JOIN flow_info fi ON fi.flow_slug = s.flow_slug
@@ -90,27 +94,53 @@ fail_or_retry_task as (
9094
AND task.status = 'started'
9195
RETURNING *
9296
),
97+
-- Determine if task exhausted retries and get when_failed mode
98+
task_status AS (
99+
SELECT
100+
(select status from fail_or_retry_task) AS new_task_status,
101+
(select when_failed from config) AS when_failed_mode,
102+
-- Task is exhausted when it's failed (no more retries)
103+
((select status from fail_or_retry_task) = 'failed') AS is_exhausted
104+
),
93105
maybe_fail_step AS (
94106
UPDATE pgflow.step_states
95107
SET
108+
-- Status logic:
109+
-- - If task not exhausted (retrying): keep current status
110+
-- - If exhausted AND when_failed='fail': set to 'failed'
111+
-- - If exhausted AND when_failed IN ('skip', 'skip-cascade'): set to 'skipped'
96112
status = CASE
97-
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN 'failed'
98-
ELSE pgflow.step_states.status
113+
WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status
114+
WHEN (select when_failed_mode from task_status) = 'fail' THEN 'failed'
115+
ELSE 'skipped' -- skip or skip-cascade
99116
END,
100117
failed_at = CASE
101-
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN now()
118+
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) = 'fail' THEN now()
102119
ELSE NULL
103120
END,
104121
error_message = CASE
105-
WHEN (select fail_or_retry_task.status from fail_or_retry_task) = 'failed' THEN fail_task.error_message
122+
WHEN (select is_exhausted from task_status) THEN fail_task.error_message
106123
ELSE NULL
107-
END
124+
END,
125+
skip_reason = CASE
126+
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed'
127+
ELSE pgflow.step_states.skip_reason
128+
END,
129+
skipped_at = CASE
130+
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN now()
131+
ELSE pgflow.step_states.skipped_at
132+
END,
133+
-- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint)
134+
remaining_tasks = CASE
135+
WHEN (select is_exhausted from task_status) AND (select when_failed_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL
136+
ELSE pgflow.step_states.remaining_tasks
137+
END
108138
FROM fail_or_retry_task
109139
WHERE pgflow.step_states.run_id = fail_task.run_id
110140
AND pgflow.step_states.step_slug = fail_task.step_slug
111141
RETURNING pgflow.step_states.*
112142
)
113-
-- Update run status
143+
-- Update run status: fail the run only when the step itself ended up 'failed' (i.e. retries exhausted and when_failed = 'fail')
114144
UPDATE pgflow.runs
115145
SET status = CASE
116146
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
@@ -119,10 +149,27 @@ SET status = CASE
119149
failed_at = CASE
120150
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
121151
ELSE NULL
122-
END
152+
END,
153+
-- Decrement remaining_steps when step was skipped (not failed, run continues)
154+
remaining_steps = CASE
155+
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
156+
ELSE pgflow.runs.remaining_steps
157+
END
123158
WHERE pgflow.runs.run_id = fail_task.run_id
124159
RETURNING (status = 'failed') INTO v_run_failed;
125160

161+
-- Capture when_failed mode and check if step was skipped for later processing
162+
SELECT s.when_failed INTO v_when_failed
163+
FROM pgflow.steps s
164+
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
165+
WHERE r.run_id = fail_task.run_id
166+
AND s.step_slug = fail_task.step_slug;
167+
168+
SELECT (status = 'skipped') INTO v_step_skipped
169+
FROM pgflow.step_states
170+
WHERE pgflow.step_states.run_id = fail_task.run_id
171+
AND pgflow.step_states.step_slug = fail_task.step_slug;
172+
126173
-- Check if step failed by querying the step_states table
127174
SELECT (status = 'failed') INTO v_step_failed
128175
FROM pgflow.step_states
@@ -146,6 +193,33 @@ IF v_step_failed THEN
146193
);
147194
END IF;
148195

196+
-- Handle step skipping (when_failed = 'skip' or 'skip-cascade')
197+
IF v_step_skipped THEN
198+
-- Send broadcast event for step skipped
199+
PERFORM realtime.send(
200+
jsonb_build_object(
201+
'event_type', 'step:skipped',
202+
'run_id', fail_task.run_id,
203+
'step_slug', fail_task.step_slug,
204+
'status', 'skipped',
205+
'skip_reason', 'handler_failed',
206+
'error_message', fail_task.error_message,
207+
'skipped_at', now()
208+
),
209+
concat('step:', fail_task.step_slug, ':skipped'),
210+
concat('pgflow:run:', fail_task.run_id),
211+
false
212+
);
213+
214+
-- For skip-cascade: cascade skip to all downstream dependents
215+
IF v_when_failed = 'skip-cascade' THEN
216+
PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed');
217+
END IF;
218+
219+
-- Try to complete the run (remaining_steps may now be 0)
220+
PERFORM pgflow.maybe_complete_run(fail_task.run_id);
221+
END IF;
222+
149223
-- Send broadcast event for run failure if the run was failed
150224
IF v_run_failed THEN
151225
DECLARE

0 commit comments

Comments
 (0)