diff --git a/package-lock.json b/package-lock.json index 3b809e36f..e40ec328c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "@opensearch-project/opensearch": "^2.5.0", "aws4": "^1.12.0", "axios": "^1.6.5", + "better-queue": "^3.8.12", "bluebird": "^3.7.2", "body-parser": "^1.20.2", "cloudinary": "^2.0.2", @@ -2860,6 +2861,21 @@ "tweetnacl": "^0.14.3" } }, + "node_modules/better-queue": { + "version": "3.8.12", + "resolved": "https://registry.npmjs.org/better-queue/-/better-queue-3.8.12.tgz", + "integrity": "sha512-D9KZ+Us+2AyaCz693/9AyjTg0s8hEmkiM/MB3i09cs4MdK1KgTSGJluXRYmOulR69oLZVo2XDFtqsExDt8oiLA==", + "dependencies": { + "better-queue-memory": "^1.0.1", + "node-eta": "^0.9.0", + "uuid": "^9.0.0" + } + }, + "node_modules/better-queue-memory": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/better-queue-memory/-/better-queue-memory-1.0.4.tgz", + "integrity": "sha512-SWg5wFIShYffEmJpI6LgbL8/3Dqhku7xI1oEiy6FroP9DbcZlG0ZDjxvPdP9t7hTGW40IpIcC6zVoGT1oxjOuA==" + }, "node_modules/bignumber.js": { "version": "9.1.2", "resolved": "https://registry.npmjs.org/bignumber.js/-/bignumber.js-9.1.2.tgz", @@ -5493,6 +5509,11 @@ "node": ">= 0.4.0" } }, + "node_modules/node-eta": { + "version": "0.9.0", + "resolved": "https://registry.npmjs.org/node-eta/-/node-eta-0.9.0.tgz", + "integrity": "sha512-mTCTZk29tmX1OGfVkPt63H3c3VqXrI2Kvua98S7iUIB/Gbp0MNw05YtUomxQIxnnKMyRIIuY9izPcFixzhSBrA==" + }, "node_modules/node-fetch": { "version": "2.7.0", "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz", diff --git a/package.json b/package.json index 249d3504c..37acd18ab 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "@opensearch-project/opensearch": "^2.5.0", "aws4": "^1.12.0", "axios": "^1.6.5", + "better-queue": "^3.8.12", "bluebird": "^3.7.2", "body-parser": "^1.20.2", "cloudinary": "^2.0.2", diff --git a/server/email/generateTemplate.js b/server/email/generateTemplate.js index 6ec5d1745..7168d435e 100644 --- a/server/email/generateTemplate.js +++ b/server/email/generateTemplate.js @@ -76,7 +76,7 @@ const generateEmailTemplate = (strings) => { - +

${strings?.dateLine || now()}

${strings?.dateLine || now()}

