Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions app/api/internal/playcount-maintenance/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { NextRequest, NextResponse } from "next/server";
import { playcountMaintenanceHandler } from "@/lib/research/playcounts/playcountMaintenanceHandler";

export const dynamic = "force-dynamic";
export const fetchCache = "force-no-store";
export const revalidate = 0;

/**
* GET /api/internal/playcount-maintenance — Vercel Cron entrypoint that
* drains the Songstats backfill queue (budget-gated) and re-runs due monthly
* snapshots. Cron-only (CRON_SECRET bearer).
*
* @param request - The incoming Next.js request.
* @returns A NextResponse with the started run ids.
*/
export async function GET(request: NextRequest): Promise<NextResponse> {
return playcountMaintenanceHandler(request);
}
26 changes: 26 additions & 0 deletions app/api/research/snapshots/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { NextRequest, NextResponse } from "next/server";
import { getCorsHeaders } from "@/lib/networking/getCorsHeaders";
import { createSnapshotHandler } from "@/lib/research/playcounts/createSnapshotHandler";

export const maxDuration = 60;

/**
* OPTIONS /api/research/snapshots — CORS preflight.
*
* @returns CORS-enabled 200 response
*/
export async function OPTIONS() {
return new NextResponse(null, { status: 200, headers: getCorsHeaders() });
}

/**
* POST /api/research/snapshots — snapshot platform-displayed play counts for
* a catalog / album list / ISRC list. Returns 202 with `snapshot_id` and a
* cost estimate before any scraper spend; 429 at the per-org monthly cap.
*
* @param request - body: exactly one of catalog_id / album_ids / isrcs
* @returns JSON snapshot job or error
*/
export async function POST(request: NextRequest) {
return createSnapshotHandler(request);
}
93 changes: 93 additions & 0 deletions app/workflows/__tests__/backfillTrackStep.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { backfillTrackStep } from "../backfillTrackStep";

import { fetchSongstats } from "@/lib/songstats/fetchSongstats";
import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements";
import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger";
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";

vi.mock("@/lib/songstats/fetchSongstats", () => ({ fetchSongstats: vi.fn() }));
vi.mock("@/lib/supabase/song_measurements/upsertSongMeasurements", () => ({
upsertSongMeasurements: vi.fn(),
}));
vi.mock("@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger", () => ({
insertSongstatsQuotaLedger: vi.fn(),
}));
vi.mock("@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue", () => ({
updateSongstatsBackfillQueue: vi.fn(),
}));

const ROW = { id: "q1", song: "USA2P2015959" } as never;

describe("backfillTrackStep", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.mocked(upsertSongMeasurements).mockResolvedValue([] as never);
});

it("writes the historic series as songstats measurements, records spend, marks done", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({
status: 200,
data: {
stats: [
{
source: "spotify",
data: {
history: [
{ date: "2025-01-01", streams_total: 1008736324 },
{ date: "2026-01-01", streams_total: 1330251464 },
],
},
},
],
},
});

const result = await backfillTrackStep(ROW);

expect(fetchSongstats).toHaveBeenCalledWith("tracks/historic_stats", {
isrc: "USA2P2015959",
source: "spotify",
});
expect(upsertSongMeasurements).toHaveBeenCalledWith([
{
song: "USA2P2015959",
platform: "spotify",
metric: "platform_displayed_play_count",
value: 1008736324,
captured_at: "2025-01-01T00:00:00.000Z",
data_source: "songstats",
raw_ref: "songstats-backfill",
},
{
song: "USA2P2015959",
platform: "spotify",
metric: "platform_displayed_play_count",
value: 1330251464,
captured_at: "2026-01-01T00:00:00.000Z",
data_source: "songstats",
raw_ref: "songstats-backfill",
},
]);
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959",
});
expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "done" });
expect(result).toEqual({ ok: true, hitsSpent: 1 });
});

it("marks the row failed on upstream error and still records the spend attempt", async () => {
vi.mocked(fetchSongstats).mockResolvedValue({ status: 429, data: {} });

const result = await backfillTrackStep(ROW);

expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "failed" });
expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({
hits: 1,
purpose: "backfill USA2P2015959 (failed 429)",
});
expect(upsertSongMeasurements).not.toHaveBeenCalled();
expect(result).toEqual({ ok: false, hitsSpent: 1 });
});
});
32 changes: 32 additions & 0 deletions app/workflows/__tests__/captureSnapshotChunkStep.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { captureSnapshotChunkStep } from "../captureSnapshotChunkStep";

import { fetchSpotifyAlbumPlayCounts } from "@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts";
import { writeAlbumPlayCounts } from "@/lib/research/playcounts/writeAlbumPlayCounts";

vi.mock("@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts", () => ({
fetchSpotifyAlbumPlayCounts: vi.fn(),
}));
vi.mock("@/lib/research/playcounts/writeAlbumPlayCounts", () => ({
writeAlbumPlayCounts: vi.fn(),
}));

