diff --git a/apps/app/src/jobs/tasks/notifications/risk-task-notification.ts b/apps/app/src/jobs/tasks/notifications/risk-task-notification.ts new file mode 100644 index 0000000000..547939f307 --- /dev/null +++ b/apps/app/src/jobs/tasks/notifications/risk-task-notification.ts @@ -0,0 +1,53 @@ +import { db } from "@bubba/db"; +import { NotificationTypes, TriggerEvents, trigger } from "@bubba/notifications"; +import { logger, schemaTask } from "@trigger.dev/sdk/v3"; +import { formatDistance } from "date-fns"; +import { z } from "zod"; + +export const sendRiskTaskNotification = schemaTask({ + id: "send-risk-task-notification", + schema: z.object({ + task: z.object({ + id: z.string(), + title: z.string(), + dueDate: z.date(), + owner: z.object({ + id: z.string(), + email: z.string(), + organizationId: z.string(), + }), + riskId: z.string(), + }) + }), + run: async (payload) => { + const { task } = payload; + + try { + const owner = task.owner; + + const timeUntilDue = task.dueDate ? formatDistance(task.dueDate, new Date(), { addSuffix: true }) : "soon"; + + await db.riskMitigationTask.update({ + where: { id: task.id }, + data: { notifiedAt: new Date() }, + }); + + await trigger({ + name: TriggerEvents.TaskReminderInApp, + user: { + subscriberId: `${owner.organizationId}_${owner.id}`, + email: owner.email, + fullName: owner.email, + organizationId: owner.organizationId, + }, + payload: { + description: `${task.title} is due ${timeUntilDue}`, + recordId: `/risk/${task.riskId}/tasks/${task.id}`, + type: NotificationTypes.Task, + }, + }); + } catch (error) { + logger.error(`Error sending risk task notification: ${error}`); + } + }, +}); \ No newline at end of file diff --git a/apps/app/src/jobs/tasks/notifications/risk-task-notifications.ts b/apps/app/src/jobs/tasks/notifications/risk-task-notifications.ts deleted file mode 100644 index 47b3dd4d26..0000000000 --- a/apps/app/src/jobs/tasks/notifications/risk-task-notifications.ts +++ /dev/null @@ -1,95 +0,0 @@ -import { db } from "@bubba/db"; -import { - NotificationTypes, - TriggerEvents, - trigger, -} from "@bubba/notifications"; -import { logger, schedules } from "@trigger.dev/sdk/v3"; -import { formatDistance } from "date-fns"; - -export const sendRiskTaskNotifications = schedules.task({ - id: "send-risk-task-notifications", - run: async () => { - const now = new Date(); - const upcomingThreshold = new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000); - - logger.info( - `Sending risk task notifications from now: ${now} to ${upcomingThreshold}`, - ); - - const tasks = await db.riskMitigationTask.findMany({ - where: { - dueDate: { gte: now, lte: upcomingThreshold }, - status: { in: ["open", "pending"] }, - notifiedAt: null, - }, - select: { - id: true, - dueDate: true, - notifiedAt: true, - riskId: true, - title: true, - owner: { - select: { - id: true, - email: true, - name: true, - image: true, - organizationId: true, - }, - }, - }, - }); - - const notifiedTasks = []; - - for (const task of tasks) { - const owner = task.owner; - - const timeUntilDue = task.dueDate - ? formatDistance(task.dueDate, new Date(), { - addSuffix: true, - }) - : "soon"; - - try { - if (!owner || !owner.email || !owner.organizationId) { - logger.warn(`Skipping task ${task.id} - owner ${owner?.id} missing email or organizationId`); - continue; - } - - await db.riskMitigationTask.update({ - where: { id: task.id }, - data: { notifiedAt: new Date() }, - }); - - await trigger({ - name: TriggerEvents.TaskReminderInApp, - user: { - subscriberId: `${owner.organizationId}_${owner.id}`, - email: owner.email, - fullName: owner.name, - image: owner.image, - organizationId: owner.organizationId, - }, - payload: { - description: `${task.title} is due ${timeUntilDue}`, - recordId: `/risk/${task.riskId}/tasks/${task.id}`, - type: NotificationTypes.Task, - }, - }); - - - notifiedTasks.push(task.id); - } catch (error) { - logger.error( - `Error processing task ${task.id} for ${owner?.email}: ${error}`, - ); - } - } - - if (notifiedTasks.length) { - logger.info(`Sent notifications for tasks: ${notifiedTasks.join(", ")}`); - } - }, -}); \ No newline at end of file diff --git a/apps/app/src/jobs/tasks/notifications/risk-task-schedule.ts b/apps/app/src/jobs/tasks/notifications/risk-task-schedule.ts new file mode 100644 index 0000000000..9a1fb167d1 --- /dev/null +++ b/apps/app/src/jobs/tasks/notifications/risk-task-schedule.ts @@ -0,0 +1,81 @@ +import { db } from "@bubba/db"; +import { logger, schedules } from "@trigger.dev/sdk/v3"; +import { sendRiskTaskNotification } from "./risk-task-notification"; + +export const sendRiskTaskSchedule = schedules.task({ + id: "risk-task-schedule", + cron: "0 * * * *", + run: async () => { + const now = new Date(); + const upcomingThreshold = new Date(now.getTime() + 7 * 24 * 60 * 60 * 1000); + + logger.info( + `Sending risk task notifications from now: ${now} to ${upcomingThreshold}`, + ); + + const tasks = await db.riskMitigationTask.findMany({ + where: { + dueDate: { gte: now, lte: upcomingThreshold }, + status: { in: ["open", "pending"] }, + notifiedAt: null, + }, + select: { + id: true, + dueDate: true, + notifiedAt: true, + riskId: true, + title: true, + owner: { + select: { + id: true, + email: true, + name: true, + organizationId: true, + }, + }, + }, + }); + + const triggerPayloads = tasks + .filter((task): task is (typeof task & { + owner: { id: string; email: string; organizationId: string } + }) => Boolean(task.owner?.email && task.owner.organizationId)) + .map(task => ({ + payload: { + task: { + id: task.id, + title: task.title, + dueDate: task.dueDate || new Date(), + owner: task.owner, + riskId: task.riskId, + } + } + })); + + if (triggerPayloads.length > 0) { + try { + + await sendRiskTaskNotification.batchTrigger( + triggerPayloads, + ); + + logger.info(`Triggered ${triggerPayloads.length} task notifications`); + } catch (error) { + logger.error(`Failed to trigger batch notifications: ${error}`); + + return { + success: false, + totalTasks: tasks.length, + triggeredTasks: triggerPayloads.length, + error: error instanceof Error ? error.message : String(error), + }; + } + } + + return { + success: true, + totalTasks: tasks.length, + triggeredTasks: triggerPayloads.length, + }; + } +}); \ No newline at end of file