Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 42 additions & 50 deletions app/workflows/__tests__/backfillTrackStep.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { backfillTrackStep } from "../backfillTrackStep";

import { fetchSongstats } from "@/lib/songstats/fetchSongstats";
import { fetchSongstatsWithBackoff } from "@/lib/songstats/fetchSongstatsWithBackoff";
import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements";
import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger";
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";

vi.mock("@/lib/songstats/fetchSongstats", () => ({ fetchSongstats: vi.fn() }));
vi.mock("@/lib/songstats/fetchSongstatsWithBackoff", () => ({
fetchSongstatsWithBackoff: vi.fn(),
}));
vi.mock("@/lib/supabase/song_measurements/upsertSongMeasurements", () => ({
upsertSongMeasurements: vi.fn(),
}));
Expand All @@ -22,33 +24,32 @@ const ROW = { id: "q1", song: "USA2P2015959" } as never;
describe("backfillTrackStep", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.spyOn(console, "log").mockImplementation(() => {});
vi.mocked(upsertSongMeasurements).mockResolvedValue([] as never);
});

it("writes the historic series as songstats measurements, records spend, marks done", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({
it("writes the historic series, records the spend, marks done on 200", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 200,
attempts: 1,
retriesExhausted: false,
data: {
stats: [
{
source: "spotify",
data: {
history: [
{ date: "2025-01-01", streams_total: 1008736324 },
{ date: "2026-01-01", streams_total: 1330251464 },
],
},
data: { history: [{ date: "2025-01-01", streams_total: 1008736324 }] },
},
],
},
});

const result = await backfillTrackStep(ROW);

expect(fetchSongstats).toHaveBeenCalledWith("tracks/historic_stats", {
expect(fetchSongstatsWithBackoff).toHaveBeenCalledWith("tracks/historic_stats", {
isrc: "USA2P2015959",
source: "spotify",
});
// exact transformation: each history point → a permanent songstats measurement
expect(upsertSongMeasurements).toHaveBeenCalledWith([
{
song: "USA2P2015959",
Expand All @@ -59,74 +60,65 @@ describe("backfillTrackStep", () => {
data_source: "songstats",
raw_ref: "songstats-backfill",
},
{
song: "USA2P2015959",
platform: "spotify",
metric: "platform_displayed_play_count",
value: 1330251464,
captured_at: "2026-01-01T00:00:00.000Z",
data_source: "songstats",
raw_ref: "songstats-backfill",
},
]);
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959",
});
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "done" });
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "done" });
expect(result).toEqual({ ok: true, hitsSpent: 1 });
});

it("marks a transient upstream error (429) as failed (reclaimable) and records the spend", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({ status: 429, data: {} });

const result = await backfillTrackStep(ROW);

// transient -> 'failed' so the daily reclaim sweep returns it to 'pending'
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "failed" });
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959 (failed 429)",
it("DEFERS (pending, no quota hit, signals stop) when backoff is exhausted on 429", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 429,
attempts: 6,
retriesExhausted: true,
data: {},
});
expect(upsertSongMeasurements).not.toHaveBeenCalled();
expect(result).toEqual({ ok: false, hitsSpent: 1 });
});

it("marks a transient 5xx as failed (reclaimable)", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({ status: 504, data: {} });

const result = await backfillTrackStep(ROW);

expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "failed" });
expect(result).toEqual({ ok: false, hitsSpent: 1 });
// left pending for the next drain; NO ledger hit (Songstats consumed nothing)
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "pending" });
expect(insertSongstatsQuotaLedger).not.toHaveBeenCalled();
expect(upsertSongMeasurements).not.toHaveBeenCalled();
expect(result).toEqual({ ok: false, hitsSpent: 0, deferred: true });
});

it("marks a permanent client error (403) as done so reclaim never recycles it", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({ status: 403, data: {} });
it("marks a definitive 404 (no history) as done and records the spend", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 404,
attempts: 1,
retriesExhausted: false,
data: {},
});

const result = await backfillTrackStep(ROW);

// non-retryable 4xx (not 408/429) is terminal -> 'done', not 'failed'
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "done" });
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "done" });
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959 (terminal 403)",
purpose: "backfill USA2P2015959 (no data 404)",
});
expect(result).toEqual({ ok: false, hitsSpent: 1 });
});

