diff --git a/app/api/internal/playcount-maintenance/route.ts b/app/api/internal/playcount-maintenance/route.ts new file mode 100644 index 000000000..577b51ac5 --- /dev/null +++ b/app/api/internal/playcount-maintenance/route.ts @@ -0,0 +1,18 @@ +import { NextRequest, NextResponse } from "next/server"; +import { playcountMaintenanceHandler } from "@/lib/research/playcounts/playcountMaintenanceHandler"; + +export const dynamic = "force-dynamic"; +export const fetchCache = "force-no-store"; +export const revalidate = 0; + +/** + * GET /api/internal/playcount-maintenance — Vercel Cron entrypoint that + * drains the Songstats backfill queue (budget-gated) and re-runs due monthly + * snapshots. Cron-only (CRON_SECRET bearer). + * + * @param request - The incoming Next.js request. + * @returns A NextResponse with the started run ids. + */ +export async function GET(request: NextRequest): Promise { + return playcountMaintenanceHandler(request); +} diff --git a/app/api/research/snapshots/route.ts b/app/api/research/snapshots/route.ts new file mode 100644 index 000000000..46374172f --- /dev/null +++ b/app/api/research/snapshots/route.ts @@ -0,0 +1,26 @@ +import { NextRequest, NextResponse } from "next/server"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { createSnapshotHandler } from "@/lib/research/playcounts/createSnapshotHandler"; + +export const maxDuration = 60; + +/** + * OPTIONS /api/research/snapshots — CORS preflight. + * + * @returns CORS-enabled 200 response + */ +export async function OPTIONS() { + return new NextResponse(null, { status: 200, headers: getCorsHeaders() }); +} + +/** + * POST /api/research/snapshots — snapshot platform-displayed play counts for + * a catalog / album list / ISRC list. Returns 202 with `snapshot_id` and a + * cost estimate before any scraper spend; 429 at the per-org monthly cap. + * + * @param request - body: exactly one of catalog_id / album_ids / isrcs + * @returns JSON snapshot job or error + */ +export async function POST(request: NextRequest) { + return createSnapshotHandler(request); +} diff --git a/app/workflows/__tests__/backfillTrackStep.test.ts b/app/workflows/__tests__/backfillTrackStep.test.ts new file mode 100644 index 000000000..90fc701e8 --- /dev/null +++ b/app/workflows/__tests__/backfillTrackStep.test.ts @@ -0,0 +1,93 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { backfillTrackStep } from "../backfillTrackStep"; + +import { fetchSongstats } from "@/lib/songstats/fetchSongstats"; +import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements"; +import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger"; +import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue"; + +vi.mock("@/lib/songstats/fetchSongstats", () => ({ fetchSongstats: vi.fn() })); +vi.mock("@/lib/supabase/song_measurements/upsertSongMeasurements", () => ({ + upsertSongMeasurements: vi.fn(), +})); +vi.mock("@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger", () => ({ + insertSongstatsQuotaLedger: vi.fn(), +})); +vi.mock("@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue", () => ({ + updateSongstatsBackfillQueue: vi.fn(), +})); + +const ROW = { id: "q1", song: "USA2P2015959" } as never; + +describe("backfillTrackStep", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(upsertSongMeasurements).mockResolvedValue([] as never); + }); + + it("writes the historic series as songstats measurements, records spend, marks done", async () => { + vi.mocked(fetchSongstats).mockResolvedValue({ + status: 200, + data: { + stats: [ + { + source: "spotify", + data: { + history: [ + { date: "2025-01-01", streams_total: 1008736324 }, + { date: "2026-01-01", streams_total: 1330251464 }, + ], + }, + }, + ], + }, + }); + + const result = await backfillTrackStep(ROW); + + expect(fetchSongstats).toHaveBeenCalledWith("tracks/historic_stats", { + isrc: "USA2P2015959", + source: "spotify", + }); + expect(upsertSongMeasurements).toHaveBeenCalledWith([ + { + song: "USA2P2015959", + platform: "spotify", + metric: "platform_displayed_play_count", + value: 1008736324, + captured_at: "2025-01-01T00:00:00.000Z", + data_source: "songstats", + raw_ref: "songstats-backfill", + }, + { + song: "USA2P2015959", + platform: "spotify", + metric: "platform_displayed_play_count", + value: 1330251464, + captured_at: "2026-01-01T00:00:00.000Z", + data_source: "songstats", + raw_ref: "songstats-backfill", + }, + ]); + expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({ + hits: 1, + purpose: "backfill USA2P2015959", + }); + expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "done" }); + expect(result).toEqual({ ok: true, hitsSpent: 1 }); + }); + + it("marks the row failed on upstream error and still records the spend attempt", async () => { + vi.mocked(fetchSongstats).mockResolvedValue({ status: 429, data: {} }); + + const result = await backfillTrackStep(ROW); + + expect(updateSongstatsBackfillQueue).toHaveBeenCalledWith("q1", { status: "failed" }); + expect(insertSongstatsQuotaLedger).toHaveBeenCalledWith({ + hits: 1, + purpose: "backfill USA2P2015959 (failed 429)", + }); + expect(upsertSongMeasurements).not.toHaveBeenCalled(); + expect(result).toEqual({ ok: false, hitsSpent: 1 }); + }); +}); diff --git a/app/workflows/__tests__/captureSnapshotChunkStep.test.ts b/app/workflows/__tests__/captureSnapshotChunkStep.test.ts new file mode 100644 index 000000000..1adf5aa47 --- /dev/null +++ b/app/workflows/__tests__/captureSnapshotChunkStep.test.ts @@ -0,0 +1,32 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { captureSnapshotChunkStep } from "../captureSnapshotChunkStep"; + +import { fetchSpotifyAlbumPlayCounts } from "@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts"; +import { writeAlbumPlayCounts } from "@/lib/research/playcounts/writeAlbumPlayCounts"; + +vi.mock("@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts", () => ({ + fetchSpotifyAlbumPlayCounts: vi.fn(), +})); +vi.mock("@/lib/research/playcounts/writeAlbumPlayCounts", () => ({ + writeAlbumPlayCounts: vi.fn(), +})); + +describe("captureSnapshotChunkStep", () => { + beforeEach(() => vi.clearAllMocks()); + + it("captures a chunk and writes measurements with snapshot lineage", async () => { + vi.mocked(fetchSpotifyAlbumPlayCounts).mockResolvedValue({ + runId: "run_7", + albums: [{ name: "A", tracks: [] }], + }); + vi.mocked(writeAlbumPlayCounts).mockResolvedValue(12); + + const written = await captureSnapshotChunkStep("snap_1", ["a1", "a2"]); + + expect(fetchSpotifyAlbumPlayCounts).toHaveBeenCalledWith(["a1", "a2"]); + expect(writeAlbumPlayCounts).toHaveBeenCalledWith([{ name: "A", tracks: [] }], "run_7", { + snapshotId: "snap_1", + }); + expect(written).toBe(12); + }); +}); diff --git a/app/workflows/__tests__/getBackfillBudgetStep.test.ts b/app/workflows/__tests__/getBackfillBudgetStep.test.ts new file mode 100644 index 000000000..6c3fb8159 --- /dev/null +++ b/app/workflows/__tests__/getBackfillBudgetStep.test.ts @@ -0,0 +1,32 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { getBackfillBudgetStep } from "../getBackfillBudgetStep"; + +import { selectSongstatsQuotaSpent } from "@/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent"; + +vi.mock("@/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent", () => ({ + selectSongstatsQuotaSpent: vi.fn(), +})); + +describe("getBackfillBudgetStep", () => { + beforeEach(() => { + vi.clearAllMocks(); + delete process.env.SONGSTATS_QUOTA_LIMIT; + delete process.env.SONGSTATS_QUOTA_RESERVE; + }); + + it("computes limit - reserve - spent over the rolling 30d window", async () => { + vi.mocked(selectSongstatsQuotaSpent).mockResolvedValue(300); + + const budget = await getBackfillBudgetStep(); + + expect(budget).toBe(1000 - 100 - 300); + const since = vi.mocked(selectSongstatsQuotaSpent).mock.calls[0][0]; + const ageDays = (Date.now() - new Date(since).getTime()) / 86400000; + expect(Math.round(ageDays)).toBe(30); + }); + + it("never returns negative", async () => { + vi.mocked(selectSongstatsQuotaSpent).mockResolvedValue(5000); + expect(await getBackfillBudgetStep()).toBe(0); + }); +}); diff --git a/app/workflows/backfillTrackStep.ts b/app/workflows/backfillTrackStep.ts new file mode 100644 index 000000000..bc250d9cb --- /dev/null +++ b/app/workflows/backfillTrackStep.ts @@ -0,0 +1,62 @@ +import { fetchSongstats } from "@/lib/songstats/fetchSongstats"; +import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements"; +import { insertSongstatsQuotaLedger } from "@/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger"; +import { updateSongstatsBackfillQueue } from "@/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue"; +import { historicStatsPayloadSchema } from "@/lib/research/playcounts/historicStatsPayloadSchema"; +import { Tables } from "@/types/database.types"; + +const METRIC = "platform_displayed_play_count"; + +/** + * Backfill one claimed queue row: fetch the track's Songstats historic series + * (one quota hit — recorded win or lose), write each point as a permanent + * `songstats`-labeled measurement, and close the row. Failures mark the row + * failed without failing the run — the next row may still succeed. + * + * @param row - The claimed queue row (already in_progress) + * @returns ok + hits spent (always 1; the hit is consumed even on failure) + */ +export async function backfillTrackStep( + row: Tables<"songstats_backfill_queue">, +): Promise<{ ok: boolean; hitsSpent: number }> { + "use step"; + const result = await fetchSongstats("tracks/historic_stats", { + isrc: row.song, + source: "spotify", + }); + + if (result.status !== 200) { + await insertSongstatsQuotaLedger({ + hits: 1, + purpose: `backfill ${row.song} (failed ${result.status})`, + }); + await updateSongstatsBackfillQueue(row.id, { status: "failed" }); + return { ok: false, hitsSpent: 1 }; + } + + const parsed = historicStatsPayloadSchema.safeParse(result.data); + const history = parsed.success + ? (parsed.data.stats?.find(s => s.source === "spotify")?.data?.history ?? []) + : []; + + const rows = history.flatMap(point => { + const value = (point as Record).streams_total; + if (!point.date || typeof value !== "number") return []; + return [ + { + song: row.song, + platform: "spotify", + metric: METRIC, + value, + captured_at: new Date(`${point.date}T00:00:00Z`).toISOString(), + data_source: "songstats", + raw_ref: "songstats-backfill", + }, + ]; + }); + await upsertSongMeasurements(rows); + + await insertSongstatsQuotaLedger({ hits: 1, purpose: `backfill ${row.song}` }); + await updateSongstatsBackfillQueue(row.id, { status: "done" }); + return { ok: true, hitsSpent: 1 }; +} diff --git a/app/workflows/captureSnapshotChunkStep.ts b/app/workflows/captureSnapshotChunkStep.ts new file mode 100644 index 000000000..8afcad51d --- /dev/null +++ b/app/workflows/captureSnapshotChunkStep.ts @@ -0,0 +1,20 @@ +import { fetchSpotifyAlbumPlayCounts } from "@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts"; +import { writeAlbumPlayCounts } from "@/lib/research/playcounts/writeAlbumPlayCounts"; + +/** + * Capture one chunk of albums for a snapshot job: one actor call for the + * chunk, measurements written with run + snapshot lineage. A step so each + * chunk retries independently and its result is persisted for replay. + * + * @param snapshotId - The snapshot job id (lineage) + * @param albumIds - Album ids in this chunk + * @returns Number of measurements written + */ +export async function captureSnapshotChunkStep( + snapshotId: string, + albumIds: string[], +): Promise { + "use step"; + const { runId, albums } = await fetchSpotifyAlbumPlayCounts(albumIds); + return writeAlbumPlayCounts(albums, runId, { snapshotId }); +} diff --git a/app/workflows/claimBackfillRowsStep.ts b/app/workflows/claimBackfillRowsStep.ts new file mode 100644 index 000000000..dc1d0819d --- /dev/null +++ b/app/workflows/claimBackfillRowsStep.ts @@ -0,0 +1,15 @@ +import { claimSongstatsBackfillRows } from "@/lib/supabase/songstats_backfill_queue/claimSongstatsBackfillRows"; +import { Tables } from "@/types/database.types"; + +/** + * Claim a batch of pending backfill rows from inside the workflow. + * + * @param batchSize - Max rows to claim + * @returns The claimed rows (already marked in_progress) + */ +export async function claimBackfillRowsStep( + batchSize: number, +): Promise[]> { + "use step"; + return claimSongstatsBackfillRows(batchSize); +} diff --git a/app/workflows/getBackfillBudgetStep.ts b/app/workflows/getBackfillBudgetStep.ts new file mode 100644 index 000000000..866ee7605 --- /dev/null +++ b/app/workflows/getBackfillBudgetStep.ts @@ -0,0 +1,21 @@ +import { selectSongstatsQuotaSpent } from "@/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent"; + +const DEFAULT_QUOTA_LIMIT = 1000; +const DEFAULT_QUOTA_RESERVE = 100; +const WINDOW_DAYS = 30; + +/** + * Remaining Songstats budget for this run: plan limit minus a reserve (kept + * for request-path fallbacks and manual research) minus hits spent in the + * rolling 30-day window. Never negative. + * + * @returns Hits the backfill worker may spend now + */ +export async function getBackfillBudgetStep(): Promise { + "use step"; + const limit = Number(process.env.SONGSTATS_QUOTA_LIMIT) || DEFAULT_QUOTA_LIMIT; + const reserve = Number(process.env.SONGSTATS_QUOTA_RESERVE) || DEFAULT_QUOTA_RESERVE; + const since = new Date(Date.now() - WINDOW_DAYS * 24 * 60 * 60 * 1000).toISOString(); + const spent = await selectSongstatsQuotaSpent(since); + return Math.max(0, limit - reserve - spent); +} diff --git a/app/workflows/getSnapshotStep.ts b/app/workflows/getSnapshotStep.ts new file mode 100644 index 000000000..3a0abacb1 --- /dev/null +++ b/app/workflows/getSnapshotStep.ts @@ -0,0 +1,16 @@ +import { selectPlaycountSnapshots } from "@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots"; +import { Tables } from "@/types/database.types"; + +/** + * Load a snapshot job row from inside the workflow. + * + * @param snapshotId - The snapshot job id + * @returns The row + * @throws Error when the snapshot does not exist (fatal — no retry value) + */ +export async function getSnapshotStep(snapshotId: string): Promise> { + "use step"; + const rows = await selectPlaycountSnapshots({ id: snapshotId }); + if (rows.length === 0) throw new Error(`Snapshot ${snapshotId} not found`); + return rows[0]; +} diff --git a/app/workflows/markSnapshotStep.ts b/app/workflows/markSnapshotStep.ts new file mode 100644 index 000000000..1cfea6a68 --- /dev/null +++ b/app/workflows/markSnapshotStep.ts @@ -0,0 +1,16 @@ +import { updatePlaycountSnapshot } from "@/lib/supabase/playcount_snapshots/updatePlaycountSnapshot"; +import { TablesUpdate } from "@/types/database.types"; + +/** + * Persist a snapshot job state transition from inside the workflow. + * + * @param snapshotId - The snapshot job id + * @param fields - Fields to update (state, counts) + */ +export async function markSnapshotStep( + snapshotId: string, + fields: TablesUpdate<"playcount_snapshots">, +): Promise { + "use step"; + await updatePlaycountSnapshot(snapshotId, fields); +} diff --git a/app/workflows/playcountSnapshotWorkflow.ts b/app/workflows/playcountSnapshotWorkflow.ts new file mode 100644 index 000000000..88ef6ee17 --- /dev/null +++ b/app/workflows/playcountSnapshotWorkflow.ts @@ -0,0 +1,35 @@ +import { getSnapshotStep } from "@/app/workflows/getSnapshotStep"; +import { markSnapshotStep } from "@/app/workflows/markSnapshotStep"; +import { captureSnapshotChunkStep } from "@/app/workflows/captureSnapshotChunkStep"; + +const CHUNK_SIZE = 100; + +/** + * Durable snapshot capture (recoupable/chat#1791 write path): mark the job + * running, capture albums in retryable chunks (one actor call per chunk, + * measurements written with run + snapshot lineage), mark done/failed. + * Started fire-and-forget from `createSnapshot`; observable in the Vercel + * dashboard like the sibling workflows. + */ +export async function playcountSnapshotWorkflow(snapshotId: string) { + "use workflow"; + + try { + const snapshot = await getSnapshotStep(snapshotId); + await markSnapshotStep(snapshotId, { state: "running" }); + + const albumIds = snapshot.album_ids ?? []; + let written = 0; + for (let i = 0; i < albumIds.length; i += CHUNK_SIZE) { + written += await captureSnapshotChunkStep(snapshotId, albumIds.slice(i, i + CHUNK_SIZE)); + } + + await markSnapshotStep(snapshotId, { state: "done" }); + return { success: true as const, measurementsWritten: written }; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error(`[playcount-snapshot] Failed for ${snapshotId}:`, message); + await markSnapshotStep(snapshotId, { state: "failed" }); + return { success: false as const, error: message }; + } +} diff --git a/app/workflows/songstatsBackfillWorkflow.ts b/app/workflows/songstatsBackfillWorkflow.ts new file mode 100644 index 000000000..0a1960bf9 --- /dev/null +++ b/app/workflows/songstatsBackfillWorkflow.ts @@ -0,0 +1,36 @@ +import { getBackfillBudgetStep } from "@/app/workflows/getBackfillBudgetStep"; +import { claimBackfillRowsStep } from "@/app/workflows/claimBackfillRowsStep"; +import { backfillTrackStep } from "@/app/workflows/backfillTrackStep"; + +const BATCH_SIZE = 25; + +/** + * Durable Songstats backfill drain (recoupable/chat#1791 write path): check + * the rolling-window budget, claim value-ranked rows via the SKIP LOCKED RPC, + * backfill each track's historic series into the measurement store, and stop + * when the queue or the budget is dry. Every quota hit converts into + * permanent owned data (fetch-once: captured history is never refetched). + */ +export async function songstatsBackfillWorkflow() { + "use workflow"; + + let budget = await getBackfillBudgetStep(); + let backfilled = 0; + let failed = 0; + + while (budget > 0) { + const rows = await claimBackfillRowsStep(Math.min(budget, BATCH_SIZE)); + if (rows.length === 0) break; + + for (const row of rows) { + const result = await backfillTrackStep(row); + budget -= result.hitsSpent; + if (result.ok) backfilled += 1; + else failed += 1; + if (budget <= 0) break; + } + } + + console.log(`[songstats-backfill] done: ${backfilled} backfilled, ${failed} failed`); + return { backfilled, failed }; +} diff --git a/lib/internal/__tests__/validateCronRequest.test.ts b/lib/internal/__tests__/validateCronRequest.test.ts new file mode 100644 index 000000000..7a00c3c35 --- /dev/null +++ b/lib/internal/__tests__/validateCronRequest.test.ts @@ -0,0 +1,36 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; +import { validateCronRequest } from "../validateCronRequest"; + +const req = (auth?: string) => + new NextRequest("http://x/api/internal/playcount-maintenance", { + headers: auth ? { authorization: auth } : {}, + }); + +describe("validateCronRequest", () => { + beforeEach(() => { + process.env.CRON_SECRET = "s3cr3t"; + }); + afterEach(() => { + delete process.env.CRON_SECRET; + }); + + it("passes a correct bearer", () => { + expect(validateCronRequest(req("Bearer s3cr3t"))).toBeNull(); + }); + + it("401s wrong or missing bearer", () => { + for (const r of [req("Bearer wrong"), req()]) { + const res = validateCronRequest(r); + expect(res).toBeInstanceOf(NextResponse); + expect((res as NextResponse).status).toBe(401); + } + }); + + it("500s when CRON_SECRET is unset (misconfiguration, not open door)", () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + delete process.env.CRON_SECRET; + expect((validateCronRequest(req("Bearer s3cr3t")) as NextResponse).status).toBe(500); + consoleError.mockRestore(); + }); +}); diff --git a/lib/internal/validateCronRequest.ts b/lib/internal/validateCronRequest.ts new file mode 100644 index 000000000..a81a0b6cc --- /dev/null +++ b/lib/internal/validateCronRequest.ts @@ -0,0 +1,27 @@ +import { NextRequest, NextResponse } from "next/server"; + +/** + * Gates an internal route to Vercel Cron only. Vercel attaches + * `Authorization: Bearer ${CRON_SECRET}` to scheduled invocations; we require + * an exact match. A missing `CRON_SECRET` is a misconfiguration (500), not an + * open door. + * + * @param request - The incoming Next.js request. + * @returns A NextResponse to short-circuit on failure, or null when authorized. + */ +export function validateCronRequest(request: NextRequest): NextResponse | null { + const secret = process.env.CRON_SECRET; + if (!secret) { + console.error("[internal-cron] CRON_SECRET is not configured"); + return NextResponse.json( + { status: "error", message: "Internal server error" }, + { status: 500 }, + ); + } + + if (request.headers.get("authorization") !== `Bearer ${secret}`) { + return NextResponse.json({ status: "error", message: "Unauthorized" }, { status: 401 }); + } + + return null; +} diff --git a/lib/research/playcounts/__tests__/createSnapshot.test.ts b/lib/research/playcounts/__tests__/createSnapshot.test.ts new file mode 100644 index 000000000..d67f17060 --- /dev/null +++ b/lib/research/playcounts/__tests__/createSnapshot.test.ts @@ -0,0 +1,94 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { createSnapshot } from "../createSnapshot"; + +import { resolveSnapshotAlbums } from "../resolveSnapshotAlbums"; +import { selectPlaycountSnapshots } from "@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots"; +import { insertPlaycountSnapshot } from "@/lib/supabase/playcount_snapshots/insertPlaycountSnapshot"; +import { start } from "workflow/api"; + +vi.mock("../resolveSnapshotAlbums", () => ({ resolveSnapshotAlbums: vi.fn() })); +vi.mock("@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots", () => ({ + selectPlaycountSnapshots: vi.fn(), +})); +vi.mock("@/lib/supabase/playcount_snapshots/insertPlaycountSnapshot", () => ({ + insertPlaycountSnapshot: vi.fn(), +})); +vi.mock("workflow/api", () => ({ start: vi.fn() })); +vi.mock("@/app/workflows/playcountSnapshotWorkflow", () => ({ + playcountSnapshotWorkflow: vi.fn(), +})); + +describe("createSnapshot", () => { + beforeEach(() => { + vi.clearAllMocks(); + delete process.env.SNAPSHOT_MONTHLY_CAP_USD; + vi.mocked(selectPlaycountSnapshots).mockResolvedValue([]); + vi.mocked(insertPlaycountSnapshot).mockResolvedValue({ id: "snap_1" } as never); + vi.mocked(start).mockResolvedValue({ runId: "run_1" } as never); + }); + + it("inserts a queued job, starts the workflow, returns 202 payload", async () => { + vi.mocked(resolveSnapshotAlbums).mockResolvedValue(["a1", "a2"]); + + const result = await createSnapshot({ + accountId: "acc_1", + body: { album_ids: ["a1", "a2"], platforms: ["spotify"], schedule: "once" }, + }); + + expect(insertPlaycountSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ + account: "acc_1", + album_ids: ["a1", "a2"], + platforms: ["spotify"], + schedule: "once", + state: "queued", + album_count: 2, + estimated_cost_usd: 0.006, + }), + ); + expect(start).toHaveBeenCalled(); + expect(result).toEqual({ + data: { + status: "success", + snapshot_id: "snap_1", + state: "queued", + album_count: 2, + estimated_cost_usd: 0.006, + }, + }); + }); + + it("400s when nothing resolves to albums", async () => { + vi.mocked(resolveSnapshotAlbums).mockResolvedValue([]); + + const result = await createSnapshot({ + accountId: "acc_1", + body: { isrcs: ["UNMAPPED"], platforms: ["spotify"], schedule: "once" }, + }); + + expect(result).toEqual({ + error: "No albums resolvable from the given input — no identifier mappings exist yet", + status: 400, + }); + expect(insertPlaycountSnapshot).not.toHaveBeenCalled(); + }); + + it("429s at the per-org monthly cap", async () => { + process.env.SNAPSHOT_MONTHLY_CAP_USD = "1"; + vi.mocked(resolveSnapshotAlbums).mockResolvedValue( + Array.from({ length: 400 }, (_, i) => `a${i}`), + ); + vi.mocked(selectPlaycountSnapshots).mockResolvedValue([{ estimated_cost_usd: 0.9 }] as never); + + const result = await createSnapshot({ + accountId: "acc_1", + body: { album_ids: ["x"], platforms: ["spotify"], schedule: "once" }, + }); + + expect(result).toEqual({ + error: "Per-organization monthly snapshot cap reached", + status: 429, + }); + expect(start).not.toHaveBeenCalled(); + }); +}); diff --git a/lib/research/playcounts/__tests__/resolveSnapshotAlbums.test.ts b/lib/research/playcounts/__tests__/resolveSnapshotAlbums.test.ts new file mode 100644 index 000000000..96baabff8 --- /dev/null +++ b/lib/research/playcounts/__tests__/resolveSnapshotAlbums.test.ts @@ -0,0 +1,54 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { resolveSnapshotAlbums } from "../resolveSnapshotAlbums"; + +import { selectSongIdentifiers } from "@/lib/supabase/song_identifiers/selectSongIdentifiers"; +import { selectCatalogSongsWithArtists } from "@/lib/supabase/catalog_songs/selectCatalogSongsWithArtists"; + +vi.mock("@/lib/supabase/song_identifiers/selectSongIdentifiers", () => ({ + selectSongIdentifiers: vi.fn(), +})); +vi.mock("@/lib/supabase/catalog_songs/selectCatalogSongsWithArtists", () => ({ + selectCatalogSongsWithArtists: vi.fn(), +})); + +describe("resolveSnapshotAlbums", () => { + beforeEach(() => vi.clearAllMocks()); + + it("passes album_ids through deduped", async () => { + const result = await resolveSnapshotAlbums({ album_ids: ["a1", "a1", "a2"] }); + + expect(result).toEqual(["a1", "a2"]); + expect(selectSongIdentifiers).not.toHaveBeenCalled(); + }); + + it("resolves isrcs to album ids via identifiers", async () => { + vi.mocked(selectSongIdentifiers).mockResolvedValue([ + { song: "I1", platform: "spotify", identifier_type: "album_id", value: "a1" }, + { song: "I2", platform: "spotify", identifier_type: "album_id", value: "a1" }, + ]); + + const result = await resolveSnapshotAlbums({ isrcs: ["I1", "I2"] }); + + expect(selectSongIdentifiers).toHaveBeenCalledWith({ + platform: "spotify", + identifierType: "album_id", + songs: ["I1", "I2"], + }); + expect(result).toEqual(["a1"]); + }); + + it("resolves a catalog to album ids via its songs", async () => { + vi.mocked(selectCatalogSongsWithArtists).mockResolvedValue({ + songs: [{ isrc: "I1" }, { isrc: "I2" }], + total_count: 2, + } as never); + vi.mocked(selectSongIdentifiers).mockResolvedValue([ + { song: "I1", platform: "spotify", identifier_type: "album_id", value: "a9" }, + ]); + + const result = await resolveSnapshotAlbums({ catalog_id: "cat_1" }); + + expect(selectCatalogSongsWithArtists).toHaveBeenCalledWith({ catalogId: "cat_1" }); + expect(result).toEqual(["a9"]); + }); +}); diff --git a/lib/research/playcounts/__tests__/startDueMonthlySnapshots.test.ts b/lib/research/playcounts/__tests__/startDueMonthlySnapshots.test.ts new file mode 100644 index 000000000..96da967ca --- /dev/null +++ b/lib/research/playcounts/__tests__/startDueMonthlySnapshots.test.ts @@ -0,0 +1,64 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { startDueMonthlySnapshots } from "../startDueMonthlySnapshots"; + +import { selectPlaycountSnapshots } from "@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots"; +import { insertPlaycountSnapshot } from "@/lib/supabase/playcount_snapshots/insertPlaycountSnapshot"; +import { start } from "workflow/api"; + +vi.mock("@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots", () => ({ + selectPlaycountSnapshots: vi.fn(), +})); +vi.mock("@/lib/supabase/playcount_snapshots/insertPlaycountSnapshot", () => ({ + insertPlaycountSnapshot: vi.fn(), +})); +vi.mock("workflow/api", () => ({ start: vi.fn() })); +vi.mock("@/app/workflows/playcountSnapshotWorkflow", () => ({ + playcountSnapshotWorkflow: vi.fn(), +})); + +const monthly = (id: string, createdAt: string, albumIds: string[]) => + ({ + id, + account: "acc_1", + album_ids: albumIds, + platforms: ["spotify"], + schedule: "monthly", + state: "done", + album_count: albumIds.length, + estimated_cost_usd: 0.003, + created_at: createdAt, + }) as never; + +describe("startDueMonthlySnapshots", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-07-10T07:00:00Z")); + vi.mocked(insertPlaycountSnapshot).mockResolvedValue({ id: "snap_new" } as never); + vi.mocked(start).mockResolvedValue({ runId: "r" } as never); + }); + + it("re-runs the latest monthly snapshot per series when >30d old", async () => { + vi.mocked(selectPlaycountSnapshots).mockResolvedValue([ + monthly("snap_2", "2026-06-09T07:00:00Z", ["a1"]), + monthly("snap_1", "2026-05-09T07:00:00Z", ["a1"]), + ]); + + const started = await startDueMonthlySnapshots(); + + expect(insertPlaycountSnapshot).toHaveBeenCalledTimes(1); + expect(insertPlaycountSnapshot).toHaveBeenCalledWith( + expect.objectContaining({ account: "acc_1", album_ids: ["a1"], schedule: "monthly" }), + ); + expect(started).toBe(1); + }); + + it("does nothing when the latest run is fresh", async () => { + vi.mocked(selectPlaycountSnapshots).mockResolvedValue([ + monthly("snap_2", "2026-06-25T07:00:00Z", ["a1"]), + ]); + + expect(await startDueMonthlySnapshots()).toBe(0); + expect(insertPlaycountSnapshot).not.toHaveBeenCalled(); + }); +}); diff --git a/lib/research/playcounts/__tests__/validateCreateSnapshotRequest.test.ts b/lib/research/playcounts/__tests__/validateCreateSnapshotRequest.test.ts new file mode 100644 index 000000000..f5873f21d --- /dev/null +++ b/lib/research/playcounts/__tests__/validateCreateSnapshotRequest.test.ts @@ -0,0 +1,57 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { NextRequest, NextResponse } from "next/server"; +import { validateCreateSnapshotRequest } from "../validateCreateSnapshotRequest"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; + +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); +vi.mock("@/lib/auth/validateAuthContext", () => ({ validateAuthContext: vi.fn() })); + +const req = (body: unknown) => + new NextRequest("http://x/api/research/snapshots", { + method: "POST", + body: JSON.stringify(body), + headers: { "Content-Type": "application/json" }, + }); + +describe("validateCreateSnapshotRequest", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.mocked(validateAuthContext).mockResolvedValue({ accountId: "acc_1" } as never); + }); + + it("accepts album_ids with defaults applied", async () => { + const result = await validateCreateSnapshotRequest(req({ album_ids: ["a1"] })); + + expect(result).toEqual({ + accountId: "acc_1", + body: { album_ids: ["a1"], platforms: ["spotify"], schedule: "once" }, + }); + }); + + it("rejects more or less than exactly one input kind", async () => { + for (const body of [{}, { album_ids: ["a"], isrcs: ["i"] }]) { + const result = await validateCreateSnapshotRequest(req(body)); + expect(result).toBeInstanceOf(NextResponse); + expect((result as NextResponse).status).toBe(400); + } + }); + + it("rejects unknown platforms and schedules", async () => { + for (const body of [ + { album_ids: ["a"], platforms: ["apple_music"] }, + { album_ids: ["a"], schedule: "weekly" }, + ]) { + const result = await validateCreateSnapshotRequest(req(body)); + expect((result as NextResponse).status).toBe(400); + } + }); + + it("short-circuits with the auth response", async () => { + const denied = NextResponse.json({ status: "error" }, { status: 401 }); + vi.mocked(validateAuthContext).mockResolvedValue(denied as never); + + expect(await validateCreateSnapshotRequest(req({ album_ids: ["a"] }))).toBe(denied); + }); +}); diff --git a/lib/research/playcounts/__tests__/writeAlbumPlayCounts.test.ts b/lib/research/playcounts/__tests__/writeAlbumPlayCounts.test.ts new file mode 100644 index 000000000..9bb4fdc49 --- /dev/null +++ b/lib/research/playcounts/__tests__/writeAlbumPlayCounts.test.ts @@ -0,0 +1,64 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { writeAlbumPlayCounts } from "../writeAlbumPlayCounts"; + +import { selectSongIdentifiers } from "@/lib/supabase/song_identifiers/selectSongIdentifiers"; +import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements"; + +vi.mock("@/lib/supabase/song_identifiers/selectSongIdentifiers", () => ({ + selectSongIdentifiers: vi.fn(), +})); +vi.mock("@/lib/supabase/song_measurements/upsertSongMeasurements", () => ({ + upsertSongMeasurements: vi.fn(), +})); + +const ALBUMS = [ + { + name: "K.I.D.S. (Deluxe)", + tracks: [ + { id: "t1", name: "The Spins", streamCount: 100 }, + { id: "t_unmapped", name: "Outro", streamCount: 5 }, + ], + }, +]; + +describe("writeAlbumPlayCounts", () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-06-11T12:00:00Z")); + vi.mocked(upsertSongMeasurements).mockResolvedValue([] as never); + }); + + it("writes one measurement per mapped track with run + snapshot lineage", async () => { + vi.mocked(selectSongIdentifiers).mockResolvedValue([ + { song: "ISRC1", platform: "spotify", identifier_type: "track_id", value: "t1" }, + ]); + + const written = await writeAlbumPlayCounts(ALBUMS, "run_1", { snapshotId: "snap_1" }); + + expect(upsertSongMeasurements).toHaveBeenCalledWith([ + { + song: "ISRC1", + platform: "spotify", + metric: "platform_displayed_play_count", + value: 100, + captured_at: "2026-06-11T12:00:00.000Z", + data_source: "apify_spotify_playcount", + raw_ref: "run_1", + snapshot: "snap_1", + }, + ]); + expect(written).toBe(1); + }); + + it("omits snapshot lineage when not given", async () => { + vi.mocked(selectSongIdentifiers).mockResolvedValue([ + { song: "ISRC1", platform: "spotify", identifier_type: "track_id", value: "t1" }, + ]); + + await writeAlbumPlayCounts(ALBUMS, "run_2", {}); + + const rows = vi.mocked(upsertSongMeasurements).mock.calls[0][0]; + expect(rows[0]).not.toHaveProperty("snapshot"); + }); +}); diff --git a/lib/research/playcounts/createSnapshot.ts b/lib/research/playcounts/createSnapshot.ts new file mode 100644 index 000000000..b5f6b309e --- /dev/null +++ b/lib/research/playcounts/createSnapshot.ts @@ -0,0 +1,72 @@ +import { start } from "workflow/api"; +import { resolveSnapshotAlbums } from "@/lib/research/playcounts/resolveSnapshotAlbums"; +import { selectPlaycountSnapshots } from "@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots"; +import { insertPlaycountSnapshot } from "@/lib/supabase/playcount_snapshots/insertPlaycountSnapshot"; +import { playcountSnapshotWorkflow } from "@/app/workflows/playcountSnapshotWorkflow"; +import { CreateSnapshotBody } from "@/lib/research/playcounts/validateCreateSnapshotRequest"; + +/** Actor pricing: ~$3 per 1k album URLs. */ +const COST_PER_ALBUM_USD = 0.003; +const DEFAULT_MONTHLY_CAP_USD = 25; + +export type CreateSnapshotResult = { data: unknown } | { error: string; status: number }; + +/** + * Create a snapshot job: resolve the input to album ids, enforce the per-org + * monthly cost cap, persist the job (mints `snapshot_id`), and start the + * capture workflow. Returns the 202 payload with the cost estimate — the + * estimate is computed before any scraper spend. + * + * @param params.accountId - The authenticated account (cap scope) + * @param params.body - Validated snapshot request body + */ +export async function createSnapshot(params: { + accountId: string; + body: CreateSnapshotBody; +}): Promise { + const albumIds = await resolveSnapshotAlbums(params.body); + if (albumIds.length === 0) { + return { + error: "No albums resolvable from the given input — no identifier mappings exist yet", + status: 400, + }; + } + + const monthStart = new Date(); + monthStart.setUTCDate(1); + monthStart.setUTCHours(0, 0, 0, 0); + const monthSnapshots = await selectPlaycountSnapshots({ + account: params.accountId, + createdAfter: monthStart.toISOString(), + }); + const spentUsd = monthSnapshots.reduce((sum, row) => sum + (row.estimated_cost_usd ?? 0), 0); + const estimatedCostUsd = Number((albumIds.length * COST_PER_ALBUM_USD).toFixed(4)); + const capUsd = Number(process.env.SNAPSHOT_MONTHLY_CAP_USD) || DEFAULT_MONTHLY_CAP_USD; + if (spentUsd + estimatedCostUsd > capUsd) { + return { error: "Per-organization monthly snapshot cap reached", status: 429 }; + } + + const row = await insertPlaycountSnapshot({ + account: params.accountId, + catalog: params.body.catalog_id ?? null, + album_ids: albumIds, + isrcs: params.body.isrcs ?? null, + platforms: params.body.platforms, + schedule: params.body.schedule, + state: "queued", + album_count: albumIds.length, + estimated_cost_usd: estimatedCostUsd, + }); + + await start(playcountSnapshotWorkflow, [row.id]); + + return { + data: { + status: "success", + snapshot_id: row.id, + state: "queued", + album_count: albumIds.length, + estimated_cost_usd: estimatedCostUsd, + }, + }; +} diff --git a/lib/research/playcounts/createSnapshotHandler.ts b/lib/research/playcounts/createSnapshotHandler.ts new file mode 100644 index 000000000..e5535427f --- /dev/null +++ b/lib/research/playcounts/createSnapshotHandler.ts @@ -0,0 +1,30 @@ +import { type NextRequest, NextResponse } from "next/server"; +import { errorResponse } from "@/lib/networking/errorResponse"; +import { createSnapshot } from "@/lib/research/playcounts/createSnapshot"; +import { validateCreateSnapshotRequest } from "@/lib/research/playcounts/validateCreateSnapshotRequest"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; + +/** + * POST /api/research/snapshots + * + * Capture platform-displayed play counts for a whole catalog, album list, or + * ISRC list in one server-side snapshot. Executes asynchronously; the cost + * estimate is returned (202) before any scraper spend. + * + * @param request - The incoming HTTP request. + * @returns The JSON response. + */ +export async function createSnapshotHandler(request: NextRequest): Promise { + try { + const validated = await validateCreateSnapshotRequest(request); + if (validated instanceof NextResponse) return validated; + + const result = await createSnapshot(validated); + if ("error" in result) return errorResponse(result.error, result.status); + + return NextResponse.json(result.data, { status: 202, headers: getCorsHeaders() }); + } catch (error) { + console.error("[ERROR] createSnapshotHandler:", error); + return errorResponse("Internal error", 500); + } +} diff --git a/lib/research/playcounts/playcountMaintenanceHandler.ts b/lib/research/playcounts/playcountMaintenanceHandler.ts new file mode 100644 index 000000000..facba435b --- /dev/null +++ b/lib/research/playcounts/playcountMaintenanceHandler.ts @@ -0,0 +1,35 @@ +import { type NextRequest, NextResponse } from "next/server"; +import { start } from "workflow/api"; +import { validateCronRequest } from "@/lib/internal/validateCronRequest"; +import { songstatsBackfillWorkflow } from "@/app/workflows/songstatsBackfillWorkflow"; +import { startDueMonthlySnapshots } from "@/lib/research/playcounts/startDueMonthlySnapshots"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; + +/** + * GET /api/internal/playcount-maintenance — daily Vercel Cron entrypoint: + * starts the Songstats backfill drain workflow and re-runs due monthly + * snapshot series. Cron-only (CRON_SECRET bearer). + * + * @param request - The incoming HTTP request. + * @returns The JSON response. + */ +export async function playcountMaintenanceHandler(request: NextRequest): Promise { + const denied = validateCronRequest(request); + if (denied) return denied; + + try { + const run = await start(songstatsBackfillWorkflow); + const monthlyStarted = await startDueMonthlySnapshots(); + + return NextResponse.json( + { status: "success", backfill_run_id: run.runId, monthly_snapshots_started: monthlyStarted }, + { status: 202, headers: getCorsHeaders() }, + ); + } catch (error) { + console.error("[ERROR] playcountMaintenanceHandler:", error); + return NextResponse.json( + { status: "error", message: "Internal error" }, + { status: 500, headers: getCorsHeaders() }, + ); + } +} diff --git a/lib/research/playcounts/resolveSnapshotAlbums.ts b/lib/research/playcounts/resolveSnapshotAlbums.ts new file mode 100644 index 000000000..436d59d78 --- /dev/null +++ b/lib/research/playcounts/resolveSnapshotAlbums.ts @@ -0,0 +1,33 @@ +import { selectSongIdentifiers } from "@/lib/supabase/song_identifiers/selectSongIdentifiers"; +import { selectCatalogSongsWithArtists } from "@/lib/supabase/catalog_songs/selectCatalogSongsWithArtists"; + +export type SnapshotInput = { + catalog_id?: string; + album_ids?: string[]; + isrcs?: string[]; +}; + +/** + * Resolve a snapshot input (catalog | album ids | ISRCs) to a deduped list of + * Spotify album ids. Catalogs resolve through their songs; ISRCs through the + * album_id identifier mappings. + * + * @param input - Exactly one of catalog_id / album_ids / isrcs + * @returns Unique Spotify album ids ([] when nothing is mapped yet) + */ +export async function resolveSnapshotAlbums(input: SnapshotInput): Promise { + if (input.album_ids) return [...new Set(input.album_ids)]; + + let isrcs = input.isrcs ?? []; + if (input.catalog_id) { + const { songs } = await selectCatalogSongsWithArtists({ catalogId: input.catalog_id }); + isrcs = songs.map(row => row.isrc); + } + + const identifiers = await selectSongIdentifiers({ + platform: "spotify", + identifierType: "album_id", + songs: isrcs, + }); + return [...new Set(identifiers.map(row => row.value))]; +} diff --git a/lib/research/playcounts/startDueMonthlySnapshots.ts b/lib/research/playcounts/startDueMonthlySnapshots.ts new file mode 100644 index 000000000..cd0aa623f --- /dev/null +++ b/lib/research/playcounts/startDueMonthlySnapshots.ts @@ -0,0 +1,46 @@ +import { start } from "workflow/api"; +import { selectPlaycountSnapshots } from "@/lib/supabase/playcount_snapshots/selectPlaycountSnapshots"; +import { insertPlaycountSnapshot } from "@/lib/supabase/playcount_snapshots/insertPlaycountSnapshot"; +import { playcountSnapshotWorkflow } from "@/app/workflows/playcountSnapshotWorkflow"; + +const DUE_AFTER_DAYS = 30; + +/** + * Re-run due monthly snapshot series. A series is one (account, album set); + * its latest run being older than 30 days makes it due — a fresh job row is + * inserted (same inputs, new snapshot_id) and the capture workflow started. + * + * @returns Number of snapshot jobs started + */ +export async function startDueMonthlySnapshots(): Promise { + const monthly = await selectPlaycountSnapshots({ schedule: "monthly" }); + + const latestBySeries = new Map(); + for (const row of monthly) { + const key = `${row.account}:${[...(row.album_ids ?? [])].sort().join(",")}`; + const existing = latestBySeries.get(key); + if (!existing || (row.created_at ?? "") > (existing.created_at ?? "")) { + latestBySeries.set(key, row); + } + } + + const dueBefore = Date.now() - DUE_AFTER_DAYS * 24 * 60 * 60 * 1000; + let started = 0; + for (const last of latestBySeries.values()) { + if (new Date(last.created_at ?? 0).getTime() > dueBefore) continue; + const row = await insertPlaycountSnapshot({ + account: last.account, + catalog: last.catalog, + album_ids: last.album_ids, + isrcs: last.isrcs, + platforms: last.platforms, + schedule: "monthly", + state: "queued", + album_count: last.album_count, + estimated_cost_usd: last.estimated_cost_usd, + }); + await start(playcountSnapshotWorkflow, [row.id]); + started += 1; + } + return started; +} diff --git a/lib/research/playcounts/validateCreateSnapshotRequest.ts b/lib/research/playcounts/validateCreateSnapshotRequest.ts new file mode 100644 index 000000000..6ef87d859 --- /dev/null +++ b/lib/research/playcounts/validateCreateSnapshotRequest.ts @@ -0,0 +1,45 @@ +import { type NextRequest, NextResponse } from "next/server"; +import { z } from "zod"; +import { validateAuthContext } from "@/lib/auth/validateAuthContext"; +import { errorResponse } from "@/lib/networking/errorResponse"; + +export const createSnapshotBodySchema = z + .object({ + catalog_id: z.uuid("catalog_id must be a valid UUID").optional(), + album_ids: z.array(z.string()).min(1).optional(), + isrcs: z.array(z.string()).min(1).optional(), + platforms: z.array(z.enum(["spotify"])).default(["spotify"]), + schedule: z.enum(["once", "monthly"]).default("once"), + }) + .refine(body => [body.catalog_id, body.album_ids, body.isrcs].filter(Boolean).length === 1, { + message: "Provide exactly one of catalog_id, album_ids, isrcs", + }); + +export type CreateSnapshotBody = z.infer; + +export type ValidatedCreateSnapshotRequest = { + accountId: string; + body: CreateSnapshotBody; +}; + +/** + * Validates `POST /api/research/snapshots` — auth + body (exactly one of + * `catalog_id` / `album_ids` / `isrcs`; `platforms` currently `spotify` only; + * `schedule` once | monthly). + * + * @param request - The incoming HTTP request. + */ +export async function validateCreateSnapshotRequest( + request: NextRequest, +): Promise { + const authResult = await validateAuthContext(request); + if (authResult instanceof NextResponse) return authResult; + + const raw = await request.json().catch(() => null); + const result = createSnapshotBodySchema.safeParse(raw); + if (!result.success) { + return errorResponse(result.error.issues[0].message, 400); + } + + return { accountId: authResult.accountId, body: result.data }; +} diff --git a/lib/research/playcounts/writeAlbumPlayCounts.ts b/lib/research/playcounts/writeAlbumPlayCounts.ts new file mode 100644 index 000000000..ed31ae61c --- /dev/null +++ b/lib/research/playcounts/writeAlbumPlayCounts.ts @@ -0,0 +1,51 @@ +import { selectSongIdentifiers } from "@/lib/supabase/song_identifiers/selectSongIdentifiers"; +import { upsertSongMeasurements } from "@/lib/supabase/song_measurements/upsertSongMeasurements"; +import { SpotifyAlbumPlayCounts } from "@/lib/apify/spotify/fetchSpotifyAlbumPlayCounts"; + +const METRIC = "platform_displayed_play_count"; +const DATA_SOURCE = "apify_spotify_playcount"; + +/** + * Write actor album results into the measurement store: one row per track + * with an identifier mapping, stamped with the actor run id (and snapshot id + * when capturing for a snapshot job). Shared by the on-demand stats refresh + * and the snapshot workflow. + * + * @param albums - Parsed actor album items + * @param runId - The actor run id (raw_ref lineage) + * @param opts.snapshotId - Optional snapshot job lineage + * @returns Number of measurements written + */ +export async function writeAlbumPlayCounts( + albums: SpotifyAlbumPlayCounts[], + runId: string, + opts: { snapshotId?: string }, +): Promise { + const tracks = albums.flatMap(album => album.tracks ?? []); + const mappings = await selectSongIdentifiers({ + platform: "spotify", + identifierType: "track_id", + values: tracks.map(t => t.id), + }); + const songByTrackId = new Map(mappings.map(m => [m.value, m.song])); + const capturedAt = new Date().toISOString(); + + const rows = tracks.flatMap(track => { + const song = songByTrackId.get(track.id); + if (!song || typeof track.streamCount !== "number") return []; + return [ + { + song, + platform: "spotify", + metric: METRIC, + value: track.streamCount, + captured_at: capturedAt, + data_source: DATA_SOURCE, + raw_ref: runId, + ...(opts.snapshotId ? { snapshot: opts.snapshotId } : {}), + }, + ]; + }); + await upsertSongMeasurements(rows); + return rows.length; +} diff --git a/lib/supabase/playcount_snapshots/__tests__/insertPlaycountSnapshot.test.ts b/lib/supabase/playcount_snapshots/__tests__/insertPlaycountSnapshot.test.ts new file mode 100644 index 000000000..25345ea86 --- /dev/null +++ b/lib/supabase/playcount_snapshots/__tests__/insertPlaycountSnapshot.test.ts @@ -0,0 +1,44 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { insertPlaycountSnapshot } from "../insertPlaycountSnapshot"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +describe("insertPlaycountSnapshot", () => { + beforeEach(() => vi.clearAllMocks()); + + it("inserts a snapshot job row and returns it", async () => { + const row = { id: "snap_1", account: "acc_1", state: "queued" }; + const single = vi.fn().mockResolvedValue({ data: row, error: null }); + const select = vi.fn().mockReturnValue({ single }); + const insert = vi.fn().mockReturnValue({ select }); + vi.mocked(supabase.from).mockReturnValue({ insert } as never); + + const result = await insertPlaycountSnapshot({ + account: "acc_1", + album_ids: ["a1"], + platforms: ["spotify"], + album_count: 1, + estimated_cost_usd: 0.003, + } as never); + + expect(supabase.from).toHaveBeenCalledWith("playcount_snapshots"); + expect(insert).toHaveBeenCalled(); + expect(result).toEqual(row); + }); + + it("throws on insert error", async () => { + const single = vi.fn().mockResolvedValue({ data: null, error: { message: "boom" } }); + const select = vi.fn().mockReturnValue({ single }); + const insert = vi.fn().mockReturnValue({ select }); + vi.mocked(supabase.from).mockReturnValue({ insert } as never); + + await expect(insertPlaycountSnapshot({ account: "a" } as never)).rejects.toThrow( + "Failed to insert playcount snapshot: boom", + ); + }); +}); diff --git a/lib/supabase/playcount_snapshots/__tests__/selectPlaycountSnapshots.test.ts b/lib/supabase/playcount_snapshots/__tests__/selectPlaycountSnapshots.test.ts new file mode 100644 index 000000000..77898c271 --- /dev/null +++ b/lib/supabase/playcount_snapshots/__tests__/selectPlaycountSnapshots.test.ts @@ -0,0 +1,48 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { selectPlaycountSnapshots } from "../selectPlaycountSnapshots"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +function mockBuilder(result: { data: unknown; error: unknown }) { + const builder: Record> & { + then?: (resolve: (v: unknown) => void) => void; + } = {} as never; + for (const m of ["select", "eq", "gte", "order"]) builder[m] = vi.fn().mockReturnValue(builder); + builder.then = resolve => resolve(result); + vi.mocked(supabase.from).mockReturnValue(builder as never); + return builder; +} + +describe("selectPlaycountSnapshots", () => { + beforeEach(() => vi.clearAllMocks()); + + it("filters by account, created_at lower bound, and schedule", async () => { + const rows = [{ id: "snap_1", estimated_cost_usd: 1.71 }]; + const builder = mockBuilder({ data: rows, error: null }); + + const result = await selectPlaycountSnapshots({ + account: "acc_1", + createdAfter: "2026-06-01T00:00:00Z", + schedule: "monthly", + }); + + expect(supabase.from).toHaveBeenCalledWith("playcount_snapshots"); + expect(builder.eq).toHaveBeenCalledWith("account", "acc_1"); + expect(builder.eq).toHaveBeenCalledWith("schedule", "monthly"); + expect(builder.gte).toHaveBeenCalledWith("created_at", "2026-06-01T00:00:00Z"); + expect(result).toEqual(rows); + }); + + it("returns [] on error", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + mockBuilder({ data: null, error: { message: "boom" } }); + + expect(await selectPlaycountSnapshots({})).toEqual([]); + consoleError.mockRestore(); + }); +}); diff --git a/lib/supabase/playcount_snapshots/__tests__/updatePlaycountSnapshot.test.ts b/lib/supabase/playcount_snapshots/__tests__/updatePlaycountSnapshot.test.ts new file mode 100644 index 000000000..b01d28f9d --- /dev/null +++ b/lib/supabase/playcount_snapshots/__tests__/updatePlaycountSnapshot.test.ts @@ -0,0 +1,34 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { updatePlaycountSnapshot } from "../updatePlaycountSnapshot"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +describe("updatePlaycountSnapshot", () => { + beforeEach(() => vi.clearAllMocks()); + + it("updates snapshot fields by id", async () => { + const eq = vi.fn().mockResolvedValue({ error: null }); + const update = vi.fn().mockReturnValue({ eq }); + vi.mocked(supabase.from).mockReturnValue({ update } as never); + + await updatePlaycountSnapshot("snap_1", { state: "running" }); + + expect(update).toHaveBeenCalledWith({ state: "running" }); + expect(eq).toHaveBeenCalledWith("id", "snap_1"); + }); + + it("throws on update error", async () => { + const eq = vi.fn().mockResolvedValue({ error: { message: "boom" } }); + const update = vi.fn().mockReturnValue({ eq }); + vi.mocked(supabase.from).mockReturnValue({ update } as never); + + await expect(updatePlaycountSnapshot("snap_1", { state: "done" })).rejects.toThrow( + "Failed to update playcount snapshot: boom", + ); + }); +}); diff --git a/lib/supabase/playcount_snapshots/insertPlaycountSnapshot.ts b/lib/supabase/playcount_snapshots/insertPlaycountSnapshot.ts new file mode 100644 index 000000000..94a2ce5ee --- /dev/null +++ b/lib/supabase/playcount_snapshots/insertPlaycountSnapshot.ts @@ -0,0 +1,26 @@ +import supabase from "../serverClient"; +import { Tables, TablesInsert } from "@/types/database.types"; + +/** + * Insert a snapshot job row (mints the snapshot_id returned by + * POST /api/research/snapshots). + * + * @param snapshot - The snapshot job to insert + * @returns The inserted row + * @throws Error if the insert fails + */ +export async function insertPlaycountSnapshot( + snapshot: TablesInsert<"playcount_snapshots">, +): Promise> { + const { data, error } = await supabase + .from("playcount_snapshots") + .insert(snapshot) + .select() + .single(); + + if (error) { + throw new Error(`Failed to insert playcount snapshot: ${error.message}`); + } + + return data; +} diff --git a/lib/supabase/playcount_snapshots/selectPlaycountSnapshots.ts b/lib/supabase/playcount_snapshots/selectPlaycountSnapshots.ts new file mode 100644 index 000000000..f3a26055f --- /dev/null +++ b/lib/supabase/playcount_snapshots/selectPlaycountSnapshots.ts @@ -0,0 +1,43 @@ +import supabase from "../serverClient"; +import { Tables } from "@/types/database.types"; + +/** + * Select snapshot job rows with optional filters — by account (per-org cap + * window), creation lower bound, and schedule (due monthly re-runs). + * + * @param params.id - Optional snapshot id filter + * @param params.account - Optional account filter + * @param params.createdAfter - Optional inclusive created_at lower bound (ISO) + * @param params.schedule - Optional schedule filter ("once" | "monthly") + * @returns Matching rows newest-first, or [] if none exist or on error + */ +export async function selectPlaycountSnapshots({ + id, + account, + createdAfter, + schedule, +}: { + id?: string; + account?: string; + createdAfter?: string; + schedule?: string; +}): Promise[]> { + let query = supabase + .from("playcount_snapshots") + .select("*") + .order("created_at", { ascending: false }); + + if (id) query = query.eq("id", id); + if (account) query = query.eq("account", account); + if (schedule) query = query.eq("schedule", schedule); + if (createdAfter) query = query.gte("created_at", createdAfter); + + const { data, error } = await query; + + if (error) { + console.error("Error fetching playcount_snapshots:", error); + return []; + } + + return data || []; +} diff --git a/lib/supabase/playcount_snapshots/updatePlaycountSnapshot.ts b/lib/supabase/playcount_snapshots/updatePlaycountSnapshot.ts new file mode 100644 index 000000000..b01cbfe3d --- /dev/null +++ b/lib/supabase/playcount_snapshots/updatePlaycountSnapshot.ts @@ -0,0 +1,20 @@ +import supabase from "../serverClient"; +import { TablesUpdate } from "@/types/database.types"; + +/** + * Update a snapshot job row (state transitions, counts, cost). + * + * @param id - The snapshot id + * @param fields - Fields to update + * @throws Error if the update fails + */ +export async function updatePlaycountSnapshot( + id: string, + fields: TablesUpdate<"playcount_snapshots">, +): Promise { + const { error } = await supabase.from("playcount_snapshots").update(fields).eq("id", id); + + if (error) { + throw new Error(`Failed to update playcount snapshot: ${error.message}`); + } +} diff --git a/lib/supabase/songstats_backfill_queue/__tests__/claimSongstatsBackfillRows.test.ts b/lib/supabase/songstats_backfill_queue/__tests__/claimSongstatsBackfillRows.test.ts new file mode 100644 index 000000000..8e58559fb --- /dev/null +++ b/lib/supabase/songstats_backfill_queue/__tests__/claimSongstatsBackfillRows.test.ts @@ -0,0 +1,33 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { claimSongstatsBackfillRows } from "../claimSongstatsBackfillRows"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +describe("claimSongstatsBackfillRows", () => { + beforeEach(() => vi.clearAllMocks()); + + it("claims a batch via the FOR UPDATE SKIP LOCKED rpc", async () => { + const rows = [{ id: "q1", song: "USA2P2015959", status: "in_progress" }]; + vi.mocked(supabase.rpc).mockResolvedValue({ data: rows, error: null } as never); + + const result = await claimSongstatsBackfillRows(25); + + expect(supabase.rpc).toHaveBeenCalledWith("claim_songstats_backfill_rows", { + batch_size: 25, + }); + expect(result).toEqual(rows); + }); + + it("returns [] on rpc error", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + vi.mocked(supabase.rpc).mockResolvedValue({ data: null, error: { message: "boom" } } as never); + + expect(await claimSongstatsBackfillRows(5)).toEqual([]); + consoleError.mockRestore(); + }); +}); diff --git a/lib/supabase/songstats_backfill_queue/__tests__/updateSongstatsBackfillQueue.test.ts b/lib/supabase/songstats_backfill_queue/__tests__/updateSongstatsBackfillQueue.test.ts new file mode 100644 index 000000000..f93215309 --- /dev/null +++ b/lib/supabase/songstats_backfill_queue/__tests__/updateSongstatsBackfillQueue.test.ts @@ -0,0 +1,35 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { updateSongstatsBackfillQueue } from "../updateSongstatsBackfillQueue"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +describe("updateSongstatsBackfillQueue", () => { + beforeEach(() => vi.clearAllMocks()); + + it("updates a queue row's status by id", async () => { + const eq = vi.fn().mockResolvedValue({ error: null }); + const update = vi.fn().mockReturnValue({ eq }); + vi.mocked(supabase.from).mockReturnValue({ update } as never); + + await updateSongstatsBackfillQueue("q1", { status: "done" }); + + expect(supabase.from).toHaveBeenCalledWith("songstats_backfill_queue"); + expect(update).toHaveBeenCalledWith({ status: "done" }); + expect(eq).toHaveBeenCalledWith("id", "q1"); + }); + + it("throws on update error", async () => { + const eq = vi.fn().mockResolvedValue({ error: { message: "boom" } }); + const update = vi.fn().mockReturnValue({ eq }); + vi.mocked(supabase.from).mockReturnValue({ update } as never); + + await expect(updateSongstatsBackfillQueue("q1", { status: "failed" })).rejects.toThrow( + "Failed to update songstats backfill queue: boom", + ); + }); +}); diff --git a/lib/supabase/songstats_backfill_queue/claimSongstatsBackfillRows.ts b/lib/supabase/songstats_backfill_queue/claimSongstatsBackfillRows.ts new file mode 100644 index 000000000..c3c5448c9 --- /dev/null +++ b/lib/supabase/songstats_backfill_queue/claimSongstatsBackfillRows.ts @@ -0,0 +1,26 @@ +import supabase from "../serverClient"; +import { Tables } from "@/types/database.types"; + +/** + * Atomically claim a batch of pending backfill rows in rank order via the + * claim_songstats_backfill_rows RPC (FOR UPDATE SKIP LOCKED — concurrent + * workers skip each other's locked rows). Claimed rows are already marked + * in_progress when returned. + * + * @param batchSize - Max rows to claim + * @returns The claimed rows, or [] when the queue is drained or on error + */ +export async function claimSongstatsBackfillRows( + batchSize: number, +): Promise[]> { + const { data, error } = await supabase.rpc("claim_songstats_backfill_rows", { + batch_size: batchSize, + }); + + if (error) { + console.error("Error claiming songstats backfill rows:", error); + return []; + } + + return data || []; +} diff --git a/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue.ts b/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue.ts new file mode 100644 index 000000000..df3c8a16a --- /dev/null +++ b/lib/supabase/songstats_backfill_queue/updateSongstatsBackfillQueue.ts @@ -0,0 +1,20 @@ +import supabase from "../serverClient"; +import { TablesUpdate } from "@/types/database.types"; + +/** + * Update a backfill queue row (mark done/failed after a claim). + * + * @param id - The queue row id + * @param fields - Fields to update + * @throws Error if the update fails + */ +export async function updateSongstatsBackfillQueue( + id: string, + fields: TablesUpdate<"songstats_backfill_queue">, +): Promise { + const { error } = await supabase.from("songstats_backfill_queue").update(fields).eq("id", id); + + if (error) { + throw new Error(`Failed to update songstats backfill queue: ${error.message}`); + } +} diff --git a/lib/supabase/songstats_quota_ledger/__tests__/insertSongstatsQuotaLedger.test.ts b/lib/supabase/songstats_quota_ledger/__tests__/insertSongstatsQuotaLedger.test.ts new file mode 100644 index 000000000..4f7d5c2a8 --- /dev/null +++ b/lib/supabase/songstats_quota_ledger/__tests__/insertSongstatsQuotaLedger.test.ts @@ -0,0 +1,32 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { insertSongstatsQuotaLedger } from "../insertSongstatsQuotaLedger"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +describe("insertSongstatsQuotaLedger", () => { + beforeEach(() => vi.clearAllMocks()); + + it("records spent hits", async () => { + const insert = vi.fn().mockResolvedValue({ error: null }); + vi.mocked(supabase.from).mockReturnValue({ insert } as never); + + await insertSongstatsQuotaLedger({ hits: 1, purpose: "backfill USA2P2015959" }); + + expect(supabase.from).toHaveBeenCalledWith("songstats_quota_ledger"); + expect(insert).toHaveBeenCalledWith([{ hits: 1, purpose: "backfill USA2P2015959" }]); + }); + + it("throws on insert error (spend must be recorded)", async () => { + const insert = vi.fn().mockResolvedValue({ error: { message: "boom" } }); + vi.mocked(supabase.from).mockReturnValue({ insert } as never); + + await expect(insertSongstatsQuotaLedger({ hits: 1 })).rejects.toThrow( + "Failed to record songstats quota spend: boom", + ); + }); +}); diff --git a/lib/supabase/songstats_quota_ledger/__tests__/selectSongstatsQuotaSpent.test.ts b/lib/supabase/songstats_quota_ledger/__tests__/selectSongstatsQuotaSpent.test.ts new file mode 100644 index 000000000..6d59c6267 --- /dev/null +++ b/lib/supabase/songstats_quota_ledger/__tests__/selectSongstatsQuotaSpent.test.ts @@ -0,0 +1,35 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { selectSongstatsQuotaSpent } from "../selectSongstatsQuotaSpent"; +import supabase from "../../serverClient"; + +vi.mock("../../serverClient", () => { + const mockFrom = vi.fn(); + const mockRpc = vi.fn(); + return { default: { from: mockFrom, rpc: mockRpc } }; +}); + +describe("selectSongstatsQuotaSpent", () => { + beforeEach(() => vi.clearAllMocks()); + + it("sums hits spent since the window start", async () => { + const gte = vi.fn().mockResolvedValue({ data: [{ hits: 3 }, { hits: 7 }], error: null }); + const select = vi.fn().mockReturnValue({ gte }); + vi.mocked(supabase.from).mockReturnValue({ select } as never); + + const result = await selectSongstatsQuotaSpent("2026-05-12T00:00:00Z"); + + expect(supabase.from).toHaveBeenCalledWith("songstats_quota_ledger"); + expect(gte).toHaveBeenCalledWith("spent_at", "2026-05-12T00:00:00Z"); + expect(result).toBe(10); + }); + + it("treats errors as full quota spent (fail safe — do not overspend)", async () => { + const consoleError = vi.spyOn(console, "error").mockImplementation(() => {}); + const gte = vi.fn().mockResolvedValue({ data: null, error: { message: "boom" } }); + const select = vi.fn().mockReturnValue({ gte }); + vi.mocked(supabase.from).mockReturnValue({ select } as never); + + expect(await selectSongstatsQuotaSpent("2026-05-12T00:00:00Z")).toBe(Number.MAX_SAFE_INTEGER); + consoleError.mockRestore(); + }); +}); diff --git a/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger.ts b/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger.ts new file mode 100644 index 000000000..777c42a79 --- /dev/null +++ b/lib/supabase/songstats_quota_ledger/insertSongstatsQuotaLedger.ts @@ -0,0 +1,19 @@ +import supabase from "../serverClient"; +import { TablesInsert } from "@/types/database.types"; + +/** + * Record Songstats quota spend. Throws on failure — unrecorded spend would + * let the worker silently blow the rolling-window budget. + * + * @param entry - hits spent, optional purpose/account attribution + * @throws Error if the insert fails + */ +export async function insertSongstatsQuotaLedger( + entry: TablesInsert<"songstats_quota_ledger">, +): Promise { + const { error } = await supabase.from("songstats_quota_ledger").insert([entry]); + + if (error) { + throw new Error(`Failed to record songstats quota spend: ${error.message}`); + } +} diff --git a/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent.ts b/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent.ts new file mode 100644 index 000000000..0610a4b27 --- /dev/null +++ b/lib/supabase/songstats_quota_ledger/selectSongstatsQuotaSpent.ts @@ -0,0 +1,23 @@ +import supabase from "../serverClient"; + +/** + * Sum Songstats hits spent since a window start (the rolling 30-day quota + * check). Errors are treated as the full quota being spent — failing safe: + * the worker must never overspend because the ledger was unreadable. + * + * @param since - Inclusive ISO lower bound of the window + * @returns Total hits spent in the window + */ +export async function selectSongstatsQuotaSpent(since: string): Promise { + const { data, error } = await supabase + .from("songstats_quota_ledger") + .select("hits") + .gte("spent_at", since); + + if (error) { + console.error("Error reading songstats quota ledger:", error); + return Number.MAX_SAFE_INTEGER; + } + + return (data || []).reduce((sum, row) => sum + row.hits, 0); +} diff --git a/types/database.types.ts b/types/database.types.ts index 8ef25914b..1771da89f 100644 --- a/types/database.types.ts +++ b/types/database.types.ts @@ -3756,6 +3756,23 @@ export type Database = { Args: { target_team_account_id: string; target_user_id: string }; Returns: boolean; }; + claim_songstats_backfill_rows: { + Args: { batch_size: number }; + Returns: { + created_at: string; + id: string; + rank_score: number; + song: string; + status: string; + updated_at: string; + }[]; + SetofOptions: { + from: "*"; + to: "songstats_backfill_queue"; + isOneToOne: false; + isSetofReturn: true; + }; + }; count_reports_by_day: { Args: { end_date: string; start_date: string }; Returns: { diff --git a/vercel.json b/vercel.json index f732f23d9..a9abbb836 100644 --- a/vercel.json +++ b/vercel.json @@ -4,6 +4,10 @@ { "path": "/api/internal/credit-spend-digest", "schedule": "*/10 * * * *" + }, + { + "path": "/api/internal/playcount-maintenance", + "schedule": "0 7 * * *" } ] }