describe("captureSnapshotChunkStep", () => {
beforeEach(() => vi.clearAllMocks());

it("captures a chunk and writes measurements with snapshot lineage", async () => {
vi.mocked(fetchSpotifyAlbumPlayCounts).mockResolvedValue({
runId: "run_7",
albums: [{ name: "A", tracks: [] }],
});
vi.mocked(writeAlbumPlayCounts).mockResolvedValue(12);

const written = await captureSnapshotChunkStep("snap_1", ["a1", "a2"]);

expect(fetchSpotifyAlbumPlayCounts).toHaveBeenCalledWith(["a1", "a2"]);
expect(writeAlbumPlayCounts).toHaveBeenCalledWith([{ name: "A", tracks: [] }], "run_7", {
snapshotId: "snap_1",
});
expect(written).toBe(12);
});
});
32 changes: 32 additions & 0 deletions app/workflows/__tests__/getBackfillBudgetStep.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { getBackfillBudgetStep } from "../getBackfillBudgetStep";

import { selectSongstatsQuotaSpent } from "@/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent";

vi.mock("@/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent", () => ({
selectSongstatsQuotaSpent: vi.fn(),
}));

describe("getBackfillBudgetStep", () => {
beforeEach(() => {
vi.clearAllMocks();
delete process.env.SONGSTATS_QUOTA_LIMIT;
delete process.env.SONGSTATS_QUOTA_RESERVE;
});

it("computes limit - reserve - spent over the rolling 30d window", async () => {
vi.mocked(selectSongstatsQuotaSpent).mockResolvedValue(300);

const budget = await getBackfillBudgetStep();

expect(budget).toBe(1000 - 100 - 300);
const since = vi.mocked(selectSongstatsQuotaSpent).mock.calls[0][0];
const ageDays = (Date.now() - new Date(since).getTime()) / 86400000;
expect(Math.round(ageDays)).toBe(30);
});

it("never returns negative", async () => {
vi.mocked(selectSongstatsQuotaSpent).mockResolvedValue(5000);
expect(await getBackfillBudgetStep()).toBe(0);
});
});
62 changes: 62 additions & 0 deletions app/workflows/backfillTrackStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { fetchSongstats } from "@/lib/songstats/fetchSongstats";
import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements";
import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger";
import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue";
import { historicStatsPayloadSchema } from "@/lib/research/playcounts/historicStatsPayloadSchema";
import { Tables } from "@/types/database.types";

const METRIC = "platform_displayed_play_count";

/**
* Backfill one claimed queue row: fetch the track's Songstats historic series
* (one quota hit — recorded win or lose), write each point as a permanent
* `songstats`-labeled measurement, and close the row. Failures mark the row
* failed without failing the run — the next row may still succeed.
*
* @param row - The claimed queue row (already in_progress)
* @returns ok + hits spent (always 1; the hit is consumed even on failure)
*/
export async function backfillTrackStep(
row: Tables<"songstats_backfill_queue">,
): Promise<{ ok: boolean; hitsSpent: number }> {
"use step";
const result = await fetchSongstats("tracks/historic_stats", {
isrc: row.song,
source: "spotify",
});

if (result.status !== 200) {
await insertSongstatsQuotaLedger({
hits: 1,
purpose: `backfill ${row.song} (failed ${result.status})`,
});
await updateSongstatsBackfillQueue(row.id, { status: "failed" });
return { ok: false, hitsSpent: 1 };
}

const parsed = historicStatsPayloadSchema.safeParse(result.data);
const history = parsed.success
? (parsed.data.stats?.find(s => s.source === "spotify")?.data?.history ?? [])
: [];

const rows = history.flatMap(point => {
const value = (point as Record<string, unknown>).streams_total;
if (!point.date || typeof value !== "number") return [];
return [
{
song: row.song,
platform: "spotify",
metric: METRIC,
value,
captured_at: new Date(`${point.date}T00:00:00Z`).toISOString(),
data_source: "songstats",
raw_ref: "songstats-backfill",
},
];
});
await upsertSongMeasurements(rows);

await insertSongstatsQuotaLedger({ hits: 1, purpose: `backfill ${row.song}` });
await updateSongstatsBackfillQueue(row.id, { status: "done" });
return { ok: true, hitsSpent: 1 };
}
20 changes: 20 additions & 0 deletions app/workflows/captureSnapshotChunkStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { fetchSpotifyAlbumPlayCounts } from "@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts";
import { writeAlbumPlayCounts } from "@/lib/research/playcounts/writeAlbumPlayCounts";

