diff --git a/hasura/metadata/tables.yaml b/hasura/metadata/tables.yaml index d8b30cc80..e713d16f0 100644 --- a/hasura/metadata/tables.yaml +++ b/hasura/metadata/tables.yaml @@ -3256,6 +3256,7 @@ query_params: {} template_engine: Kriti url: '{{$base_url}}/notifications/events/handleJobLinesChange' + version: 1 - table: name: joblines_status schema: public diff --git a/hasura/migrations/1740162115648_alter_table_public_notifications_add_column_html_body/down.sql b/hasura/migrations/1740162115648_alter_table_public_notifications_add_column_html_body/down.sql new file mode 100644 index 000000000..bb56e2f77 --- /dev/null +++ b/hasura/migrations/1740162115648_alter_table_public_notifications_add_column_html_body/down.sql @@ -0,0 +1,4 @@ +-- Could not auto-generate a down migration. +-- Please write an appropriate down migration for the SQL below: +-- alter table "public"."notifications" add column "html_body" text +-- not null; diff --git a/hasura/migrations/1740162115648_alter_table_public_notifications_add_column_html_body/up.sql b/hasura/migrations/1740162115648_alter_table_public_notifications_add_column_html_body/up.sql new file mode 100644 index 000000000..be7753c99 --- /dev/null +++ b/hasura/migrations/1740162115648_alter_table_public_notifications_add_column_html_body/up.sql @@ -0,0 +1,2 @@ +alter table "public"."notifications" add column "html_body" text + not null; diff --git a/hasura/migrations/1740162296457_alter_table_public_notifications_alter_column_fcm_title/down.sql b/hasura/migrations/1740162296457_alter_table_public_notifications_alter_column_fcm_title/down.sql new file mode 100644 index 000000000..a2d9ebaa9 --- /dev/null +++ b/hasura/migrations/1740162296457_alter_table_public_notifications_alter_column_fcm_title/down.sql @@ -0,0 +1 @@ +alter table "public"."notifications" alter column "fcm_title" set not null; diff --git a/hasura/migrations/1740162296457_alter_table_public_notifications_alter_column_fcm_title/up.sql b/hasura/migrations/1740162296457_alter_table_public_notifications_alter_column_fcm_title/up.sql new file mode 100644 index 000000000..3755dc382 --- /dev/null +++ b/hasura/migrations/1740162296457_alter_table_public_notifications_alter_column_fcm_title/up.sql @@ -0,0 +1 @@ +alter table "public"."notifications" alter column "fcm_title" drop not null; diff --git a/hasura/migrations/1740162315085_alter_table_public_notifications_alter_column_fcm_message/down.sql b/hasura/migrations/1740162315085_alter_table_public_notifications_alter_column_fcm_message/down.sql new file mode 100644 index 000000000..dadd5d448 --- /dev/null +++ b/hasura/migrations/1740162315085_alter_table_public_notifications_alter_column_fcm_message/down.sql @@ -0,0 +1 @@ +alter table "public"."notifications" alter column "fcm_message" set not null; diff --git a/hasura/migrations/1740162315085_alter_table_public_notifications_alter_column_fcm_message/up.sql b/hasura/migrations/1740162315085_alter_table_public_notifications_alter_column_fcm_message/up.sql new file mode 100644 index 000000000..387272491 --- /dev/null +++ b/hasura/migrations/1740162315085_alter_table_public_notifications_alter_column_fcm_message/up.sql @@ -0,0 +1 @@ +alter table "public"."notifications" alter column "fcm_message" drop not null; diff --git a/server/email/sendemail.js b/server/email/sendemail.js index 81787fe49..8d306258f 100644 --- a/server/email/sendemail.js +++ b/server/email/sendemail.js @@ -69,11 +69,11 @@ const sendServerEmail = async ({ subject, text }) => { } }, (err, info) => { - logger.log("server-email-failure", err ? "error" : "debug", null, null, { message: err || info }); + logger.log("server-email-failure", err ? "error" : "debug", null, null, { message: err?.message }); } ); } catch (error) { - logger.log("server-email-failure", "error", null, null, { error }); + logger.log("server-email-failure", "error", null, null, { message: error?.message }); } }; @@ -92,11 +92,11 @@ const sendTaskEmail = async ({ to, subject, type = "text", html, text, attachmen }, (err, info) => { // (message, type, user, record, meta - logger.log("server-email", err ? "error" : "debug", null, null, { message: err ? err?.message : info }); + logger.log("server-email", err ? "error" : "debug", null, null, { message: err?.message }); } ); } catch (error) { - logger.log("server-email-failure", "error", null, null, { error }); + logger.log("server-email-failure", "error", null, null, { message: error?.message }); } }; @@ -125,7 +125,7 @@ const sendEmail = async (req, res) => { cc: req.body.cc, subject: req.body.subject, templateStrings: req.body.templateStrings, - error + errorMessage: error?.message }); } }) @@ -194,7 +194,7 @@ const sendEmail = async (req, res) => { cc: req.body.cc, subject: req.body.subject, templateStrings: req.body.templateStrings, - error: err + errorMessage: err?.message }); logEmail(req, { to: req.body.to, @@ -202,7 +202,7 @@ const sendEmail = async (req, res) => { subject: req.body.subject, bodyshopid: req.body.bodyshopid }); - res.status(500).json({ success: false, error: err }); + res.status(500).json({ success: false, errorMessage: err?.message }); } } ); @@ -270,14 +270,14 @@ ${body.bounce?.bouncedRecipients.map( }, (err, info) => { logger.log("sns-error", err ? "error" : "debug", "api", null, { - message: err ? err?.message : info + errorMessage: err?.message }); } ); } } catch (error) { logger.log("sns-error", "ERROR", "api", null, { - error: JSON.stringify(error) + errorMessage: error?.message }); } res.sendStatus(200); diff --git a/server/graphql-client/queries.js b/server/graphql-client/queries.js index 113369e4e..d7c97ad41 100644 --- a/server/graphql-client/queries.js +++ b/server/graphql-client/queries.js @@ -2738,6 +2738,7 @@ query GET_NOTIFICATION_ASSOCIATIONS($emails: [String!]!, $shopid: uuid!) { useremail: { _in: $emails }, shopid: { _eq: $shopid } }) { + id useremail notification_settings } diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index e7c7ef759..e5ffb9617 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -1,4 +1,5 @@ const { Queue, Worker } = require("bullmq"); +const graphQLClient = require("../../graphql-client/graphql-client").client; // Base time-related constant in minutes, sourced from environment variable or defaulting to 1 const APP_CONSOLIDATION_DELAY_IN_MINS = (() => { @@ -10,15 +11,70 @@ const APP_CONSOLIDATION_DELAY_IN_MINS = (() => { // Base time-related constant (in milliseconds) / DO NOT TOUCH const APP_CONSOLIDATION_DELAY = APP_CONSOLIDATION_DELAY_IN_MINS * 60000; // 1 minute (base timeout) -// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH, these are pegged to APP_CONSOLIDATION_DELAY -const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, for notification storage) -const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s, buffer for consolidation flag) -const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base, for lock duration) -const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base, for rate limiting) +// Derived time-related constants based on APP_CONSOLIDATION_DELAY / DO NOT TOUCH +const NOTIFICATION_STORAGE_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s) +const CONSOLIDATION_FLAG_EXPIRATION = APP_CONSOLIDATION_DELAY * 1.5; // 1.5 minutes (90s) +const LOCK_EXPIRATION = APP_CONSOLIDATION_DELAY * 0.25; // 15 seconds (quarter of base) +const RATE_LIMITER_DURATION = APP_CONSOLIDATION_DELAY * 0.1; // 6 seconds (tenth of base) let addQueue; let consolidateQueue; +// GraphQL mutation to insert notifications +const INSERT_NOTIFICATIONS_MUTATION = ` + mutation INSERT_NOTIFICATIONS($objects: [notifications_insert_input!]!) { + insert_notifications(objects: $objects) { + affected_rows + returning { + id + jobid + associationid + ui_translation_string + ui_translation_meta + html_body + } + } + } +`; + +/** + * Builds an HTML unordered list from an array of notification bodies. + * + * @param {Array} notifications - Array of notification objects with a 'body' field. + * @returns {string} HTML string representing an unordered list of bodies. + */ +const buildHtmlBody = (notifications) => { + const listItems = notifications.map((n) => `
  • ${n.body}
  • `).join(""); + return ``; +}; + +/** + * Determines the key and variables for a batch of notifications. + * + * @param {Array} notifications - Array of notification objects with 'key' and 'variables'. + * @returns {Object} An object with 'key' and 'variables' properties. + */ +const determineKeyAndVariables = (notifications) => { + if (notifications.length === 1) { + // Single notification: use the original key and variables + return { + key: notifications[0].key, + variables: notifications[0].variables + }; + } else { + // Multiple notifications: use a generic key and consolidate variables with their original keys + return { + key: "notifications.job.multipleChanges", + variables: { + variables: notifications.map((n) => ({ + key: n.key, // Include the original key in each variables object + ...n.variables + })) + } + }; + } +}; + /** * Initializes the notification queues and workers for adding and consolidating notifications. * @@ -30,43 +86,37 @@ let consolidateQueue; * @returns {Queue} The initialized `addQueue` instance for dispatching notifications. */ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { - // Only initialize if queues don't already exist if (!addQueue || !consolidateQueue) { logger.logger.info("Initializing Notifications Queues"); - // Create queue for adding notifications addQueue = new Queue("notificationsAdd", { connection: pubClient, prefix: "{BULLMQ}", defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); - // Create queue for consolidating notifications consolidateQueue = new Queue("notificationsConsolidate", { connection: pubClient, prefix: "{BULLMQ}", defaultJobOptions: { removeOnComplete: true, removeOnFail: true } }); - // Worker to process jobs from the addQueue const addWorker = new Worker( "notificationsAdd", async (job) => { - const { jobId, key, variables, recipients } = job.data; + const { jobId, key, variables, recipients, body } = job.data; logger.logger.info(`Adding notifications for jobId ${jobId}`); const redisKeyPrefix = `app:notifications:${jobId}`; - const notification = { key, variables, timestamp: Date.now() }; + const notification = { key, variables, body, timestamp: Date.now() }; - // Store notification for each recipient in Redis for (const recipient of recipients) { const { user } = recipient; const userKey = `${redisKeyPrefix}:${user}`; const existingNotifications = await pubClient.get(userKey); const notifications = existingNotifications ? JSON.parse(existingNotifications) : []; notifications.push(notification); - // Set with expiration to avoid stale data - await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); // Convert to seconds + await pubClient.set(userKey, JSON.stringify(notifications), "EX", NOTIFICATION_STORAGE_EXPIRATION / 1000); logger.logger.debug(`Stored notification for ${user} under ${userKey}: ${JSON.stringify(notifications)}`); } @@ -75,20 +125,18 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { logger.logger.debug(`Consolidation flag set for jobId ${jobId}: ${flagSet}`); if (flagSet) { - // Schedule consolidation job with delay and retries await consolidateQueue.add( "consolidate-notifications", { jobId, recipients }, { jobId: `consolidate:${jobId}`, delay: APP_CONSOLIDATION_DELAY, - attempts: 3, // Retry up to 3 times - backoff: LOCK_EXPIRATION // Retry delay matches lock expiration (15s) + attempts: 3, + backoff: LOCK_EXPIRATION } ); logger.logger.info(`Scheduled consolidation for jobId ${jobId}`); - // Set expiration on flag - await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); // Convert to seconds + await pubClient.expire(consolidateKey, CONSOLIDATION_FLAG_EXPIRATION / 1000); } else { logger.logger.debug(`Consolidation already scheduled for jobId ${jobId}`); } @@ -100,7 +148,6 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } ); - // Worker to process jobs from the consolidateQueue const consolidateWorker = new Worker( "notificationsConsolidate", async (job) => { @@ -109,8 +156,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const redisKeyPrefix = `app:notifications:${jobId}`; const lockKey = `lock:consolidate:${jobId}`; - // Acquire a lock to prevent concurrent consolidation - const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); // Convert to seconds + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", LOCK_EXPIRATION / 1000); logger.logger.debug(`Lock acquisition for jobId ${jobId}: ${lockAcquired}`); if (lockAcquired) { @@ -119,7 +165,6 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const uniqueUsers = [...new Set(recipients.map((r) => r.user))]; logger.logger.debug(`Unique users for jobId ${jobId}: ${uniqueUsers}`); - // Retrieve and structure notifications by user and bodyShopId for (const user of uniqueUsers) { const userKey = `${redisKeyPrefix}:${user}`; const notifications = await pubClient.get(userKey); @@ -141,29 +186,71 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { logger.logger.debug(`Consolidated notifications: ${JSON.stringify(allNotifications)}`); - // Emit notifications to users via Socket.io + // Insert notifications into the database and collect IDs + const notificationInserts = []; + const notificationIdMap = new Map(); + + for (const [user, bodyShopData] of Object.entries(allNotifications)) { + const userRecipients = recipients.filter((r) => r.user === user); + const employeeId = userRecipients[0]?.employeeId; + + for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { + const { key, variables } = determineKeyAndVariables(notifications); + const htmlBody = buildHtmlBody(notifications); + notificationInserts.push({ + jobid: jobId, + associationid: employeeId || null, + ui_translation_string: key, + ui_translation_meta: JSON.stringify(variables), + html_body: htmlBody + }); + notificationIdMap.set(`${user}:${bodyShopId}`, null); + } + } + + if (notificationInserts.length > 0) { + const insertResponse = await graphQLClient.request(INSERT_NOTIFICATIONS_MUTATION, { + objects: notificationInserts + }); + logger.logger.info( + `Inserted ${insertResponse.insert_notifications.affected_rows} notifications for jobId ${jobId}` + ); + + insertResponse.insert_notifications.returning.forEach((row, index) => { + const user = uniqueUsers[Math.floor(index / Object.keys(allNotifications[uniqueUsers[0]]).length)]; + const bodyShopId = Object.keys(allNotifications[user])[ + index % Object.keys(allNotifications[user]).length + ]; + notificationIdMap.set(`${user}:${bodyShopId}`, row.id); + }); + } + + // Emit notifications to users via Socket.io with notification ID for (const [user, bodyShopData] of Object.entries(allNotifications)) { const userMapping = await redisHelpers.getUserSocketMapping(user); logger.logger.debug(`User socket mapping for ${user}: ${JSON.stringify(userMapping)}`); for (const [bodyShopId, notifications] of Object.entries(bodyShopData)) { + const notificationId = notificationIdMap.get(`${user}:${bodyShopId}`); if (userMapping && userMapping[bodyShopId]?.socketIds) { userMapping[bodyShopId].socketIds.forEach((socketId) => { logger.logger.debug( `Emitting to socket ${socketId}: ${JSON.stringify({ jobId, bodyShopId, - notifications + notifications, + notificationId })}` ); ioRedis.to(socketId).emit("notification", { jobId, bodyShopId, - notifications + notifications, + notificationId }); }); logger.logger.info( - `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId}` + `Sent ${notifications.length} consolidated notifications to ${user} for jobId ${jobId} with notificationId ${notificationId}` ); } else { logger.logger.warn(`No socket IDs found for ${user} in bodyShopId ${bodyShopId}`); @@ -174,7 +261,7 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { await pubClient.del(`app:consolidate:${jobId}`); } catch (err) { logger.logger.error(`Consolidation error for jobId ${jobId}: ${err.message}`, { error: err }); - throw err; // Trigger retry if attempts remain + throw err; } finally { await pubClient.del(lockKey); } @@ -190,20 +277,15 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { } ); - // Log worker completion events addWorker.on("completed", (job) => logger.logger.info(`Add job ${job.id} completed`)); consolidateWorker.on("completed", (job) => logger.logger.info(`Consolidate job ${job.id} completed`)); - - // Log worker failure events with error details addWorker.on("failed", (job, err) => logger.logger.error(`Add job ${job.id} failed: ${err.message}`, { error: err }) ); - consolidateWorker.on("failed", (job, err) => logger.logger.error(`Consolidate job ${job.id} failed: ${err.message}`, { error: err }) ); - // Graceful shutdown handler for workers const shutdown = async () => { logger.logger.info("Closing app queue workers..."); await Promise.all([addWorker.close(), consolidateWorker.close()]); @@ -240,10 +322,10 @@ const dispatchAppsToQueue = async ({ appsToDispatch, logger }) => { const appQueue = getQueue(); for (const app of appsToDispatch) { - const { jobId, bodyShopId, key, variables, recipients } = app; + const { jobId, bodyShopId, key, variables, recipients, body } = app; await appQueue.add( "add-notification", - { jobId, bodyShopId, key, variables, recipients }, + { jobId, bodyShopId, key, variables, recipients, body }, { jobId: `${jobId}:${Date.now()}` } ); logger.logger.info(`Added notification to queue for jobId ${jobId} with ${recipients.length} recipients`); diff --git a/server/notifications/scenarioBuilders.js b/server/notifications/scenarioBuilders.js index 432b66bd4..96b09465c 100644 --- a/server/notifications/scenarioBuilders.js +++ b/server/notifications/scenarioBuilders.js @@ -8,8 +8,8 @@ const { getJobAssignmentType } = require("./stringHelpers"); */ const populateWatchers = (data, result) => { data.scenarioWatchers.forEach((recipients) => { - const { user, app, fcm, email, firstName, lastName } = recipients; - if (app === true) result.app.recipients.push({ user, bodyShopId: data.bodyShopId }); + const { user, app, fcm, email, firstName, lastName, employeeId } = recipients; + if (app === true) result.app.recipients.push({ user, bodyShopId: data.bodyShopId, employeeId }); if (fcm === true) result.fcm.recipients.push(user); if (email === true) result.email.recipients.push({ user, firstName, lastName }); }); diff --git a/server/notifications/scenarioParser.js b/server/notifications/scenarioParser.js index a7c3a7815..1a599420f 100644 --- a/server/notifications/scenarioParser.js +++ b/server/notifications/scenarioParser.js @@ -115,7 +115,7 @@ const scenarioParser = async (req, jobIdField) => { }) .map((assoc) => { const settings = assoc.notification_settings[scenario.key]; - const watcherEmail = assoc.user || assoc.useremail; + const watcherEmail = assoc.useremail; const matchingWatcher = jobWatchers.find((watcher) => watcher.email === watcherEmail); // Build watcher object with notification preferences and personal details @@ -126,7 +126,7 @@ const scenarioParser = async (req, jobIdField) => { fcm: settings.fcm, firstName: matchingWatcher ? matchingWatcher.firstName : undefined, lastName: matchingWatcher ? matchingWatcher.lastName : undefined, - employeeId: matchingWatcher ? matchingWatcher.employeeId : undefined + employeeId: matchingWatcher ? assoc.id : undefined }; }) }));