From f6328d10f72a43b359f79e9239095981452d3dbd Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Thu, 27 Feb 2025 20:16:33 -0500 Subject: [PATCH] feature/IO-3096-GlobalNotifications -Read Status Sync accross all clients. --- client/src/App/App.jsx | 4 +- .../src/contexts/SocketIO/socketContext.jsx | 313 +++++++++++------- server/utils/redisHelpers.js | 22 ++ server/web-sockets/redisSocketEvents.js | 49 ++- 4 files changed, 266 insertions(+), 122 deletions(-) diff --git a/client/src/App/App.jsx b/client/src/App/App.jsx index ba37cc243..90f3237cd 100644 --- a/client/src/App/App.jsx +++ b/client/src/App/App.jsx @@ -201,7 +201,7 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline path="/manage/*" element={ - + @@ -213,7 +213,7 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline path="/tech/*" element={ - + diff --git a/client/src/contexts/SocketIO/socketContext.jsx b/client/src/contexts/SocketIO/socketContext.jsx index ff2f60561..b8598c169 100644 --- a/client/src/contexts/SocketIO/socketContext.jsx +++ b/client/src/contexts/SocketIO/socketContext.jsx @@ -11,14 +11,13 @@ import { MARK_ALL_NOTIFICATIONS_READ, MARK_NOTIFICATION_READ } from "../../graphql/notifications.queries.js"; -import { useMutation } from "@apollo/client"; +import { gql, useMutation } from "@apollo/client"; const SocketContext = createContext(null); -// This is how many notifications the database will populate on load, and the increment for load more export const INITIAL_NOTIFICATIONS = 10; -export const SocketProvider = ({ children, bodyshop, navigate }) => { +export const SocketProvider = ({ children, bodyshop, navigate, currentUser }) => { const socketRef = useRef(null); const [clientId, setClientId] = useState(null); const [isConnected, setIsConnected] = useState(false); @@ -30,21 +29,21 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { const timestamp = new Date().toISOString(); const updatedNotification = update_notifications.returning[0]; - // Update the notifications list cache.modify({ fields: { notifications(existing = [], { readField }) { - return existing.map((notif) => { - if (readField("id", notif) === updatedNotification.id) { - return { ...notif, read: timestamp }; - } - return notif; - }); + return existing.map((notif) => + readField("id", notif) === updatedNotification.id + ? { + ...notif, + read: timestamp + } + : notif + ); } } }); - // Update the unread count in notifications_aggregate const unreadCountQuery = cache.readQuery({ query: GET_UNREAD_COUNT, variables: { associationid: userAssociationId } @@ -65,10 +64,16 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { } }); } + + if (socketRef.current && isConnected) { + socketRef.current.emit("sync-notification-read", { + email: currentUser?.email, + bodyshopId: bodyshop.id, + notificationId: updatedNotification.id + }); + } }, - onError: (err) => { - console.error("MARK_NOTIFICATION_READ error in SocketProvider:", err); - } + onError: (err) => console.error("MARK_NOTIFICATION_READ error:", err) }); const [markAllNotificationsRead] = useMutation(MARK_ALL_NOTIFICATIONS_READ, { @@ -78,12 +83,11 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { cache.modify({ fields: { notifications(existing = [], { readField }) { - return existing.map((notif) => { - if (readField("read", notif) === null && readField("associationid", notif) === userAssociationId) { - return { ...notif, read: timestamp }; - } - return notif; - }); + return existing.map((notif) => + readField("read", notif) === null && readField("associationid", notif) === userAssociationId + ? { ...notif, read: timestamp } + : notif + ); }, notifications_aggregate() { return { aggregate: { count: 0, __typename: "notifications_aggregate_fields" } }; @@ -94,21 +98,13 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { const baseWhereClause = { associationid: { _eq: userAssociationId } }; const cachedNotifications = cache.readQuery({ query: GET_NOTIFICATIONS, - variables: { - limit: INITIAL_NOTIFICATIONS, - offset: 0, - where: baseWhereClause - } + variables: { limit: INITIAL_NOTIFICATIONS, offset: 0, where: baseWhereClause } }); if (cachedNotifications?.notifications) { cache.writeQuery({ query: GET_NOTIFICATIONS, - variables: { - limit: INITIAL_NOTIFICATIONS, - offset: 0, - where: baseWhereClause - }, + variables: { limit: INITIAL_NOTIFICATIONS, offset: 0, where: baseWhereClause }, data: { notifications: cachedNotifications.notifications.map((notif) => notif.read === null ? { ...notif, read: timestamp } : notif @@ -116,10 +112,15 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { } }); } + + if (socketRef.current && isConnected) { + socketRef.current.emit("sync-all-notifications-read", { + email: currentUser?.email, + bodyshopId: bodyshop.id + }); + } }, - onError: (err) => { - console.error("MARK_ALL_NOTIFICATIONS_READ error in SocketProvider:", err); - } + onError: (err) => console.error("MARK_ALL_NOTIFICATIONS_READ error:", err) }); useEffect(() => { @@ -191,7 +192,6 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { const handleNotification = (data) => { const { jobId, jobRoNumber, notificationId, associationId, notifications } = data; - if (associationId !== userAssociationId) return; const newNotification = { @@ -204,9 +204,7 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { scenario_meta: JSON.stringify(notifications.map((notif) => notif.variables || {})), created_at: new Date(notifications[0].timestamp).toISOString(), read: null, - job: { - ro_number: jobRoNumber - } + job: { ro_number: jobRoNumber } }; const baseVariables = { @@ -221,84 +219,166 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { query: GET_NOTIFICATIONS, variables: baseVariables })?.notifications || []; - - if (existingNotifications.some((n) => n.id === newNotification.id)) { - return; - } - - client.cache.writeQuery({ - query: GET_NOTIFICATIONS, - variables: baseVariables, - data: { - notifications: [newNotification, ...existingNotifications].sort( - (a, b) => new Date(b.created_at) - new Date(a.created_at) - ) - }, - broadcast: true - }); - - const unreadVariables = { - ...baseVariables, - where: { ...baseVariables.where, read: { _is_null: true } } - }; - const unreadNotifications = - client.cache.readQuery({ - query: GET_NOTIFICATIONS, - variables: unreadVariables - })?.notifications || []; - - if (newNotification.read === null && !unreadNotifications.some((n) => n.id === newNotification.id)) { + if (!existingNotifications.some((n) => n.id === newNotification.id)) { client.cache.writeQuery({ query: GET_NOTIFICATIONS, - variables: unreadVariables, + variables: baseVariables, data: { - notifications: [newNotification, ...unreadNotifications].sort( + notifications: [newNotification, ...existingNotifications].sort( (a, b) => new Date(b.created_at) - new Date(a.created_at) ) }, broadcast: true }); + + const unreadVariables = { + ...baseVariables, + where: { ...baseVariables.where, read: { _is_null: true } } + }; + const unreadNotifications = + client.cache.readQuery({ + query: GET_NOTIFICATIONS, + variables: unreadVariables + })?.notifications || []; + if (newNotification.read === null && !unreadNotifications.some((n) => n.id === newNotification.id)) { + client.cache.writeQuery({ + query: GET_NOTIFICATIONS, + variables: unreadVariables, + data: { + notifications: [newNotification, ...unreadNotifications].sort( + (a, b) => new Date(b.created_at) - new Date(a.created_at) + ) + }, + broadcast: true + }); + } + + client.cache.modify({ + id: "ROOT_QUERY", + fields: { + notifications_aggregate(existing = { aggregate: { count: 0 } }) { + return { + ...existing, + aggregate: { + ...existing.aggregate, + count: existing.aggregate.count + (newNotification.read === null ? 1 : 0) + } + }; + } + } + }); + + notification.info({ + message: `Changes for ${jobRoNumber}:`, + description: ( + + ) + }); + } + } catch (error) { + console.error(`Error handling new notification: ${error?.message || ""}`); + } + }; + + const handleSyncNotificationRead = ({ notificationId, timestamp }) => { + try { + const notificationRef = client.cache.identify({ + __typename: "notifications", + id: notificationId + }); + client.cache.writeFragment({ + id: notificationRef, + fragment: gql` + fragment UpdateNotificationRead on notifications { + read + } + `, + data: { read: timestamp } + }); + + const unreadCountData = client.cache.readQuery({ + query: GET_UNREAD_COUNT, + variables: { associationid: userAssociationId } + }); + if (unreadCountData?.notifications_aggregate?.aggregate?.count > 0) { + const newCount = Math.max(unreadCountData.notifications_aggregate.aggregate.count - 1, 0); + client.cache.writeQuery({ + query: GET_UNREAD_COUNT, + variables: { associationid: userAssociationId }, + data: { + notifications_aggregate: { + __typename: "notifications_aggregate", + aggregate: { + __typename: "notifications_aggregate_fields", + count: newCount + } + } + } + }); + } + } catch (error) { + console.error("Error in handleSyncNotificationRead:", error); + } + }; + + const handleSyncAllNotificationsRead = ({ timestamp }) => { + try { + const queryVars = { + limit: INITIAL_NOTIFICATIONS, + offset: 0, + where: { associationid: { _eq: userAssociationId } } + }; + const cachedData = client.cache.readQuery({ + query: GET_NOTIFICATIONS, + variables: queryVars + }); + + if (cachedData?.notifications) { + cachedData.notifications.forEach((notif) => { + if (!notif.read) { + const notifRef = client.cache.identify({ __typename: "notifications", id: notif.id }); + client.cache.writeFragment({ + id: notifRef, + fragment: gql` + fragment UpdateNotificationRead on notifications { + read + } + `, + data: { read: timestamp } + }); + } + }); } - client.cache.modify({ - id: "ROOT_QUERY", - fields: { - notifications_aggregate(existing = { aggregate: { count: 0 } }) { - const isUnread = newNotification.read === null; - const countChange = isUnread ? 1 : 0; - return { - ...existing, - aggregate: { - ...existing.aggregate, - count: existing.aggregate.count + countChange - } - }; + client.cache.writeQuery({ + query: GET_UNREAD_COUNT, + variables: { associationid: userAssociationId }, + data: { + notifications_aggregate: { + __typename: "notifications_aggregate", + aggregate: { + __typename: "notifications_aggregate_fields", + count: 0 + } } } }); - - notification.info({ - message: `Changes for ${jobRoNumber}:`, - description: ( - - ) - }); } catch (error) { - console.error(`Something went wrong handling a new notification: ${error?.message || ""}`); + console.error(`Error In HandleSyncAllNotificationsRead: ${error?.message || ""}`); } }; @@ -307,21 +387,9 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { socketInstance.on("connect_error", handleConnectionError); socketInstance.on("disconnect", handleDisconnect); socketInstance.on("bodyshop-message", handleBodyshopMessage); - socketInstance.on("message", (message) => { - try { - if (typeof message === "string" && message.startsWith("42")) { - const parsedMessage = JSON.parse(message.slice(2)); - const [event, data] = parsedMessage; - if (event === "notification") handleNotification(data); - } else if (Array.isArray(message)) { - const [event, data] = message; - if (event === "notification") handleNotification(data); - } - } catch (error) { - console.error("Error parsing socket message:", error); - } - }); socketInstance.on("notification", handleNotification); + socketInstance.on("sync-notification-read", handleSyncNotificationRead); + socketInstance.on("sync-all-notifications-read", handleSyncAllNotificationsRead); }; const unsubscribe = auth.onIdTokenChanged(async (user) => { @@ -351,7 +419,15 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => { setIsConnected(false); } }; - }, [bodyshop, notification, userAssociationId, markNotificationRead, markAllNotificationsRead, navigate]); + }, [ + bodyshop, + notification, + userAssociationId, + markNotificationRead, + markAllNotificationsRead, + navigate, + currentUser + ]); return ( { export const useSocket = () => { const context = useContext(SocketContext); - if (!context) { - throw new Error("useSocket must be used within a SocketProvider"); - } + // NOTE: Not sure if we absolutely require this, does cause slipups on dev env + if (!context) throw new Error("useSocket must be used within a SocketProvider"); return context; }; diff --git a/server/utils/redisHelpers.js b/server/utils/redisHelpers.js index affd7cdd8..11a42dc0c 100644 --- a/server/utils/redisHelpers.js +++ b/server/utils/redisHelpers.js @@ -213,6 +213,27 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { } }; + const getUserSocketMappingByBodyshop = async (email, bodyshopId) => { + const userKey = `user:${email}`; + const socketMappingKey = `${userKey}:socketMapping`; + try { + // Retrieve all socket mappings for the user + const mapping = await pubClient.hgetall(socketMappingKey); + const ttl = await pubClient.ttl(socketMappingKey); + // Filter socket IDs for the provided bodyshopId + const socketIds = Object.entries(mapping).reduce((acc, [socketId, bId]) => { + if (bId === bodyshopId) { + acc.push(socketId); + } + return acc; + }, []); + return { socketIds, ttl }; + } catch (error) { + logger.log(`Error retrieving socket mappings for ${email} by bodyshop ${bodyshopId}: ${error}`, "ERROR", "redis"); + throw error; + } + }; + const api = { setSessionData, getSessionData, @@ -228,6 +249,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => { getUsersInRoom, addUserSocketMapping, removeUserSocketMapping, + getUserSocketMappingByBodyshop, getUserSocketMapping, refreshUserSocketTTL }; diff --git a/server/web-sockets/redisSocketEvents.js b/server/web-sockets/redisSocketEvents.js index 59baf0d0a..50c7db961 100644 --- a/server/web-sockets/redisSocketEvents.js +++ b/server/web-sockets/redisSocketEvents.js @@ -2,7 +2,7 @@ const { admin } = require("../firebase/firebase-handler"); const redisSocketEvents = ({ io, - redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL }, + redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL, getUserSocketMappingByBodyshop }, ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom }, logger }) => { @@ -182,11 +182,58 @@ const redisSocketEvents = ({ socket.on("leave-bodyshop-conversation", leaveConversationRoom); }; + // Sync Notification Read Events + const registerSyncEvents = (socket) => { + socket.on("sync-notification-read", async ({ email, bodyshopId, notificationId }) => { + try { + const userEmail = socket.user.email; + const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId); + const timestamp = new Date().toISOString(); + + if (socketMapping?.socketIds) { + socketMapping?.socketIds.forEach((socketId) => { + if (socketId !== socket.id) { + // Avoid sending back to the originating socket + io.to(socketId).emit("sync-notification-read", { notificationId, timestamp }); + } + }); + createLogEvent( + socket, + "debug", + `Synced notification ${notificationId} read for ${userEmail} in bodyshop ${bodyshopId}` + ); + } + } catch (error) { + createLogEvent(socket, "error", `Error syncing notification read: ${error.message}`); + } + }); + + socket.on("sync-all-notifications-read", async ({ email, bodyshopId }) => { + try { + const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId); + const timestamp = new Date().toISOString(); + + if (socketMapping?.socketIds) { + socketMapping?.socketIds.forEach((socketId) => { + if (socketId !== socket.id) { + // Avoid sending back to the originating socket + io.to(socketId).emit("sync-all-notifications-read", { timestamp }); + } + }); + createLogEvent(socket, "debug", `Synced all notifications read for ${email} in bodyshop ${bodyshopId}`); + } + } catch (error) { + createLogEvent(socket, "error", `Error syncing all notifications read: ${error.message}`); + } + }); + }; + // Call Handlers registerRoomAndBroadcastEvents(socket); registerUpdateEvents(socket); registerMessagingEvents(socket); registerDisconnectEvents(socket); + registerSyncEvents(socket); }; // Associate Middleware and Handlers