/**
* Capture one chunk of albums for a snapshot job: one actor call for the
* chunk, measurements written with run + snapshot lineage. A step so each
* chunk retries independently and its result is persisted for replay.
*
* @param snapshotId - The snapshot job id (lineage)
* @param albumIds - Album ids in this chunk
* @returns Number of measurements written
*/
export async function captureSnapshotChunkStep(
snapshotId: string,
albumIds: string[],
): Promise<number> {
"use step";
const { runId, albums } = await fetchSpotifyAlbumPlayCounts(albumIds);
return writeAlbumPlayCounts(albums, runId, { snapshotId });
}
15 changes: 15 additions & 0 deletions app/workflows/claimBackfillRowsStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { claimSongstatsBackfillRows } from "@/lib/supabase/songstats_backfill_queue/claimSongstatsBackfillRows";
import { Tables } from "@/types/database.types";

/**
* Claim a batch of pending backfill rows from inside the workflow.
*
* @param batchSize - Max rows to claim
* @returns The claimed rows (already marked in_progress)
*/
export async function claimBackfillRowsStep(
batchSize: number,
): Promise<Tables<"songstats_backfill_queue">[]> {
"use step";
return claimSongstatsBackfillRows(batchSize);
}
21 changes: 21 additions & 0 deletions app/workflows/getBackfillBudgetStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { selectSongstatsQuotaSpent } from "@/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent";

const DEFAULT_QUOTA_LIMIT = 1000;
const DEFAULT_QUOTA_RESERVE = 100;
const WINDOW_DAYS = 30;

/**
* Remaining Songstats budget for this run: plan limit minus a reserve (kept
* for request-path fallbacks and manual research) minus hits spent in the
* rolling 30-day window. Never negative.
*
* @returns Hits the backfill worker may spend now
*/
export async function getBackfillBudgetStep(): Promise<number> {
"use step";
const limit = Number(process.env.SONGSTATS_QUOTA_LIMIT) || DEFAULT_QUOTA_LIMIT;
const reserve = Number(process.env.SONGSTATS_QUOTA_RESERVE) || DEFAULT_QUOTA_RESERVE;
const since = new Date(Date.now() - WINDOW_DAYS * 24 * 60 * 60 * 1000).toISOString();
const spent = await selectSongstatsQuotaSpent(since);
return Math.max(0, limit - reserve - spent);
}
16 changes: 16 additions & 0 deletions app/workflows/getSnapshotStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { selectPlaycountSnapshots } from "@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots";
import { Tables } from "@/types/database.types";

/**
* Load a snapshot job row from inside the workflow.
*
* @param snapshotId - The snapshot job id
* @returns The row
* @throws Error when the snapshot does not exist (fatal — no retry value)
*/
export async function getSnapshotStep(snapshotId: string): Promise<Tables<"playcount_snapshots">> {
"use step";
const rows = await selectPlaycountSnapshots({ id: snapshotId });
if (rows.length === 0) throw new Error(`Snapshot ${snapshotId} not found`);
return rows[0];
}
16 changes: 16 additions & 0 deletions app/workflows/markSnapshotStep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { updatePlaycountSnapshot } from "@/lib/supabase/playcount_snapshots/updatePlaycountSnapshot";
import { TablesUpdate } from "@/types/database.types";

/**
* Persist a snapshot job state transition from inside the workflow.
*
* @param snapshotId - The snapshot job id
* @param fields - Fields to update (state, counts)
*/
export async function markSnapshotStep(
snapshotId: string,
fields: TablesUpdate<"playcount_snapshots">,
): Promise<void> {
"use step";
await updatePlaycountSnapshot(snapshotId, fields);
}
35 changes: 35 additions & 0 deletions app/workflows/playcountSnapshotWorkflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { getSnapshotStep } from "@/app/workflows/getSnapshotStep";
import { markSnapshotStep } from "@/app/workflows/markSnapshotStep";
import { captureSnapshotChunkStep } from "@/app/workflows/captureSnapshotChunkStep";

const CHUNK_SIZE = 100;

/**
* Durable snapshot capture (recoupable/chat#1791 write path): mark the job
* running, capture albums in retryable chunks (one actor call per chunk,
* measurements written with run + snapshot lineage), mark done/failed.
* Started fire-and-forget from `createSnapshot`; observable in the Vercel
* dashboard like the sibling workflows.
*/
export async function playcountSnapshotWorkflow(snapshotId: string) {
"use workflow";

try {
const snapshot = await getSnapshotStep(snapshotId);
await markSnapshotStep(snapshotId, { state: "running" });

const albumIds = snapshot.album_ids ?? [];
let written = 0;
for (let i = 0; i < albumIds.length; i += CHUNK_SIZE) {
written += await captureSnapshotChunkStep(snapshotId, albumIds.slice(i, i + CHUNK_SIZE));
}

await markSnapshotStep(snapshotId, { state: "done" });
return { success: true as const, measurementsWritten: written };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(`[playcount-snapshot] Failed for ${snapshotId}:`, message);
await markSnapshotStep(snapshotId, { state: "failed" });
return { success: false as const, error: message };
}
}
Loading
Loading