diff --git a/server/email/tasksEmails.js b/server/email/tasksEmails.js index 74b4e0487..d31eb7b8f 100644 --- a/server/email/tasksEmails.js +++ b/server/email/tasksEmails.js @@ -11,7 +11,7 @@ const client = require("../graphql-client/graphql-client").client; const queries = require("../graphql-client/queries"); const generateEmailTemplate = require("./generateTemplate"); const moment = require("moment"); -const { UPDATE_TASKS_REMIND_AT_SENT } = require("../graphql-client/queries"); +const { taskEmailQueue } = require("./tasksEmailsQueue"); const ses = new aws.SES({ apiVersion: "latest", @@ -27,6 +27,41 @@ const transporter = nodemailer.createTransport({ sendingRate: 40 // 40 emails per second. }); +// Initialize the Tasks Email Queue +const tasksEmailQueue = taskEmailQueue(); + +// Cleanup function for the Tasks Email Queue +const tasksEmailQueueCleanup = async () => { + try { + // Example async operation + console.log("Performing Tasks Email Reminder process cleanup..."); + await new Promise((resolve) => tasksEmailQueue.destroy(() => resolve())); + } catch (err) { + console.error("Tasks Email Reminder process cleanup failed:", err); + } +}; + +// Handling SIGINT (e.g., Ctrl+C) +process.on("SIGINT", async () => { + await tasksEmailQueueCleanup(); + process.exit(0); +}); +// Handling SIGTERM (e.g., sent by system shutdown) +process.on("SIGTERM", async () => { + await tasksEmailQueueCleanup(); + process.exit(0); +}); +// Handling uncaught exceptions +process.on("uncaughtException", async (err) => { + await tasksEmailQueueCleanup(); + process.exit(1); // Exit with an 'error' code +}); +// Handling unhandled promise rejections +process.on("unhandledRejection", async (reason, promise) => { + await tasksEmailQueueCleanup(); + process.exit(1); // Exit with an 'error' code +}); + const fromEmails = InstanceManager({ imex: "ImEX Online ", rome: "Rome Online ", @@ -75,27 +110,29 @@ const generateTemplateArgs = (title, createdBy, dueDate, taskId) => { */ const sendMail = (type, to, subject, html, taskIds, successCallback) => { // Push next messages to Nodemailer - transporter.once("idle", () => { - if (transporter.isIdle()) { - transporter.sendMail( - { - from: fromEmails, - to, - subject, - html - }, - (error, info) => { - if (info) { - if (typeof successCallback === "function" && taskIds && taskIds.length) { - successCallback(taskIds); - } - } else { - logger.log(`task-${type}-email-failure`, "error", null, null, error); - } + //transporter.once("idle", () => { + // Note: This is commented out because despite being in the documentation, it does not work + // and stackoverflow suggests it is not needed + // if (transporter.isIdle()) { + transporter.sendMail( + { + from: fromEmails, + to, + subject, + html + }, + (error, info) => { + if (info) { + if (typeof successCallback === "function" && taskIds && taskIds.length) { + successCallback(taskIds); } - ); + } else { + logger.log(`task-${type}-email-failure`, "error", null, null, error); + } } - }); + ); + // } + // }); }; /** @@ -179,7 +216,8 @@ const tasksRemindEmail = async (req, res) => { if (recipient.count === 1) { const onlyTask = groupedTasks[recipient.email][0]; - emailData.subject = `New Task Reminder - ${onlyTask.title} - ${formatDate(onlyTask.due_date)}`; + emailData.subject = + `New Task Reminder - ${onlyTask.title} ${onlyTask.due_date ? `- ${formatDate(onlyTask.due_date)}` : ""}`.trim(); emailData.html = generateEmailTemplate( generateTemplateArgs(onlyTask.title, onlyTask.created_by, onlyTask.due_date, onlyTask.id) @@ -194,9 +232,8 @@ const tasksRemindEmail = async (req, res) => { subHeader: `Please sign in to your account to view the Task details.`, body: `` @@ -206,10 +243,9 @@ const tasksRemindEmail = async (req, res) => { if (emailData?.subject && emailData?.html) { // Send Email sendMail("remind", emailData.to, emailData.subject, emailData.html, taskIds, (taskIds) => { - client.request(UPDATE_TASKS_REMIND_AT_SENT, { - taskIds, - now: moment().toISOString() - }); + for (const taskId of taskIds) { + tasksEmailQueue.push(taskId); + } }); } }); diff --git a/server/email/tasksEmailsQueue.js b/server/email/tasksEmailsQueue.js new file mode 100644 index 000000000..c74c6bd3d --- /dev/null +++ b/server/email/tasksEmailsQueue.js @@ -0,0 +1,37 @@ +const path = require("path"); +require("dotenv").config({ + path: path.resolve(process.cwd(), `.env.${process.env.NODE_ENV || "development"}`) +}); +const Queue = require("better-queue"); +const moment = require("moment/moment"); +const { client } = require("../graphql-client/graphql-client"); +const { UPDATE_TASKS_REMIND_AT_SENT } = require("../graphql-client/queries"); +const logger = require("../utils/logger"); +const taskEmailQueue = () => + new Queue( + (taskIds, cb) => { + console.log("Processing reminds for taskIds: ", taskIds.join(", ")); + + // Set the remind_at_sent to the current time. + const now = moment().toISOString(); + + client + .request(UPDATE_TASKS_REMIND_AT_SENT, { taskIds, now }) + .then((taskResponse) => { + logger.log("task-remind-email-queue", "info", null, null, taskResponse); + cb(null, taskResponse); + }) + .catch((err) => { + logger.log("task-remind-email-queue", "error", null, null, err); + cb(err); + }); + }, + { + batchSize: 50, + batchDelay: 5000, + // The lower this is, the more likely we are to hit the rate limit. + batchDelayTimeout: 1000 + } + ); + +module.exports = { taskEmailQueue };