it("marks a definitive 404 (no history exists) as done so it is never retried", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({ status: 404, data: {} });
it("marks a permanent 4xx (403) as done (terminal) and records the spend", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 403,
attempts: 1,
retriesExhausted: false,
data: {},
});

const result = await backfillTrackStep(ROW);

// terminal no-data -> 'done', not 'failed' — the reclaim sweep must not resurrect it
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "done" });
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "done" });
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959 (no data 404)",
purpose: "backfill USA2P2015959 (terminal 403)",
});
expect(upsertSongMeasurements).not.toHaveBeenCalled();
expect(result).toEqual({ ok: false, hitsSpent: 1 });
});
});
61 changes: 61 additions & 0 deletions app/workflows/__tests__/songstatsBackfillWorkflow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { songstatsBackfillWorkflow } from "../songstatsBackfillWorkflow";

import { getBackfillBudgetStep } from "../getBackfillBudgetStep";
import { claimBackfillRowsStep } from "../claimBackfillRowsStep";
import { backfillTrackStep } from "../backfillTrackStep";
import { releaseClaimedRowsStep } from "../releaseClaimedRowsStep";

vi.mock("../getBackfillBudgetStep", () => ({ getBackfillBudgetStep: vi.fn() }));
vi.mock("../claimBackfillRowsStep", () => ({ claimBackfillRowsStep: vi.fn() }));
vi.mock("../backfillTrackStep", () => ({ backfillTrackStep: vi.fn() }));
vi.mock("../releaseClaimedRowsStep", () => ({ releaseClaimedRowsStep: vi.fn() }));

/** Builds a minimal queue-row stub whose `song` mirrors its `id`; cast so it fits any row-typed mock. */
const row = (id: string) => {
  const stub = { id, song: id };
  return stub as never;
};

// Suite for songstatsBackfillWorkflow. Every step the workflow calls is a vi.fn() mock,
// so each case below scripts the step results and asserts only on orchestration:
// which steps were called, with what, and the summary object returned.
describe("songstatsBackfillWorkflow", () => {
beforeEach(() => {
// Fresh call history per test; silence the workflow's console.log output.
vi.clearAllMocks();
vi.spyOn(console, "log").mockImplementation(() => {});
vi.mocked(releaseClaimedRowsStep).mockResolvedValue(undefined);
});

// Defer path: a `deferred: true` result from a track must stop the drain and
// release the not-yet-processed rows of the claimed batch.
it("releases the rest of the claimed batch to pending when a track defers", async () => {
vi.mocked(getBackfillBudgetStep).mockResolvedValue(100);
// Not a `...Once` mock: any claim call would resolve these same three rows, so the
// call-count assertion below only holds if the workflow stops after the defer.
vi.mocked(claimBackfillRowsStep).mockResolvedValue([row("r1"), row("r2"), row("r3")]);
vi.mocked(backfillTrackStep)
.mockResolvedValueOnce({ ok: true, hitsSpent: 1 }) // r1
.mockResolvedValueOnce({ ok: false, hitsSpent: 0, deferred: true }); // r2 defers

const result = await songstatsBackfillWorkflow();

// r2 is set pending by the step itself; the unprocessed remainder (r3) is released here
expect(releaseClaimedRowsStep).toHaveBeenCalledWith(["r3"]);
expect(backfillTrackStep).toHaveBeenCalledTimes(2); // stopped at the defer, never reached r3
// The deferred track is counted neither as backfilled nor as failed.
expect(result).toEqual({ backfilled: 1, failed: 0, deferred: true });
});

// Normal path: no defer, so the loop ends on the empty second claim.
it("drains until the queue is empty and never releases when nothing defers", async () => {
vi.mocked(getBackfillBudgetStep).mockResolvedValue(100);
vi.mocked(claimBackfillRowsStep)
.mockResolvedValueOnce([row("a"), row("b")])
.mockResolvedValueOnce([]); // queue drained
vi.mocked(backfillTrackStep)
.mockResolvedValueOnce({ ok: true, hitsSpent: 1 })
.mockResolvedValueOnce({ ok: false, hitsSpent: 1 }); // terminal (e.g. 404)

const result = await songstatsBackfillWorkflow();

expect(releaseClaimedRowsStep).not.toHaveBeenCalled();
// A non-ok, non-deferred result is tallied under `failed`.
expect(result).toEqual({ backfilled: 1, failed: 1, deferred: false });
});

// Zero budget: the workflow must return the empty summary without claiming any rows.
it("does not drain when there is no budget", async () => {
vi.mocked(getBackfillBudgetStep).mockResolvedValue(0);

const result = await songstatsBackfillWorkflow();

expect(claimBackfillRowsStep).not.toHaveBeenCalled();
expect(result).toEqual({ backfilled: 0, failed: 0, deferred: false });
});
});
59 changes: 34 additions & 25 deletions app/workflows/backfillTrackStep.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fetchSongstats } from "@/lib/songstats/fetchSongstats";
import { fetchSongstatsWithBackoff } from "@/lib/songstats/fetchSongstatsWithBackoff";
import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements";
import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger";
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";
Expand All @@ -8,40 +8,48 @@ import { Tables } from "@/types/database.types";
const METRIC = "platform_displayed_play_count";

