Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 8 additions & 26 deletions app/workflows/__tests__/backfillTrackStep.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { backfillTrackStep } from "../backfillTrackStep";

import { fetchSongstatsWithBackoff } from "@/lib/songstats/fetchSongstatsWithBackoff";
import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements";
import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger";
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";

vi.mock("@/lib/songstats/fetchSongstatsWithBackoff", () => ({
Expand All @@ -12,9 +11,6 @@ vi.mock("@/lib/songstats/fetchSongstatsWithBackoff", () => ({
vi.mock("@/lib/supabase/song_measurements/upsertSongMeasurements", () => ({
upsertSongMeasurements: vi.fn(),
}));
vi.mock("@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger", () => ({
insertSongstatsQuotaLedger: vi.fn(),
}));
vi.mock("@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue", () => ({
updateSongstatsBackfillQueue: vi.fn(),
}));
Expand All @@ -28,7 +24,7 @@ describe("backfillTrackStep", () => {
vi.mocked(upsertSongMeasurements).mockResolvedValue([] as never);
});

it("writes the historic series, records the spend, marks done on 200", async () => {
it("writes the historic series and marks done on 200", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 200,
attempts: 1,
Expand Down Expand Up @@ -61,15 +57,11 @@ describe("backfillTrackStep", () => {
raw_ref: "songstats-backfill",
},
]);
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959",
});
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "done" });
expect(result).toEqual({ ok: true, hitsSpent: 1 });
expect(result).toEqual({ ok: true });
});

it("DEFERS (pending, no quota hit, signals stop) when backoff is exhausted on 429", async () => {
it("DEFERS (pending, signals stop) when backoff is exhausted on 429", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 429,
attempts: 6,
Expand All @@ -79,14 +71,12 @@ describe("backfillTrackStep", () => {

const result = await backfillTrackStep(ROW);

// left pending for the next drain; NO ledger hit (Songstats consumed nothing)
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "pending" });
expect(insertSongstatsQuotaLedger).not.toHaveBeenCalled();
expect(upsertSongMeasurements).not.toHaveBeenCalled();
expect(result).toEqual({ ok: false, hitsSpent: 0, deferred: true });
expect(result).toEqual({ ok: false, deferred: true });
});

it("marks a definitive 404 (no history) as done and records the spend", async () => {
it("marks a definitive 404 (no history) as done", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 404,
attempts: 1,
Expand All @@ -97,14 +87,10 @@ describe("backfillTrackStep", () => {
const result = await backfillTrackStep(ROW);

expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "done" });
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959 (no data 404)",
});
expect(result).toEqual({ ok: false, hitsSpent: 1 });
expect(result).toEqual({ ok: false });
});

it("marks a permanent 4xx (403) as done (terminal) and records the spend", async () => {
it("marks a permanent 4xx (403) as done (terminal)", async () => {
vi.mocked(fetchSongstatsWithBackoff).mockResolvedValue({
status: 403,
attempts: 1,
Expand All @@ -115,10 +101,6 @@ describe("backfillTrackStep", () => {
const result = await backfillTrackStep(ROW);

expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith(["q1"], { status: "done" });
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959 (terminal 403)",
});
expect(result).toEqual({ ok: false, hitsSpent: 1 });
expect(result).toEqual({ ok: false });
});
});
32 changes: 0 additions & 32 deletions app/workflows/__tests__/getBackfillBudgetStep.test.ts

This file was deleted.

24 changes: 10 additions & 14 deletions app/workflows/__tests__/songstatsBackfillWorkflow.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { songstatsBackfillWorkflow } from "../songstatsBackfillWorkflow";

import { getBackfillBudgetStep } from "../getBackfillBudgetStep";
import { claimBackfillRowsStep } from "../claimBackfillRowsStep";
import { backfillTrackStep } from "../backfillTrackStep";
import { releaseClaimedRowsStep } from "../releaseClaimedRowsStep";

vi.mock("../getBackfillBudgetStep", () => ({ getBackfillBudgetStep: vi.fn() }));
vi.mock("../claimBackfillRowsStep", () => ({ claimBackfillRowsStep: vi.fn() }));
vi.mock("../backfillTrackStep", () => ({ backfillTrackStep: vi.fn() }));
vi.mock("../releaseClaimedRowsStep", () => ({ releaseClaimedRowsStep: vi.fn() }));
Expand All @@ -21,41 +19,39 @@ describe("songstatsBackfillWorkflow", () => {
});

it("releases the rest of the claimed batch to pending when a track defers", async () => {
vi.mocked(getBackfillBudgetStep).mockResolvedValue(100);
vi.mocked(claimBackfillRowsStep).mockResolvedValue([row("r1"), row("r2"), row("r3")]);
vi.mocked(backfillTrackStep)
.mockResolvedValueOnce({ ok: true, hitsSpent: 1 }) // r1
.mockResolvedValueOnce({ ok: false, hitsSpent: 0, deferred: true }); // r2 defers
.mockResolvedValueOnce({ ok: true }) // r1
.mockResolvedValueOnce({ ok: false, deferred: true }); // r2 defers

const result = await songstatsBackfillWorkflow();

// r2 is set pending by the step itself; the unprocessed remainder (r3) is released here
expect(releaseClaimedRowsStep).toHaveBeenCalledWith(["r3"]);
expect(backfillTrackStep).toHaveBeenCalledTimes(2); // stopped at the defer, never reached r3
expect(result).toEqual({ backfilled: 1, failed: 0, deferred: true });
expect(result).toEqual({ backfilled: 1, terminal: 0, deferred: true });
});

it("drains until the queue is empty and never releases when nothing defers", async () => {
vi.mocked(getBackfillBudgetStep).mockResolvedValue(100);
vi.mocked(claimBackfillRowsStep)
.mockResolvedValueOnce([row("a"), row("b")])
.mockResolvedValueOnce([]); // queue drained
vi.mocked(backfillTrackStep)
.mockResolvedValueOnce({ ok: true, hitsSpent: 1 })
.mockResolvedValueOnce({ ok: false, hitsSpent: 1 }); // terminal (e.g. 404)
.mockResolvedValueOnce({ ok: true })
.mockResolvedValueOnce({ ok: false }); // terminal (e.g. 404)

const result = await songstatsBackfillWorkflow();

expect(releaseClaimedRowsStep).not.toHaveBeenCalled();
expect(result).toEqual({ backfilled: 1, failed: 1, deferred: false });
expect(result).toEqual({ backfilled: 1, terminal: 1, deferred: false });
});

it("does not drain when there is no budget", async () => {
vi.mocked(getBackfillBudgetStep).mockResolvedValue(0);
it("stops immediately when the first claim is empty", async () => {
vi.mocked(claimBackfillRowsStep).mockResolvedValue([]);

const result = await songstatsBackfillWorkflow();

expect(claimBackfillRowsStep).not.toHaveBeenCalled();
expect(result).toEqual({ backfilled: 0, failed: 0, deferred: false });
expect(backfillTrackStep).not.toHaveBeenCalled();
expect(result).toEqual({ backfilled: 0, terminal: 0, deferred: false });
});
});
29 changes: 12 additions & 17 deletions app/workflows/backfillTrackStep.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { fetchSongstatsWithBackoff } from "@/lib/songstats/fetchSongstatsWithBackoff";
import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements";
import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger";
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";
import { historicStatsPayloadSchema } from "@/lib/research/playcounts/historicStatsPayloadSchema";
import { Tables } from "@/types/database.types";
Expand All @@ -9,21 +8,22 @@ const METRIC = "platform_displayed_play_count";

/**
* Backfill one claimed queue row, with bounded exponential backoff on Songstats'
* rate limit (Songstats is the rate authority — see chat#1797):
* rate limit (Songstats is the rate authority — see chat#1797; there is no local
* quota ledger):
* - **200** → write each history point as a permanent `songstats` measurement,
* record the spend, mark `done`.
* mark `done`.
* - **404 / other 4xx** → a real request with a definitive answer; terminal, so
* mark `done` (404 = no history) and record the spend.
* mark `done` (404 = no history).
* - **backoff exhausted** (still 429 / 408 / gateway 5xx after retries) → **defer**: leave the row
* `pending` for the next drain, consume no quota, and signal the workflow to
* stop (`deferred`) — Songstats is saturated right now.
* `pending` for the next drain and signal the workflow to stop (`deferred`) —
* Songstats is saturated right now.
*
* @param row - The claimed queue row (already in_progress)
* @returns ok + hitsSpent (0 when deferred) + `deferred` when Songstats is saturated
* @returns `ok` (true on a written backfill) + `deferred` when Songstats is saturated
*/
export async function backfillTrackStep(
row: Tables<"songstats_backfill_queue">,
): Promise<{ ok: boolean; hitsSpent: number; deferred?: boolean }> {
): Promise<{ ok: boolean; deferred?: boolean }> {
"use step";
const result = await fetchSongstatsWithBackoff("tracks/historic_stats", {
isrc: row.song,
Expand All @@ -32,25 +32,21 @@ export async function backfillTrackStep(

if (result.retriesExhausted) {
// Still retryable (429 throttle / 408 / gateway 5xx) past the backoff bound —
// leave it for the next run, spend nothing.
// leave it for the next run.
console.log(
`[backfill] ${row.song} deferred (retryable ${result.status} after ${result.attempts} tries)`,
);
await updateSongstatsBackfillQueue([row.id], { status: "pending" });
return { ok: false, hitsSpent: 0, deferred: true };
return { ok: false, deferred: true };
}

if (result.status !== 200) {
const noData = result.status === 404;
console.log(
`[backfill] ${row.song} done (${noData ? "no data 404" : `terminal ${result.status}`})`,
);
await insertSongstatsQuotaLedger({
hits: 1,
purpose: `backfill ${row.song} (${noData ? "no data 404" : `terminal ${result.status}`})`,
});
await updateSongstatsBackfillQueue([row.id], { status: "done" });
return { ok: false, hitsSpent: 1 };
return { ok: false };
}

const parsed = historicStatsPayloadSchema.safeParse(result.data);
Expand All @@ -76,7 +72,6 @@ export async function backfillTrackStep(
await upsertSongMeasurements(rows);

console.log(`[backfill] ${row.song} done (${rows.length} points written)`);
await insertSongstatsQuotaLedger({ hits: 1, purpose: `backfill ${row.song}` });
await updateSongstatsBackfillQueue([row.id], { status: "done" });
return { ok: true, hitsSpent: 1 };
return { ok: true };
}
21 changes: 0 additions & 21 deletions app/workflows/getBackfillBudgetStep.ts

This file was deleted.

29 changes: 12 additions & 17 deletions app/workflows/songstatsBackfillWorkflow.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { getBackfillBudgetStep } from "@/app/workflows/getBackfillBudgetStep";
import { claimBackfillRowsStep } from "@/app/workflows/claimBackfillRowsStep";
import { backfillTrackStep } from "@/app/workflows/backfillTrackStep";
import { releaseClaimedRowsStep } from "@/app/workflows/releaseClaimedRowsStep";
Expand All @@ -8,24 +7,22 @@ const BATCH_SIZE = 25;
/**
* Durable Songstats backfill drain (recoupable/chat#1791 write path): claim
* value-ranked rows via the SKIP LOCKED RPC and backfill each track's historic
* series, with per-track exponential backoff handling Songstats' rate limit
* (chat#1797). **Stops as soon as a track defers** — Songstats still
* rate-limiting it past the backoff bound — releasing the rest of the claimed
* batch back to `pending` (so the next drain retries them immediately rather
* than waiting on stale-reclaim) instead of hammering a saturated API. Every
* successful hit converts into permanent owned data (fetch-once: captured
* history is never refetched).
* series. Per-track exponential backoff absorbs Songstats' rate limit; the run
* **stops as soon as a track defers** (still retryable past the backoff bound),
* releasing the rest of the claimed batch back to `pending` so the next drain
* retries them immediately rather than waiting on stale-reclaim. Otherwise it
* drains until the queue has no claimable `pending` rows. Every backfill
* converts into permanent owned data (fetch-once).
*/
export async function songstatsBackfillWorkflow() {
"use workflow";

let budget = await getBackfillBudgetStep();
let backfilled = 0;
let failed = 0;
let terminal = 0;
let deferred = false;

drain: while (budget > 0) {
const rows = await claimBackfillRowsStep(Math.min(budget, BATCH_SIZE));
drain: while (true) {
const rows = await claimBackfillRowsStep(BATCH_SIZE);
if (rows.length === 0) break;
console.log(`[songstats-backfill] claimed ${rows.length} rows`);

Expand All @@ -39,16 +36,14 @@ export async function songstatsBackfillWorkflow() {
await releaseClaimedRowsStep(rows.slice(i + 1).map(r => r.id));
break drain;
}
budget -= result.hitsSpent;
if (result.ok) backfilled += 1;
else failed += 1;
if (budget <= 0) break;
else terminal += 1;
}
}

console.log(
`[songstats-backfill] done: ${backfilled} backfilled, ${failed} terminal` +
`[songstats-backfill] done: ${backfilled} backfilled, ${terminal} terminal` +
(deferred ? ", deferred (rate-limited)" : ""),
);
return { backfilled, failed, deferred };
return { backfilled, terminal, deferred };
}

This file was deleted.

Loading
Loading