feature/IO-3096-GlobalNotifications -Read Status Sync accross all clients.

This commit is contained in:
Dave Richer
2025-02-27 20:16:33 -05:00
parent 3766c3d938
commit f6328d10f7
4 changed files with 266 additions and 122 deletions

View File

@@ -201,7 +201,7 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline
path="/manage/*" path="/manage/*"
element={ element={
<ErrorBoundary> <ErrorBoundary>
<SocketProvider bodyshop={bodyshop} navigate={navigate}> <SocketProvider bodyshop={bodyshop} navigate={navigate} currentUser={currentUser}>
<PrivateRoute isAuthorized={currentUser.authorized} /> <PrivateRoute isAuthorized={currentUser.authorized} />
</SocketProvider> </SocketProvider>
</ErrorBoundary> </ErrorBoundary>
@@ -213,7 +213,7 @@ export function App({ bodyshop, checkUserSession, currentUser, online, setOnline
path="/tech/*" path="/tech/*"
element={ element={
<ErrorBoundary> <ErrorBoundary>
<SocketProvider bodyshop={bodyshop} navigate={navigate}> <SocketProvider bodyshop={bodyshop} navigate={navigate} currentUser={currentUser}>
<PrivateRoute isAuthorized={currentUser.authorized} /> <PrivateRoute isAuthorized={currentUser.authorized} />
</SocketProvider> </SocketProvider>
</ErrorBoundary> </ErrorBoundary>

View File

@@ -11,14 +11,13 @@ import {
MARK_ALL_NOTIFICATIONS_READ, MARK_ALL_NOTIFICATIONS_READ,
MARK_NOTIFICATION_READ MARK_NOTIFICATION_READ
} from "../../graphql/notifications.queries.js"; } from "../../graphql/notifications.queries.js";
import { useMutation } from "@apollo/client"; import { gql, useMutation } from "@apollo/client";
const SocketContext = createContext(null); const SocketContext = createContext(null);
// This is how many notifications the database will populate on load, and the increment for load more
export const INITIAL_NOTIFICATIONS = 10; export const INITIAL_NOTIFICATIONS = 10;
export const SocketProvider = ({ children, bodyshop, navigate }) => { export const SocketProvider = ({ children, bodyshop, navigate, currentUser }) => {
const socketRef = useRef(null); const socketRef = useRef(null);
const [clientId, setClientId] = useState(null); const [clientId, setClientId] = useState(null);
const [isConnected, setIsConnected] = useState(false); const [isConnected, setIsConnected] = useState(false);
@@ -30,21 +29,21 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
const timestamp = new Date().toISOString(); const timestamp = new Date().toISOString();
const updatedNotification = update_notifications.returning[0]; const updatedNotification = update_notifications.returning[0];
// Update the notifications list
cache.modify({ cache.modify({
fields: { fields: {
notifications(existing = [], { readField }) { notifications(existing = [], { readField }) {
return existing.map((notif) => { return existing.map((notif) =>
if (readField("id", notif) === updatedNotification.id) { readField("id", notif) === updatedNotification.id
return { ...notif, read: timestamp }; ? {
} ...notif,
return notif; read: timestamp
}); }
: notif
);
} }
} }
}); });
// Update the unread count in notifications_aggregate
const unreadCountQuery = cache.readQuery({ const unreadCountQuery = cache.readQuery({
query: GET_UNREAD_COUNT, query: GET_UNREAD_COUNT,
variables: { associationid: userAssociationId } variables: { associationid: userAssociationId }
@@ -65,10 +64,16 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
} }
}); });
} }
if (socketRef.current && isConnected) {
socketRef.current.emit("sync-notification-read", {
email: currentUser?.email,
bodyshopId: bodyshop.id,
notificationId: updatedNotification.id
});
}
}, },
onError: (err) => { onError: (err) => console.error("MARK_NOTIFICATION_READ error:", err)
console.error("MARK_NOTIFICATION_READ error in SocketProvider:", err);
}
}); });
const [markAllNotificationsRead] = useMutation(MARK_ALL_NOTIFICATIONS_READ, { const [markAllNotificationsRead] = useMutation(MARK_ALL_NOTIFICATIONS_READ, {
@@ -78,12 +83,11 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
cache.modify({ cache.modify({
fields: { fields: {
notifications(existing = [], { readField }) { notifications(existing = [], { readField }) {
return existing.map((notif) => { return existing.map((notif) =>
if (readField("read", notif) === null && readField("associationid", notif) === userAssociationId) { readField("read", notif) === null && readField("associationid", notif) === userAssociationId
return { ...notif, read: timestamp }; ? { ...notif, read: timestamp }
} : notif
return notif; );
});
}, },
notifications_aggregate() { notifications_aggregate() {
return { aggregate: { count: 0, __typename: "notifications_aggregate_fields" } }; return { aggregate: { count: 0, __typename: "notifications_aggregate_fields" } };
@@ -94,21 +98,13 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
const baseWhereClause = { associationid: { _eq: userAssociationId } }; const baseWhereClause = { associationid: { _eq: userAssociationId } };
const cachedNotifications = cache.readQuery({ const cachedNotifications = cache.readQuery({
query: GET_NOTIFICATIONS, query: GET_NOTIFICATIONS,
variables: { variables: { limit: INITIAL_NOTIFICATIONS, offset: 0, where: baseWhereClause }
limit: INITIAL_NOTIFICATIONS,
offset: 0,
where: baseWhereClause
}
}); });
if (cachedNotifications?.notifications) { if (cachedNotifications?.notifications) {
cache.writeQuery({ cache.writeQuery({
query: GET_NOTIFICATIONS, query: GET_NOTIFICATIONS,
variables: { variables: { limit: INITIAL_NOTIFICATIONS, offset: 0, where: baseWhereClause },
limit: INITIAL_NOTIFICATIONS,
offset: 0,
where: baseWhereClause
},
data: { data: {
notifications: cachedNotifications.notifications.map((notif) => notifications: cachedNotifications.notifications.map((notif) =>
notif.read === null ? { ...notif, read: timestamp } : notif notif.read === null ? { ...notif, read: timestamp } : notif
@@ -116,10 +112,15 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
} }
}); });
} }
if (socketRef.current && isConnected) {
socketRef.current.emit("sync-all-notifications-read", {
email: currentUser?.email,
bodyshopId: bodyshop.id
});
}
}, },
onError: (err) => { onError: (err) => console.error("MARK_ALL_NOTIFICATIONS_READ error:", err)
console.error("MARK_ALL_NOTIFICATIONS_READ error in SocketProvider:", err);
}
}); });
useEffect(() => { useEffect(() => {
@@ -191,7 +192,6 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
const handleNotification = (data) => { const handleNotification = (data) => {
const { jobId, jobRoNumber, notificationId, associationId, notifications } = data; const { jobId, jobRoNumber, notificationId, associationId, notifications } = data;
if (associationId !== userAssociationId) return; if (associationId !== userAssociationId) return;
const newNotification = { const newNotification = {
@@ -204,9 +204,7 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
scenario_meta: JSON.stringify(notifications.map((notif) => notif.variables || {})), scenario_meta: JSON.stringify(notifications.map((notif) => notif.variables || {})),
created_at: new Date(notifications[0].timestamp).toISOString(), created_at: new Date(notifications[0].timestamp).toISOString(),
read: null, read: null,
job: { job: { ro_number: jobRoNumber }
ro_number: jobRoNumber
}
}; };
const baseVariables = { const baseVariables = {
@@ -221,84 +219,166 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
query: GET_NOTIFICATIONS, query: GET_NOTIFICATIONS,
variables: baseVariables variables: baseVariables
})?.notifications || []; })?.notifications || [];
if (!existingNotifications.some((n) => n.id === newNotification.id)) {
if (existingNotifications.some((n) => n.id === newNotification.id)) {
return;
}
client.cache.writeQuery({
query: GET_NOTIFICATIONS,
variables: baseVariables,
data: {
notifications: [newNotification, ...existingNotifications].sort(
(a, b) => new Date(b.created_at) - new Date(a.created_at)
)
},
broadcast: true
});
const unreadVariables = {
...baseVariables,
where: { ...baseVariables.where, read: { _is_null: true } }
};
const unreadNotifications =
client.cache.readQuery({
query: GET_NOTIFICATIONS,
variables: unreadVariables
})?.notifications || [];
if (newNotification.read === null && !unreadNotifications.some((n) => n.id === newNotification.id)) {
client.cache.writeQuery({ client.cache.writeQuery({
query: GET_NOTIFICATIONS, query: GET_NOTIFICATIONS,
variables: unreadVariables, variables: baseVariables,
data: { data: {
notifications: [newNotification, ...unreadNotifications].sort( notifications: [newNotification, ...existingNotifications].sort(
(a, b) => new Date(b.created_at) - new Date(a.created_at) (a, b) => new Date(b.created_at) - new Date(a.created_at)
) )
}, },
broadcast: true broadcast: true
}); });
const unreadVariables = {
...baseVariables,
where: { ...baseVariables.where, read: { _is_null: true } }
};
const unreadNotifications =
client.cache.readQuery({
query: GET_NOTIFICATIONS,
variables: unreadVariables
})?.notifications || [];
if (newNotification.read === null && !unreadNotifications.some((n) => n.id === newNotification.id)) {
client.cache.writeQuery({
query: GET_NOTIFICATIONS,
variables: unreadVariables,
data: {
notifications: [newNotification, ...unreadNotifications].sort(
(a, b) => new Date(b.created_at) - new Date(a.created_at)
)
},
broadcast: true
});
}
client.cache.modify({
id: "ROOT_QUERY",
fields: {
notifications_aggregate(existing = { aggregate: { count: 0 } }) {
return {
...existing,
aggregate: {
...existing.aggregate,
count: existing.aggregate.count + (newNotification.read === null ? 1 : 0)
}
};
}
}
});
notification.info({
message: `Changes for ${jobRoNumber}:`,
description: (
<ul
className="notification-alert-unorderd-list"
onClick={() => {
markNotificationRead({ variables: { id: notificationId } })
.then(() => navigate(`/manage/jobs/${jobId}`))
.catch((e) => console.error(`Error marking notification read: ${e?.message || ""}`));
}}
style={{ cursor: "pointer" }}
>
{notifications.map((notif, index) => (
<li className="notification-alert-unorderd-list-item" key={index}>
{notif.body}
</li>
))}
</ul>
)
});
}
} catch (error) {
console.error(`Error handling new notification: ${error?.message || ""}`);
}
};
const handleSyncNotificationRead = ({ notificationId, timestamp }) => {
try {
const notificationRef = client.cache.identify({
__typename: "notifications",
id: notificationId
});
client.cache.writeFragment({
id: notificationRef,
fragment: gql`
fragment UpdateNotificationRead on notifications {
read
}
`,
data: { read: timestamp }
});
const unreadCountData = client.cache.readQuery({
query: GET_UNREAD_COUNT,
variables: { associationid: userAssociationId }
});
if (unreadCountData?.notifications_aggregate?.aggregate?.count > 0) {
const newCount = Math.max(unreadCountData.notifications_aggregate.aggregate.count - 1, 0);
client.cache.writeQuery({
query: GET_UNREAD_COUNT,
variables: { associationid: userAssociationId },
data: {
notifications_aggregate: {
__typename: "notifications_aggregate",
aggregate: {
__typename: "notifications_aggregate_fields",
count: newCount
}
}
}
});
}
} catch (error) {
console.error("Error in handleSyncNotificationRead:", error);
}
};
const handleSyncAllNotificationsRead = ({ timestamp }) => {
try {
const queryVars = {
limit: INITIAL_NOTIFICATIONS,
offset: 0,
where: { associationid: { _eq: userAssociationId } }
};
const cachedData = client.cache.readQuery({
query: GET_NOTIFICATIONS,
variables: queryVars
});
if (cachedData?.notifications) {
cachedData.notifications.forEach((notif) => {
if (!notif.read) {
const notifRef = client.cache.identify({ __typename: "notifications", id: notif.id });
client.cache.writeFragment({
id: notifRef,
fragment: gql`
fragment UpdateNotificationRead on notifications {
read
}
`,
data: { read: timestamp }
});
}
});
} }
client.cache.modify({ client.cache.writeQuery({
id: "ROOT_QUERY", query: GET_UNREAD_COUNT,
fields: { variables: { associationid: userAssociationId },
notifications_aggregate(existing = { aggregate: { count: 0 } }) { data: {
const isUnread = newNotification.read === null; notifications_aggregate: {
const countChange = isUnread ? 1 : 0; __typename: "notifications_aggregate",
return { aggregate: {
...existing, __typename: "notifications_aggregate_fields",
aggregate: { count: 0
...existing.aggregate, }
count: existing.aggregate.count + countChange
}
};
} }
} }
}); });
notification.info({
message: `Changes for ${jobRoNumber}:`,
description: (
<ul
className="notification-alert-unorderd-list"
onClick={() => {
markNotificationRead({ variables: { id: notificationId } })
.then(() => navigate(`/manage/jobs/${jobId}`))
.catch((e) => console.error(`Error marking notification read from info: ${e?.message || ""}`));
}}
style={{ cursor: "pointer" }}
>
{notifications.map((notif, index) => (
<li className="notification-alert-unorderd-list-item" key={index}>
{notif.body}
</li>
))}
</ul>
)
});
} catch (error) { } catch (error) {
console.error(`Something went wrong handling a new notification: ${error?.message || ""}`); console.error(`Error In HandleSyncAllNotificationsRead: ${error?.message || ""}`);
} }
}; };
@@ -307,21 +387,9 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
socketInstance.on("connect_error", handleConnectionError); socketInstance.on("connect_error", handleConnectionError);
socketInstance.on("disconnect", handleDisconnect); socketInstance.on("disconnect", handleDisconnect);
socketInstance.on("bodyshop-message", handleBodyshopMessage); socketInstance.on("bodyshop-message", handleBodyshopMessage);
socketInstance.on("message", (message) => {
try {
if (typeof message === "string" && message.startsWith("42")) {
const parsedMessage = JSON.parse(message.slice(2));
const [event, data] = parsedMessage;
if (event === "notification") handleNotification(data);
} else if (Array.isArray(message)) {
const [event, data] = message;
if (event === "notification") handleNotification(data);
}
} catch (error) {
console.error("Error parsing socket message:", error);
}
});
socketInstance.on("notification", handleNotification); socketInstance.on("notification", handleNotification);
socketInstance.on("sync-notification-read", handleSyncNotificationRead);
socketInstance.on("sync-all-notifications-read", handleSyncAllNotificationsRead);
}; };
const unsubscribe = auth.onIdTokenChanged(async (user) => { const unsubscribe = auth.onIdTokenChanged(async (user) => {
@@ -351,7 +419,15 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
setIsConnected(false); setIsConnected(false);
} }
}; };
}, [bodyshop, notification, userAssociationId, markNotificationRead, markAllNotificationsRead, navigate]); }, [
bodyshop,
notification,
userAssociationId,
markNotificationRead,
markAllNotificationsRead,
navigate,
currentUser
]);
return ( return (
<SocketContext.Provider <SocketContext.Provider
@@ -370,9 +446,8 @@ export const SocketProvider = ({ children, bodyshop, navigate }) => {
export const useSocket = () => { export const useSocket = () => {
const context = useContext(SocketContext); const context = useContext(SocketContext);
if (!context) { // NOTE: Not sure if we absolutely require this, does cause slipups on dev env
throw new Error("useSocket must be used within a SocketProvider"); if (!context) throw new Error("useSocket must be used within a SocketProvider");
}
return context; return context;
}; };

View File

@@ -213,6 +213,27 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
} }
}; };
const getUserSocketMappingByBodyshop = async (email, bodyshopId) => {
const userKey = `user:${email}`;
const socketMappingKey = `${userKey}:socketMapping`;
try {
// Retrieve all socket mappings for the user
const mapping = await pubClient.hgetall(socketMappingKey);
const ttl = await pubClient.ttl(socketMappingKey);
// Filter socket IDs for the provided bodyshopId
const socketIds = Object.entries(mapping).reduce((acc, [socketId, bId]) => {
if (bId === bodyshopId) {
acc.push(socketId);
}
return acc;
}, []);
return { socketIds, ttl };
} catch (error) {
logger.log(`Error retrieving socket mappings for ${email} by bodyshop ${bodyshopId}: ${error}`, "ERROR", "redis");
throw error;
}
};
const api = { const api = {
setSessionData, setSessionData,
getSessionData, getSessionData,
@@ -228,6 +249,7 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
getUsersInRoom, getUsersInRoom,
addUserSocketMapping, addUserSocketMapping,
removeUserSocketMapping, removeUserSocketMapping,
getUserSocketMappingByBodyshop,
getUserSocketMapping, getUserSocketMapping,
refreshUserSocketTTL refreshUserSocketTTL
}; };

View File

@@ -2,7 +2,7 @@ const { admin } = require("../firebase/firebase-handler");
const redisSocketEvents = ({ const redisSocketEvents = ({
io, io,
redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL }, redisHelpers: { addUserSocketMapping, removeUserSocketMapping, refreshUserSocketTTL, getUserSocketMappingByBodyshop },
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom }, ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
logger logger
}) => { }) => {
@@ -182,11 +182,58 @@ const redisSocketEvents = ({
socket.on("leave-bodyshop-conversation", leaveConversationRoom); socket.on("leave-bodyshop-conversation", leaveConversationRoom);
}; };
// Sync Notification Read Events
const registerSyncEvents = (socket) => {
socket.on("sync-notification-read", async ({ email, bodyshopId, notificationId }) => {
try {
const userEmail = socket.user.email;
const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
const timestamp = new Date().toISOString();
if (socketMapping?.socketIds) {
socketMapping?.socketIds.forEach((socketId) => {
if (socketId !== socket.id) {
// Avoid sending back to the originating socket
io.to(socketId).emit("sync-notification-read", { notificationId, timestamp });
}
});
createLogEvent(
socket,
"debug",
`Synced notification ${notificationId} read for ${userEmail} in bodyshop ${bodyshopId}`
);
}
} catch (error) {
createLogEvent(socket, "error", `Error syncing notification read: ${error.message}`);
}
});
socket.on("sync-all-notifications-read", async ({ email, bodyshopId }) => {
try {
const socketMapping = await getUserSocketMappingByBodyshop(email, bodyshopId);
const timestamp = new Date().toISOString();
if (socketMapping?.socketIds) {
socketMapping?.socketIds.forEach((socketId) => {
if (socketId !== socket.id) {
// Avoid sending back to the originating socket
io.to(socketId).emit("sync-all-notifications-read", { timestamp });
}
});
createLogEvent(socket, "debug", `Synced all notifications read for ${email} in bodyshop ${bodyshopId}`);
}
} catch (error) {
createLogEvent(socket, "error", `Error syncing all notifications read: ${error.message}`);
}
});
};
// Call Handlers // Call Handlers
registerRoomAndBroadcastEvents(socket); registerRoomAndBroadcastEvents(socket);
registerUpdateEvents(socket); registerUpdateEvents(socket);
registerMessagingEvents(socket); registerMessagingEvents(socket);
registerDisconnectEvents(socket); registerDisconnectEvents(socket);
registerSyncEvents(socket);
}; };
// Associate Middleware and Handlers // Associate Middleware and Handlers