/**
* Backfill one claimed queue row: fetch the track's Songstats historic series
* (one quota hit — recorded win or lose), write each point as a permanent
* `songstats`-labeled measurement, and close the row. Failures mark the row
* failed without failing the run — the next row may still succeed.
* Backfill one claimed queue row, with bounded exponential backoff on Songstats'
* rate limit (Songstats is the rate authority — see chat#1797):
* - **200** → write each history point as a permanent `songstats` measurement,
* record the spend, mark `done`.
* - **404 / other 4xx** → a real request with a definitive answer; terminal, so
* mark `done` (404 = no history) and record the spend.
* - **backoff exhausted** (still 429/5xx after retries) → **defer**: leave the row
* `pending` for the next drain, consume no quota, and signal the workflow to
* stop (`deferred`) — Songstats is saturated right now.
*
* @param row - The claimed queue row (already in_progress)
* @returns ok + hits spent (always 1; the hit is consumed even on failure)
* @returns ok + hitsSpent (0 when deferred) + `deferred` when Songstats is saturated
*/
export async function backfillTrackStep(
row: Tables<"songstats_backfill_queue">,
): Promise<{ ok: boolean; hitsSpent: number }> {
): Promise<{ ok: boolean; hitsSpent: number; deferred?: boolean }> {
"use step";
const result = await fetchSongstats("tracks/historic_stats", {
const result = await fetchSongstatsWithBackoff("tracks/historic_stats", {
isrc: row.song,
source: "spotify",
});

if (result.status !== 200) {
const status = result.status;
const isNoData = status === 404;
// Only transient errors are retryable: 408 (timeout), 429 (quota), any 5xx.
const isRetryable = status === 408 || status === 429 || status >= 500;

// `failed` is reclaimable (the daily sweep returns it to `pending`, bounded
// by the rolling-window budget). 404 no-data and other permanent 4xx are
// terminal → `done`, so reclaim never recycles a track that can't succeed.
const nextStatus = isRetryable ? "failed" : "done";

let outcome = `terminal ${status}`;
if (isNoData) outcome = "no data 404";
else if (isRetryable) outcome = `failed ${status}`;
if (result.retriesExhausted) {
// Still retryable (429 throttle / 408 / gateway 5xx) past the backoff bound —
// leave it for the next run, spend nothing.
console.log(
`[backfill] ${row.song} deferred (retryable ${result.status} after ${result.attempts} tries)`,
);
await updateSongstatsBackfillQueue([row.id], { status: "pending" });
return { ok: false, hitsSpent: 0, deferred: true };
}

await insertSongstatsQuotaLedger({ hits: 1, purpose: `backfill ${row.song} (${outcome})` });
await updateSongstatsBackfillQueue(row.id, { status: nextStatus });
if (result.status !== 200) {
const noData = result.status === 404;
console.log(
`[backfill] ${row.song} done (${noData ? "no data 404" : `terminal ${result.status}`})`,
);
await insertSongstatsQuotaLedger({
hits: 1,
purpose: `backfill ${row.song} (${noData ? "no data 404" : `terminal ${result.status}`})`,
});
await updateSongstatsBackfillQueue([row.id], { status: "done" });
return { ok: false, hitsSpent: 1 };
}

Expand All @@ -67,7 +75,8 @@ export async function backfillTrackStep(
});
await upsertSongMeasurements(rows);

console.log(`[backfill] ${row.song} done (${rows.length} points written)`);
await insertSongstatsQuotaLedger({ hits: 1, purpose: `backfill ${row.song}` });
await updateSongstatsBackfillQueue(row.id, { status: "done" });
await updateSongstatsBackfillQueue([row.id], { status: "done" });
return { ok: true, hitsSpent: 1 };
}
15 changes: 15 additions & 0 deletions app/workflows/releaseClaimedRowsStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";

/**
 * Durable step that hands claimed-but-unprocessed queue rows back to `pending`.
 * Used when the drain stops early on a defer, so the next run can retry the rows
 * right away instead of leaving them for the stale-reclaim sweep.
 *
 * @param ids - Ids of queue rows still `in_progress` from the aborted batch.
 * @returns Resolves once the rows are updated; does nothing for an empty list.
 */
export async function releaseClaimedRowsStep(ids: string[]): Promise<void> {
  "use step";
  // Only touch the queue (and log) when there is something to release.
  if (ids.length > 0) {
    await updateSongstatsBackfillQueue(ids, { status: "pending" });
    console.log(`[songstats-backfill] released ${ids.length} claimed rows back to pending`);
  }
}
38 changes: 28 additions & 10 deletions app/workflows/songstatsBackfillWorkflow.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,54 @@
import { getBackfillBudgetStep } from "@/app/workflows/getBackfillBudgetStep";
import { claimBackfillRowsStep } from "@/app/workflows/claimBackfillRowsStep";
import { backfillTrackStep } from "@/app/workflows/backfillTrackStep";
import { releaseClaimedRowsStep } from "@/app/workflows/releaseClaimedRowsStep";

const BATCH_SIZE = 25;

/**
* Durable Songstats backfill drain (recoupable/chat#1791 write path): check
* the rolling-window budget, claim value-ranked rows via the SKIP LOCKED RPC,
* backfill each track's historic series into the measurement store, and stop
* when the queue or the budget is dry. Every quota hit converts into
* permanent owned data (fetch-once: captured history is never refetched).
* Durable Songstats backfill drain (recoupable/chat#1791 write path): claim
* value-ranked rows via the SKIP LOCKED RPC and backfill each track's historic
* series, with per-track exponential backoff handling Songstats' rate limit
* (chat#1797). **Stops as soon as a track defers** — Songstats still
* rate-limiting it past the backoff bound — releasing the rest of the claimed
* batch back to `pending` (so the next drain retries them immediately rather
* than waiting on stale-reclaim) instead of hammering a saturated API. Every
* successful hit converts into permanent owned data (fetch-once: captured
* history is never refetched).
*/
export async function songstatsBackfillWorkflow() {
"use workflow";

let budget = await getBackfillBudgetStep();
let backfilled = 0;
let failed = 0;
let deferred = false;

while (budget > 0) {
drain: while (budget > 0) {
const rows = await claimBackfillRowsStep(Math.min(budget, BATCH_SIZE));
if (rows.length === 0) break;
console.log(`[songstats-backfill] claimed ${rows.length} rows`);

for (const row of rows) {
const result = await backfillTrackStep(row);
for (let i = 0; i < rows.length; i += 1) {
const result = await backfillTrackStep(rows[i]);
if (result.deferred) {
// Songstats is saturated — stop now. The deferred row is already back to
// `pending`; release the rest of this claimed batch too so they don't sit
// `in_progress` until stale-reclaim.
deferred = true;
await releaseClaimedRowsStep(rows.slice(i + 1).map(r => r.id));
break drain;
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
}
budget -= result.hitsSpent;
if (result.ok) backfilled += 1;
else failed += 1;
if (budget <= 0) break;
}
}

console.log(`[songstats-backfill] done: ${backfilled} backfilled, ${failed} failed`);
return { backfilled, failed };
console.log(
`[songstats-backfill] done: ${backfilled} backfilled, ${failed} terminal` +
(deferred ? ", deferred (rate-limited)" : ""),
);
return { backfilled, failed, deferred };
}
Loading
Loading