diff --git a/client/src/components/dms-post-form/rr-dms-post-form.jsx b/client/src/components/dms-post-form/rr-dms-post-form.jsx index a1d8154e6..c887826fd 100644 --- a/client/src/components/dms-post-form/rr-dms-post-form.jsx +++ b/client/src/components/dms-post-form/rr-dms-post-form.jsx @@ -208,16 +208,16 @@ export default function RRPostForm({ }); }; - // Check if early RO was created (job has dms_id) - const hasEarlyRO = !!job?.dms_id; + // Check if early RO was created (job has all early RO fields) + const hasEarlyRO = !!(job?.dms_id && job?.dms_customer_id && job?.dms_advisor_id); return ( {hasEarlyRO && ( - ✅ Early RO Created: {job.dms_id} + ✅ {t("jobs.labels.dms.earlyro.created")} {job.dms_id}
- This will update the existing RO with full job data. + {t("jobs.labels.dms.earlyro.willupdate")}
)}
= ); diff --git a/client/src/components/jobs-convert-button/jobs-convert-button.component.jsx b/client/src/components/jobs-convert-button/jobs-convert-button.component.jsx index 88019d0ce..72de99502 100644 --- a/client/src/components/jobs-convert-button/jobs-convert-button.component.jsx +++ b/client/src/components/jobs-convert-button/jobs-convert-button.component.jsx @@ -106,8 +106,8 @@ export function JobsConvertButton({ bodyshop, job, refetch, jobRO, insertAuditTr setEarlyRoCreated(true); // Mark early RO as created setEarlyRoCreatedThisSession(true); // Mark as created in this session notification.success({ - title: t("jobs.successes.early_ro_created", "Early RO Created"), - message: `RO Number: ${result.roNumber || "N/A"}` + title: t("jobs.successes.early_ro_created"), + description: `RO Number: ${result.roNumber || "N/A"}` }); // Delay refetch to keep success message visible for 2 seconds setTimeout(() => { diff --git a/client/src/components/jobs-detail-general/jobs-detail-general.component.jsx b/client/src/components/jobs-detail-general/jobs-detail-general.component.jsx index 172400edd..5eae62abf 100644 --- a/client/src/components/jobs-detail-general/jobs-detail-general.component.jsx +++ b/client/src/components/jobs-detail-general/jobs-detail-general.component.jsx @@ -251,7 +251,6 @@ export function JobsDetailGeneral({ bodyshop, jobRO, job, form }) { ))} - @@ -267,6 +266,21 @@ export function JobsDetailGeneral({ bodyshop, jobRO, job, form }) { + {bodyshop.rr_dealerid && ( + + + + )} + {bodyshop.rr_dealerid && ( + + + + )} + {bodyshop.rr_dealerid && ( + + + + )} ); diff --git a/client/src/graphql/jobs.queries.js b/client/src/graphql/jobs.queries.js index 98b349e8a..21d9522b1 100644 --- a/client/src/graphql/jobs.queries.js +++ b/client/src/graphql/jobs.queries.js @@ -494,6 +494,9 @@ export const GET_JOB_BY_PK = gql` ded_status deliverchecklist depreciation_taxes + dms_id + dms_advisor_id + dms_customer_id driveable employee_body employee_body_rel { @@ -1998,6 +2001,9 @@ export const QUERY_JOB_CLOSE_DETAILS = gql` qb_multiple_payers lbr_adjustments ownr_ea + dms_id + dms_customer_id + dms_advisor_id payments { amount created_at diff --git a/client/src/pages/dms/dms.container.jsx b/client/src/pages/dms/dms.container.jsx index d43f22a08..22d79a0bb 100644 --- a/client/src/pages/dms/dms.container.jsx +++ b/client/src/pages/dms/dms.container.jsx @@ -426,6 +426,24 @@ export function DmsContainer({ bodyshop, setBreadcrumbs, setSelectedHeader, inse if (data.jobs_by_pk?.date_exported) return ; + // Check if Reynolds mode requires early RO + const hasEarlyRO = !!(data.jobs_by_pk?.dms_id && data.jobs_by_pk?.dms_customer_id && data.jobs_by_pk?.dms_advisor_id); + + if (isRrMode && !hasEarlyRO) { + return ( + + + + } + /> + ); + } + return (
diff --git a/client/src/pages/jobs-admin/jobs-admin.page.jsx b/client/src/pages/jobs-admin/jobs-admin.page.jsx index 65635618b..e7f9606d8 100644 --- a/client/src/pages/jobs-admin/jobs-admin.page.jsx +++ b/client/src/pages/jobs-admin/jobs-admin.page.jsx @@ -1,10 +1,12 @@ -import { useQuery } from "@apollo/client/react"; -import { Button, Card, Col, Result, Row, Space, Typography } from "antd"; -import { useEffect, useState } from "react"; +import { useMutation, useQuery } from "@apollo/client/react"; +import { Button, Card, Col, Form, Input, Modal, Result, Row, Select, Space, Switch, Typography } from "antd"; +import { useEffect, useState, useCallback } from "react"; import { useTranslation } from "react-i18next"; import { connect } from "react-redux"; import { useParams } from "react-router-dom"; import { useTreatmentsWithConfig } from "@splitsoftware/splitio-react"; +import { some } from "lodash"; +import axios from "axios"; import AlertComponent from "../../components/alert/alert.component"; import JobCalculateTotals from "../../components/job-calculate-totals/job-calculate-totals.component"; import ScoreboardAddButton from "../../components/job-scoreboard-add-button/job-scoreboard-add-button.component"; @@ -21,14 +23,16 @@ import LoadingSpinner from "../../components/loading-spinner/loading-spinner.com import NotFound from "../../components/not-found/not-found.component"; import RbacWrapper from "../../components/rbac-wrapper/rbac-wrapper.component"; import RREarlyROModal from "../../components/dms-post-form/rr-early-ro-modal"; -import { GET_JOB_BY_PK } from "../../graphql/jobs.queries"; +import { GET_JOB_BY_PK, CONVERT_JOB_TO_RO } from "../../graphql/jobs.queries"; import { setBreadcrumbs, setSelectedHeader } from "../../redux/application/application.actions"; +import { insertAuditTrail } from "../../redux/application/application.actions"; import { selectBodyshop } from "../../redux/user/user.selectors"; import { createStructuredSelector } from "reselect"; import { useSocket } from "../../contexts/SocketIO/useSocket"; import { useNotification } from "../../contexts/Notifications/notificationContext"; import { DMS_MAP, getDmsMode } from "../../utils/dmsUtils"; import InstanceRenderManager from "../../utils/instanceRenderMgr"; +import AuditTrailMapping from "../../utils/AuditTrailMappings"; const mapStateToProps = createStructuredSelector({ bodyshop: selectBodyshop @@ -36,7 +40,8 @@ const mapStateToProps = createStructuredSelector({ const mapDispatchToProps = (dispatch) => ({ setBreadcrumbs: (breadcrumbs) => dispatch(setBreadcrumbs(breadcrumbs)), - setSelectedHeader: (key) => dispatch(setSelectedHeader(key)) + setSelectedHeader: (key) => dispatch(setSelectedHeader(key)), + insertAuditTrail: ({ jobid, operation, type }) => dispatch(insertAuditTrail({ jobid, operation, type })) }); const colSpan = { @@ -50,7 +55,7 @@ const cardStyle = { height: "100%" }; -export function JobsCloseContainer({ setBreadcrumbs, setSelectedHeader, bodyshop }) { +export function JobsCloseContainer({ setBreadcrumbs, setSelectedHeader, bodyshop, insertAuditTrail }) { const { jobId } = useParams(); const { loading, error, data, refetch } = useQuery(GET_JOB_BY_PK, { variables: { id: jobId }, @@ -61,6 +66,11 @@ export function JobsCloseContainer({ setBreadcrumbs, setSelectedHeader, bodyshop const { socket } = useSocket(); // Extract socket from context const notification = useNotification(); const [showEarlyROModal, setShowEarlyROModal] = useState(false); + const [showConvertModal, setShowConvertModal] = useState(false); + const [convertLoading, setConvertLoading] = useState(false); + const [form] = Form.useForm(); + const [mutationConvertJob] = useMutation(CONVERT_JOB_TO_RO); + const allFormValues = Form.useWatch([], form); // Get Fortellis treatment for proper DMS mode detection const { @@ -105,13 +115,53 @@ export function JobsCloseContainer({ setBreadcrumbs, setSelectedHeader, bodyshop const handleEarlyROSuccess = (result) => { notification.success({ - title: t("jobs.successes.early_ro_created", "Early RO Created"), - message: `RO Number: ${result.roNumber || "N/A"}` + title: t("jobs.successes.early_ro_created"), + description: `RO Number: ${result.roNumber || "N/A"}` }); setShowEarlyROModal(false); refetch?.(); }; + const handleConvert = async ({ employee_csr, category, ...values }) => { + if (!job?.id) return; + setConvertLoading(true); + const res = await mutationConvertJob({ + variables: { + jobId: job.id, + job: { + converted: true, + ...(bodyshop?.enforce_conversion_csr ? { employee_csr } : {}), + ...(bodyshop?.enforce_conversion_category ? { category } : {}), + ...values + } + } + }); + + if (values.ca_gst_registrant) { + await axios.post("/job/totalsssu", { + id: job.id + }); + } + + if (!res.errors) { + refetch(); + notification.success({ + title: t("jobs.successes.converted") + }); + + insertAuditTrail({ + jobid: job.id, + operation: AuditTrailMapping.jobconverted(res.data.update_jobs.returning[0].ro_number), + type: "jobconverted" + }); + + setShowConvertModal(false); + } + setConvertLoading(false); + }; + + const submitDisabled = useCallback(() => some(allFormValues, (v) => v === undefined), [allFormValues]); + if (loading) return ; if (error) return ; if (!data.jobs_by_pk) return ; @@ -138,7 +188,12 @@ export function JobsCloseContainer({ setBreadcrumbs, setSelectedHeader, bodyshop {isReynoldsMode && job?.converted && !job?.dms_id && !job?.dms_customer_id && !job?.dms_advisor_id && ( + )} + {isReynoldsMode && !job?.converted && !job?.dms_id && ( + )} @@ -176,6 +231,161 @@ export function JobsCloseContainer({ setBreadcrumbs, setSelectedHeader, bodyshop socket={socket} job={job} /> + + {/* Convert without Early RO Modal */} + setShowConvertModal(false)} + title={t("jobs.actions.convertwithoutearlyro", "Convert without Early RO")} + footer={null} + width={700} + destroyOnHidden + > + + + + + {bodyshop?.enforce_class && ( + + + + )} + {bodyshop?.enforce_referral && ( + <> + + + + + + + + )} + {bodyshop?.enforce_conversion_csr && ( + + + + )} + {bodyshop?.enforce_conversion_category && ( + + + + )} + {bodyshop?.region_config?.toLowerCase().startsWith("ca") && ( + + + + )} + + + + + + + + + + + + + ); } diff --git a/client/src/pages/jobs-close/jobs-close.component.jsx b/client/src/pages/jobs-close/jobs-close.component.jsx index 9a429cb2a..6587e8399 100644 --- a/client/src/pages/jobs-close/jobs-close.component.jsx +++ b/client/src/pages/jobs-close/jobs-close.component.jsx @@ -9,6 +9,7 @@ import { Form, Input, InputNumber, + Modal, Popconfirm, Row, Select, @@ -42,7 +43,7 @@ import { setModalContext } from "../../redux/modals/modals.actions.js"; import { selectBodyshop, selectCurrentUser } from "../../redux/user/user.selectors"; import AuditTrailMapping from "../../utils/AuditTrailMappings"; import dayjs from "../../utils/day"; -import { bodyshopHasDmsKey } from "../../utils/dmsUtils.js"; +import { bodyshopHasDmsKey, DMS_MAP, getDmsMode } from "../../utils/dmsUtils.js"; const mapStateToProps = createStructuredSelector({ bodyshop: selectBodyshop, @@ -71,6 +72,11 @@ export function JobsCloseComponent({ job, bodyshop, jobRO, insertAuditTrail, set const notification = useNotification(); const hasDMSKey = bodyshopHasDmsKey(bodyshop); + const dmsMode = getDmsMode(bodyshop, "off"); + const isReynoldsMode = dmsMode === DMS_MAP.reynolds; + const hasEarlyRO = !!(job?.dms_id && job?.dms_customer_id && job?.dms_advisor_id); + const canSendToDMS = !isReynoldsMode || hasEarlyRO; + const [showEarlyROModal, setShowEarlyROModal] = useState(false); const { treatments: { Qb_Multi_Ar, ClosingPeriod } @@ -82,18 +88,18 @@ export function JobsCloseComponent({ job, bodyshop, jobRO, insertAuditTrail, set const handleFinish = async ({ removefromproduction, ...values }) => { setLoading(true); - + // Validate that all joblines have valid IDs - const joblinesWithIds = values.joblines.filter(jl => jl && jl.id); + const joblinesWithIds = values.joblines.filter((jl) => jl && jl.id); if (joblinesWithIds.length !== values.joblines.length) { notification.error({ title: t("jobs.errors.invalidjoblines"), - message: t("jobs.errors.missingjoblineids") + description: t("jobs.errors.missingjoblineids") }); setLoading(false); return; } - + const result = await client.mutate({ mutation: generateJobLinesUpdatesForInvoicing(values.joblines) }); @@ -208,9 +214,17 @@ export function JobsCloseComponent({ job, bodyshop, jobRO, insertAuditTrail, set {bodyshopHasDmsKey(bodyshop) && ( - - - + <> + {canSendToDMS ? ( + + + + ) : ( + + )} + )} + + +
); } diff --git a/client/src/translations/en_us/common.json b/client/src/translations/en_us/common.json index 5f5c5514f..626ab5e4c 100644 --- a/client/src/translations/en_us/common.json +++ b/client/src/translations/en_us/common.json @@ -1047,7 +1047,9 @@ }, "dms": { "errors": { - "alreadyexported": "This job has already been sent to the DMS. If you need to resend it, please use admin permissions to mark the job for re-export." + "alreadyexported": "This job has already been sent to the DMS. If you need to resend it, please use admin permissions to mark the job for re-export.", + "earlyrorequired": "Early RO Required", + "earlyrorequired.message": "This job requires an early Repair Order to be created before posting to Reynolds. Please use the admin panel to create the early RO first." }, "labels": { "refreshallocations": "Refresh to see DMS Allocations." @@ -1244,6 +1246,7 @@ "deselectall": "Deselect All", "download": "Download", "edit": "Edit", + "gotoadmin": "Go to Admin Panel", "login": "Login", "next": "Next", "ok": "Ok", @@ -1622,11 +1625,13 @@ "changestatus": "Change Status", "changestimator": "Change Estimator", "convert": "Convert", + "convertwithoutearlyro": "Convert without Early RO", "createiou": "Create IOU", "deliver": "Deliver", "deliver_quick": "Quick Deliver", "dms": { "addpayer": "Add Payer", + "createearlyro": "Create RR RO", "createnewcustomer": "Create New Customer", "findmakemodelcode": "Find Make/Model Code", "getmakes": "Get Makes", @@ -1635,6 +1640,7 @@ }, "post": "Post", "refetchmakesmodels": "Refetch Make and Model Codes", + "update_ro": "Update RO", "usegeneric": "Use Generic Customer", "useselected": "Use Selected Customer" }, @@ -1794,6 +1800,7 @@ }, "cost": "Cost", "cost_dms_acctnumber": "Cost DMS Acct #", + "customer": "Customer #", "dms_make": "DMS Make", "dms_model": "DMS Model", "dms_model_override": "Override DMS Make/Model", @@ -2107,6 +2114,11 @@ "damageto": "Damage to $t(jobs.fields.area_of_damage_impact.{{area_of_damage}}).", "defaultstory": "B/S RO: {{ro_number}}. Owner: {{ownr_nm}}. Insurance Co: {{ins_co_nm}}. Claim/PO #: {{clm_po}}", "disablebillwip": "Cost and WIP for bills has been ignored per shop configuration.", + "earlyro": { + "created": "Early RO Created:", + "fields": "Required fields:", + "willupdate": "This will update the existing RO with full job data." + }, "invoicedatefuture": "Invoice date must be today or in the future for CDK posting.", "kmoutnotgreaterthankmin": "Mileage out must be greater than mileage in.", "logs": "Logs", @@ -2264,6 +2276,7 @@ "delete": "Job deleted successfully.", "deleted": "Job deleted successfully.", "duplicated": "Job duplicated successfully. ", + "early_ro_created": "Early RO Created", "exported": "Job(s) exported successfully. ", "invoiced": "Job closed and invoiced successfully.", "ioucreated": "IOU created successfully. Click to see.", diff --git a/client/src/translations/es/common.json b/client/src/translations/es/common.json index 6559e1038..664c5081c 100644 --- a/client/src/translations/es/common.json +++ b/client/src/translations/es/common.json @@ -1047,7 +1047,9 @@ }, "dms": { "errors": { - "alreadyexported": "" + "alreadyexported": "", + "earlyrorequired": "", + "earlyrorequired.message": "" }, "labels": { "refreshallocations": "" @@ -1244,6 +1246,7 @@ "deselectall": "", "download": "", "edit": "Editar", + "gotoadmin": "", "login": "", "next": "", "ok": "", @@ -1622,11 +1625,13 @@ "changestatus": "Cambiar Estado", "changestimator": "", "convert": "Convertir", + "convertwithoutearlyro": "", "createiou": "", "deliver": "", "deliver_quick": "", "dms": { "addpayer": "", + "createearlyro": "", "createnewcustomer": "", "findmakemodelcode": "", "getmakes": "", @@ -1635,6 +1640,7 @@ }, "post": "", "refetchmakesmodels": "", + "update_ro": "", "usegeneric": "", "useselected": "" }, @@ -1794,6 +1800,7 @@ }, "cost": "", "cost_dms_acctnumber": "", + "customer": "", "dms_make": "", "dms_model": "", "dms_model_override": "", @@ -2107,6 +2114,11 @@ "damageto": "", "defaultstory": "", "disablebillwip": "", + "earlyro": { + "created": "", + "fields": "", + "willupdate": "" + }, "invoicedatefuture": "", "kmoutnotgreaterthankmin": "", "logs": "", @@ -2264,6 +2276,7 @@ "delete": "", "deleted": "Trabajo eliminado con éxito.", "duplicated": "", + "early_ro_created": "", "exported": "", "invoiced": "", "ioucreated": "", diff --git a/client/src/translations/fr/common.json b/client/src/translations/fr/common.json index 00af4c6aa..67a14d5ff 100644 --- a/client/src/translations/fr/common.json +++ b/client/src/translations/fr/common.json @@ -1047,7 +1047,9 @@ }, "dms": { "errors": { - "alreadyexported": "" + "alreadyexported": "", + "earlyrorequired": "", + "earlyrorequired.message": "" }, "labels": { "refreshallocations": "" @@ -1244,6 +1246,7 @@ "deselectall": "", "download": "", "edit": "modifier", + "gotoadmin": "", "login": "", "next": "", "ok": "", @@ -1622,11 +1625,13 @@ "changestatus": "Changer le statut", "changestimator": "", "convert": "Convertir", + "convertwithoutearlyro": "", "createiou": "", "deliver": "", "deliver_quick": "", "dms": { "addpayer": "", + "createearlyro": "", "createnewcustomer": "", "findmakemodelcode": "", "getmakes": "", @@ -1635,6 +1640,7 @@ }, "post": "", "refetchmakesmodels": "", + "update_ro": "", "usegeneric": "", "useselected": "" }, @@ -1794,6 +1800,7 @@ }, "cost": "", "cost_dms_acctnumber": "", + "customer": "", "dms_make": "", "dms_model": "", "dms_model_override": "", @@ -2107,6 +2114,11 @@ "damageto": "", "defaultstory": "", "disablebillwip": "", + "earlyro": { + "created": "", + "fields": "", + "willupdate": "" + }, "invoicedatefuture": "", "kmoutnotgreaterthankmin": "", "logs": "", @@ -2264,6 +2276,7 @@ "delete": "", "deleted": "Le travail a bien été supprimé.", "duplicated": "", + "early_ro_created": "", "exported": "", "invoiced": "", "ioucreated": "", diff --git a/server.js b/server.js index 07901ab6e..33cbe014c 100644 --- a/server.js +++ b/server.js @@ -40,6 +40,8 @@ const { loadEmailQueue } = require("./server/notifications/queues/emailQueue"); const { loadAppQueue } = require("./server/notifications/queues/appQueue"); const { SetLegacyWebsocketHandlers } = require("./server/web-sockets/web-socket"); const { loadFcmQueue } = require("./server/notifications/queues/fcmQueue"); +const { loadChatterApiQueue } = require("./server/data/queues/chatterApiQueue"); +const { processChatterApiJob } = require("./server/data/chatter-api"); const CLUSTER_RETRY_BASE_DELAY = 100; const CLUSTER_RETRY_MAX_DELAY = 5000; @@ -391,6 +393,15 @@ const applySocketIO = async ({ server, app }) => { const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { const queueSettings = { pubClient, logger, redisHelpers, ioRedis }; + // Load chatterApi queue with processJob function and redis helpers + const chatterApiQueue = await loadChatterApiQueue({ + pubClient, + logger, + processJob: processChatterApiJob, + getChatterToken: redisHelpers.getChatterToken, + setChatterToken: redisHelpers.setChatterToken + }); + // Assuming loadEmailQueue and loadAppQueue return Promises const [notificationsEmailsQueue, notificationsAppQueue, notificationsFcmQueue] = await Promise.all([ loadEmailQueue(queueSettings), @@ -410,6 +421,10 @@ const loadQueues = async ({ pubClient, logger, redisHelpers, ioRedis }) => { notificationsFcmQueue.on("error", (error) => { logger.log(`Error in notificationsFCMQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); }); + + chatterApiQueue.on("error", (error) => { + logger.log(`Error in chatterApiQueue: ${error}`, "ERROR", "queue", "api", null, { error: error?.message }); + }); }; /** diff --git a/server/data/chatter-api.js b/server/data/chatter-api.js index caee129f0..1aecb7cfb 100644 --- a/server/data/chatter-api.js +++ b/server/data/chatter-api.js @@ -40,7 +40,6 @@ const logger = require("../utils/logger"); const { ChatterApiClient, getChatterApiToken, CHATTER_BASE_URL } = require("../chatter/chatter-client"); const client = require("../graphql-client/graphql-client").client; -const { sendServerEmail } = require("../email/sendemail"); const CHATTER_EVENT = process.env.NODE_ENV === "production" ? "delivery" : "TEST_INTEGRATION"; const MAX_CONCURRENCY = Number(process.env.CHATTER_API_CONCURRENCY || 5); @@ -53,74 +52,98 @@ const clientCache = new Map(); // companyId -> ChatterApiClient const tokenInFlight = new Map(); // companyId -> Promise (for in-flight deduplication) const companyRateLimiters = new Map(); // companyId -> rate limiter +/** + * Core processing function for Chatter API jobs. + * This can be called by the HTTP handler or the BullMQ worker. + * + * @param {Object} options - Processing options + * @param {string} options.start - Start date for the delivery window + * @param {string} options.end - End date for the delivery window + * @param {Array} options.bodyshopIds - Optional specific shops to process + * @param {boolean} options.skipUpload - Dry-run flag + * @param {Object} options.sessionUtils - Optional session utils for token caching + * @returns {Promise} Result with totals, allShopSummaries, and allErrors + */ +async function processChatterApiJob({ start, end, bodyshopIds, skipUpload, sessionUtils }) { + logger.log("chatter-api-start", "DEBUG", "api", null, null); + + const allErrors = []; + const allShopSummaries = []; + + // Shops that DO have chatter_company_id + const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY); + + const shopsToProcess = + bodyshopIds?.length > 0 ? bodyshops.filter((shop) => bodyshopIds.includes(shop.id)) : bodyshops; + + logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length }); + + if (shopsToProcess.length === 0) { + logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null); + return { + totals: { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 }, + allShopSummaries: [], + allErrors: [] + }; + } + + await processBatchApi({ + shopsToProcess, + start, + end, + skipUpload, + allShopSummaries, + allErrors, + sessionUtils + }); + + const totals = allShopSummaries.reduce( + (acc, s) => { + acc.shops += 1; + acc.jobs += s.jobs || 0; + acc.sent += s.sent || 0; + acc.duplicates += s.duplicates || 0; + acc.failed += s.failed || 0; + return acc; + }, + { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 } + ); + + logger.log("chatter-api-end", "DEBUG", "api", null, totals); + + return { totals, allShopSummaries, allErrors }; +} + exports.default = async (req, res) => { if (process.env.NODE_ENV !== "production") return res.sendStatus(403); if (req.headers["x-imex-auth"] !== process.env.AUTOHOUSE_AUTH_TOKEN) return res.sendStatus(401); res.status(202).json({ success: true, - message: "Processing Chatter-API Cron request ...", + message: "Chatter API job queued for processing", timestamp: new Date().toISOString() }); try { - logger.log("chatter-api-start", "DEBUG", "api", null, null); + const { dispatchChatterApiJob } = require("./queues/chatterApiQueue"); + const { start, end, bodyshopIds, skipUpload } = req.body; - const allErrors = []; - const allShopSummaries = []; - - // Shops that DO have chatter_company_id - const { bodyshops } = await client.request(queries.GET_CHATTER_SHOPS_WITH_COMPANY); - - const specificShopIds = req.body.bodyshopIds; - const { start, end, skipUpload } = req.body; // keep same flag; now acts like "dry run" - - const shopsToProcess = - specificShopIds?.length > 0 ? bodyshops.filter((shop) => specificShopIds.includes(shop.id)) : bodyshops; - - logger.log("chatter-api-shopsToProcess-generated", "DEBUG", "api", null, { count: shopsToProcess.length }); - - if (shopsToProcess.length === 0) { - logger.log("chatter-api-shopsToProcess-empty", "DEBUG", "api", null, null); - return; - } - - await processBatchApi({ - shopsToProcess, + await dispatchChatterApiJob({ start, end, - skipUpload, - allShopSummaries, - allErrors, - sessionUtils: req.sessionUtils + bodyshopIds, + skipUpload }); - - const totals = allShopSummaries.reduce( - (acc, s) => { - acc.shops += 1; - acc.jobs += s.jobs || 0; - acc.sent += s.sent || 0; - acc.duplicates += s.duplicates || 0; - acc.failed += s.failed || 0; - return acc; - }, - { shops: 0, jobs: 0, sent: 0, duplicates: 0, failed: 0 } - ); - - await sendServerEmail({ - subject: `Chatter API Report ${moment().format("MM-DD-YY")}`, - text: - `Totals:\n${JSON.stringify(totals, null, 2)}\n\n` + - `Shop summaries:\n${JSON.stringify(allShopSummaries, null, 2)}\n\n` + - `Errors:\n${JSON.stringify(allErrors, null, 2)}\n` - }); - - logger.log("chatter-api-end", "DEBUG", "api", null, totals); } catch (error) { - logger.log("chatter-api-error", "ERROR", "api", null, { error: error.message, stack: error.stack }); + logger.log("chatter-api-queue-dispatch-error", "ERROR", "api", null, { + error: error.message, + stack: error.stack + }); } }; +exports.processChatterApiJob = processChatterApiJob; + async function processBatchApi({ shopsToProcess, start, end, skipUpload, allShopSummaries, allErrors, sessionUtils }) { for (const bodyshop of shopsToProcess) { const summary = { diff --git a/server/data/queues/chatterApiQueue.js b/server/data/queues/chatterApiQueue.js new file mode 100644 index 000000000..dc0f63f47 --- /dev/null +++ b/server/data/queues/chatterApiQueue.js @@ -0,0 +1,178 @@ +const { Queue, Worker } = require("bullmq"); +const { registerCleanupTask } = require("../../utils/cleanupManager"); +const getBullMQPrefix = require("../../utils/getBullMQPrefix"); +const devDebugLogger = require("../../utils/devDebugLogger"); +const moment = require("moment-timezone"); +const { sendServerEmail } = require("../../email/sendemail"); + +let chatterApiQueue; +let chatterApiWorker; + +/** + * Initializes the Chatter API queue and worker. + * + * @param {Object} options - Configuration options for queue initialization. + * @param {Object} options.pubClient - Redis client instance for queue communication. + * @param {Object} options.logger - Logger instance for logging events and debugging. + * @param {Function} options.processJob - Function to process the Chatter API job. + * @param {Function} options.getChatterToken - Function to get Chatter token from Redis. + * @param {Function} options.setChatterToken - Function to set Chatter token in Redis. + * @returns {Queue} The initialized `chatterApiQueue` instance. + */ +const loadChatterApiQueue = async ({ pubClient, logger, processJob, getChatterToken, setChatterToken }) => { + if (!chatterApiQueue) { + const prefix = getBullMQPrefix(); + + devDebugLogger(`Initializing Chatter API Queue with prefix: ${prefix}`); + + chatterApiQueue = new Queue("chatterApi", { + prefix, + connection: pubClient, + defaultJobOptions: { + removeOnComplete: true, + removeOnFail: false, + attempts: 3, + backoff: { + type: "exponential", + delay: 60000 // 1 minute base delay + } + } + }); + + chatterApiWorker = new Worker( + "chatterApi", + async (job) => { + const { start, end, bodyshopIds, skipUpload } = job.data; + + logger.log("chatter-api-queue-job-start", "INFO", "api", null, { + jobId: job.id, + start, + end, + bodyshopIds, + skipUpload + }); + + try { + // Provide sessionUtils-like object with token caching functions + const sessionUtils = { + getChatterToken, + setChatterToken + }; + + const result = await processJob({ + start, + end, + bodyshopIds, + skipUpload, + sessionUtils + }); + + logger.log("chatter-api-queue-job-complete", "INFO", "api", null, { + jobId: job.id, + totals: result.totals + }); + + // Send email summary + await sendServerEmail({ + subject: `Chatter API Report ${moment().format("MM-DD-YY")}`, + text: + `Totals:\n${JSON.stringify(result.totals, null, 2)}\n\n` + + `Shop summaries:\n${JSON.stringify(result.allShopSummaries, null, 2)}\n\n` + + `Errors:\n${JSON.stringify(result.allErrors, null, 2)}\n` + }); + + return result; + } catch (error) { + logger.log("chatter-api-queue-job-error", "ERROR", "api", null, { + jobId: job.id, + error: error.message, + stack: error.stack + }); + + // Send error email + await sendServerEmail({ + subject: `Chatter API Error ${moment().format("MM-DD-YY")}`, + text: `Job failed:\n${error.message}\n\n${error.stack}` + }); + + throw error; + } + }, + { + prefix, + connection: pubClient, + concurrency: 1, // Process one job at a time + lockDuration: 14400000 // 4 hours - allow long-running jobs + } + ); + + // Event handlers + chatterApiWorker.on("completed", (job) => { + devDebugLogger(`Chatter API job ${job.id} completed`); + }); + + chatterApiWorker.on("failed", (job, err) => { + logger.log("chatter-api-queue-job-failed", "ERROR", "api", null, { + jobId: job?.id, + message: err?.message, + stack: err?.stack + }); + }); + + chatterApiWorker.on("progress", (job, progress) => { + devDebugLogger(`Chatter API job ${job.id} progress: ${progress}%`); + }); + + // Register cleanup task + const shutdown = async () => { + devDebugLogger("Closing Chatter API queue worker..."); + await chatterApiWorker.close(); + devDebugLogger("Chatter API queue worker closed"); + }; + registerCleanupTask(shutdown); + } + + return chatterApiQueue; +}; + +/** + * Retrieves the initialized `chatterApiQueue` instance. + * + * @returns {Queue} The `chatterApiQueue` instance. + * @throws {Error} If `chatterApiQueue` is not initialized. + */ +const getQueue = () => { + if (!chatterApiQueue) { + throw new Error("Chatter API queue not initialized. Ensure loadChatterApiQueue is called during bootstrap."); + } + return chatterApiQueue; +}; + +/** + * Dispatches a Chatter API job to the queue. + * + * @param {Object} options - Options for the job. + * @param {string} options.start - Start date for the delivery window. + * @param {string} options.end - End date for the delivery window. + * @param {Array} options.bodyshopIds - Optional specific shops to process. + * @param {boolean} options.skipUpload - Dry-run flag. + * @returns {Promise} Resolves when the job is added to the queue. + */ +const dispatchChatterApiJob = async ({ start, end, bodyshopIds, skipUpload }) => { + const queue = getQueue(); + + const jobData = { + start: start || moment().subtract(1, "days").startOf("day").toISOString(), + end: end || moment().endOf("day").toISOString(), + bodyshopIds: bodyshopIds || [], + skipUpload: skipUpload || false + }; + + await queue.add("process-chatter-api", jobData, { + jobId: `chatter-api-${moment().format("YYYY-MM-DD-HHmmss")}` + }); + + devDebugLogger(`Added Chatter API job to queue: ${JSON.stringify(jobData)}`); +}; + +module.exports = { loadChatterApiQueue, getQueue, dispatchChatterApiJob };