diff --git a/server/notifications/queues/appQueue.js b/server/notifications/queues/appQueue.js index 27e83a6b6..bbbf270fd 100644 --- a/server/notifications/queues/appQueue.js +++ b/server/notifications/queues/appQueue.js @@ -18,6 +18,8 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const redisKey = `app:notifications:${jobId}`; const lastSentKey = `${redisKey}:lastSent`; + const lockKey = `lock:send-notifications:${jobId}`; + const recurringFlagKey = `app:recurring:${jobId}`; if (job.name === "add-notification") { const notification = { key, variables, timestamp: Date.now() }; @@ -63,22 +65,31 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (hasNewNotifications) { await pubClient.set(lastSentKey, Date.now(), "EX", 300); } else { - // Only remove if no active "add-notification" jobs are pending const activeJobs = await appQueue.getActive(); const hasPendingAdds = activeJobs.some((j) => j.name === "add-notification" && j.data.jobId === jobId); if (!hasPendingAdds) { - const recurringJobKey = `send-notifications:${jobId}`; - const removed = await appQueue.removeRepeatable("send-notifications", { - every: 30 * 1000, - jobId: recurringJobKey - }); - if (removed) { - logger.logger.info( - `Successfully removed recurring send-notifications job for jobId ${jobId} due to no new notifications` - ); + const lockAcquired = await pubClient.set(lockKey, "locked", "NX", "EX", 10); + if (lockAcquired) { + const recurringJobKey = `send-notifications:${jobId}`; + const repeatableJobs = await appQueue.getRepeatableJobs(); + const jobExists = repeatableJobs.some((j) => j.key === recurringJobKey); + if (jobExists) { + await appQueue.removeRepeatableByKey(recurringJobKey); + // Drain all remaining send-notifications jobs for this jobId + await appQueue.drain(false); // false to not force removal of active jobs + logger.logger.info( + `Successfully removed recurring send-notifications job and drained queue for jobId ${jobId} with key ${recurringJobKey}` + ); + } else { + logger.logger.info( + `No recurring send-notifications job found for jobId ${jobId} with key ${recurringJobKey} - processing leftover scheduled instance` + ); + } + await pubClient.del(lockKey); + await pubClient.del(recurringFlagKey); } else { - logger.logger.warn( - `Failed to remove recurring send-notifications job for jobId ${jobId} - may already be removed` + logger.logger.info( + `Skipped removal of send-notifications for jobId ${jobId} - lock held by another worker` ); } } else { @@ -100,20 +111,25 @@ const loadAppQueue = async ({ pubClient, logger, redisHelpers, ioRedis }) => { if (job.name === "add-notification") { const { jobId } = job.data; const recurringJobKey = `send-notifications:${jobId}`; - const existingJobs = await appQueue.getRepeatableJobs(); - if (!existingJobs.some((j) => j.key === recurringJobKey)) { - await appQueue.add( - "send-notifications", - { jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients }, - { - repeat: { - every: 30 * 1000, // Every 30 seconds - limit: 10 // 5 minutes - }, - jobId: recurringJobKey - } - ); - logger.logger.info(`Scheduled 30s notification send for jobId ${jobId}`); + const recurringFlagKey = `app:recurring:${jobId}`; + const flagSet = await pubClient.setnx(recurringFlagKey, "active"); + if (flagSet) { + const existingJobs = await appQueue.getRepeatableJobs(); + if (!existingJobs.some((j) => j.key === recurringJobKey)) { + await appQueue.add( + "send-notifications", + { jobId, bodyShopId: job.data.bodyShopId, recipients: job.data.recipients }, + { + repeat: { + every: 30 * 1000, + limit: 10 + }, + jobId: recurringJobKey + } + ); + logger.logger.info(`Scheduled 30s notification send for jobId ${jobId} with key ${recurringJobKey}`); + await pubClient.expire(recurringFlagKey, 300); + } } } logger.logger.info(`Job ${job.id} completed`);