feature/IO-3478-Mark-Conversation-Unread: Make Notifications more realtime

This commit is contained in:
Dave
2025-12-30 14:20:26 -05:00
parent 9dbe246575
commit 5b11587380
2 changed files with 65 additions and 38 deletions

View File

@@ -2,7 +2,6 @@ import { useApolloClient } from "@apollo/client";
import { getToken } from "@firebase/messaging";
import axios from "axios";
import { useEffect } from "react";
import { useTranslation } from "react-i18next";
import { messaging, requestForToken } from "../../firebase/firebase.utils";
import ChatPopupComponent from "../chat-popup/chat-popup.component";
import "./chat-affix.styles.scss";
@@ -10,14 +9,14 @@ import { registerMessagingHandlers, unregisterMessagingHandlers } from "./regist
import { useSocket } from "../../contexts/SocketIO/useSocket.js";
export function ChatAffixContainer({ bodyshop, chatVisible, currentUser }) {
const { t } = useTranslation();
const client = useApolloClient();
const { socket } = useSocket();
// 1) FCM subscription (independent of socket handler registration)
useEffect(() => {
if (!bodyshop?.messagingservicesid) return;
async function SubscribeToTopicForFCMNotification() {
async function subscribeToTopicForFCMNotification() {
try {
await requestForToken();
await axios.post("/notifications/subscribe", {
@@ -32,17 +31,35 @@ export function ChatAffixContainer({ bodyshop, chatVisible, currentUser }) {
}
}
SubscribeToTopicForFCMNotification();
subscribeToTopicForFCMNotification();
}, [bodyshop?.messagingservicesid, bodyshop?.imexshopid]);
// Register WebSocket handlers
if (socket?.connected) {
registerMessagingHandlers({ socket, client, currentUser, bodyshop, t });
// 2) Register socket handlers as soon as socket is connected (regardless of chatVisible)
useEffect(() => {
if (!socket) return;
if (!bodyshop?.messagingservicesid) return;
if (!bodyshop?.id) return;
return () => {
unregisterMessagingHandlers({ socket });
};
// If socket isn't connected yet, ensure no stale handlers remain.
if (!socket.connected) {
unregisterMessagingHandlers({ socket });
return;
}
}, [bodyshop, socket, t, client]);
// Prevent duplicate listeners if this effect runs more than once.
unregisterMessagingHandlers({ socket });
registerMessagingHandlers({
socket,
client,
currentUser,
bodyshop
});
return () => {
unregisterMessagingHandlers({ socket });
};
}, [socket, socket?.connected, bodyshop?.id, bodyshop?.messagingservicesid, client, currentUser?.email]);
if (!bodyshop?.messagingservicesid) return <></>;

View File

@@ -13,6 +13,9 @@ const logLocal = (message, ...args) => {
}
};
const safeIsoNow = () => new Date().toISOString();
const isSystemMsid = (msid) => typeof msid === "string" && msid.startsWith("SYS_");
const normalizeConversationForList = (raw, { isoutbound, isSystem } = {}) => {
const c = raw || {};
const id = c.id;
@@ -47,7 +50,6 @@ const normalizeConversationForList = (raw, { isoutbound, isSystem } = {}) => {
})
: [],
// This is the field your list badge reads (with args). We write it via a fragment with the same args.
messages_aggregate: c.messages_aggregate || {
__typename: "messages_aggregate",
aggregate: {
@@ -93,7 +95,6 @@ const normalizeMessageForCache = (raw, fallbackConversationId) => {
id: m.id,
conversationid: m.conversationid ?? m.conversation?.id ?? fallbackConversationId,
// Fields your UI queries expect
status: m.status ?? null,
text: m.text ?? "",
is_system: typeof m.is_system === "boolean" ? m.is_system : false,
@@ -101,7 +102,7 @@ const normalizeMessageForCache = (raw, fallbackConversationId) => {
image: typeof m.image === "boolean" ? m.image : false,
image_path: m.image_path ?? null,
userid: m.userid ?? null,
created_at: m.created_at ?? new Date().toISOString(),
created_at: m.created_at ?? safeIsoNow(),
read: typeof m.read === "boolean" ? m.read : false
};
};
@@ -117,10 +118,6 @@ const isConversationDetailsCached = (client, conversationId) => {
}
};
const isSystemMsid = (msid) => typeof msid === "string" && msid.startsWith("SYS_");
const safeIsoNow = () => new Date().toISOString();
const conversationDetailsCached = (client, conversationId) => {
try {
const res = client.cache.readQuery({
@@ -152,7 +149,6 @@ const messageEntityCached = (client, messageId) => {
}
};
// Normalize/enrich conversation data so it matches what CONVERSATION_LIST_QUERY expects
const enrichConversation = (conversation, { isoutbound, isSystem }) => ({
...conversation,
updated_at: conversation.updated_at || safeIsoNow(),
@@ -177,14 +173,12 @@ const upsertConversationIntoOffsetZeroList = (client, conversationObj, { isoutbo
const convCacheId = client.cache.identify(normalized);
if (!convCacheId) return;
// Seed the entity in the normalized store so we can safely reference it.
client.cache.writeFragment({
id: convCacheId,
fragment: CONVERSATION_LIST_ITEM_FRAGMENT,
data: normalized
});
// Insert/move it to top of the first page list (offset 0) only.
client.cache.modify({
id: "ROOT_QUERY",
fields: {
@@ -204,9 +198,25 @@ const upsertConversationIntoOffsetZeroList = (client, conversationObj, { isoutbo
export const registerMessagingHandlers = ({ socket, client, currentUser, bodyshop }) => {
if (!(socket && client)) return;
// Coalesce unread refetches (avoid spamming during bursts)
let unreadRefetchInFlight = null;
const refetchUnreadCount = () => {
if (unreadRefetchInFlight) return;
unreadRefetchInFlight = client
.refetchQueries({
include: ["UNREAD_CONVERSATION_COUNT"]
})
.catch(() => {
// best-effort
})
.finally(() => {
unreadRefetchInFlight = null;
});
};
const handleNewMessageSummary = async (message) => {
const { conversationId, newConversation, existingConversation, isoutbound, msid, updated_at } = message;
const isSystem = isSystemMsid(msid);
const isNewMessageSoundEnabled = (clientInstance) => {
@@ -230,9 +240,13 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
if (isLeaderTab(bodyshop.id) && isNewMessageSoundEnabled(client)) {
playNewMessageSound(bodyshop.id);
}
// Real-time badge update for affix (best-effort, coalesced)
if (!isSystem) {
refetchUnreadCount();
}
}
// If we think it's "new", sanity-check the cache: if conversation exists, treat as existing.
if (!existingConversation && conversationId) {
try {
const cachedConversation = client.cache.readFragment({
@@ -253,7 +267,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
}
}
// New conversation: upsert into offset-0 conversation list cache
if (!existingConversation && newConversation?.phone_num) {
try {
upsertConversationIntoOffsetZeroList(client, newConversation, { isoutbound, isSystem });
@@ -263,7 +276,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
return;
}
// Existing conversation: update updated_at and unread badge (only for inbound non-system)
if (existingConversation && conversationId) {
try {
client.cache.modify({
@@ -272,7 +284,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
updated_at: () => updated_at || safeIsoNow(),
archived: () => false,
// Badge in your list uses messages_aggregate.aggregate.count with is_system excluded
messages_aggregate(cached = null) {
if (isoutbound || isSystem) return cached;
@@ -286,7 +297,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
};
},
// Keep legacy unreadcnt reasonably in sync (if you still display it anywhere)
unreadcnt(cached) {
if (isoutbound || isSystem) return cached;
const n = typeof cached === "number" ? cached : 0;
@@ -294,12 +304,9 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
}
}
});
// Optional: bubble to top of offset-0 list by rewriting entity is enough
} catch (error) {
console.error("Error updating cache for existing conversation:", error);
}
return;
}
@@ -311,7 +318,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
logLocal("handleNewMessageDetailed - Start", message);
// If the conversation thread isn't open/cached, don't try to append messages
if (!conversationId || !isConversationDetailsCached(client, conversationId)) return;
try {
@@ -326,7 +332,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
return;
}
// Write the entity (no missing-field warnings because normalized includes is_system)
client.cache.writeFragment({
id: messageCacheId,
fragment: gql`
@@ -347,7 +352,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
data: normalized
});
// Append a ref (not a raw object) and avoid duplicates
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
@@ -357,7 +361,7 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
return [...existing, { __ref: messageCacheId }];
},
updated_at() {
return normalized.created_at || new Date().toISOString();
return normalized.created_at || safeIsoNow();
}
}
});
@@ -373,7 +377,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
logLocal("handleMessageChanged - Start", message);
// Only update if the message entity exists locally
if (!messageEntityCached(client, message.id)) return;
try {
@@ -422,7 +425,8 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
try {
switch (type) {
case "conversation-marked-read": {
// Update message entities only if details are cached, otherwise just update counters.
refetchUnreadCount();
if (detailsCached && Array.isArray(messageIds)) {
messageIds.forEach((id) => {
if (!messageEntityCached(client, id)) return;
@@ -449,6 +453,8 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
}
case "conversation-marked-unread": {
refetchUnreadCount();
const safeUnreadCount = typeof unreadCount === "number" ? unreadCount : 1;
const idsMarkedRead = Array.isArray(messageIdsMarkedRead) ? messageIdsMarkedRead : [];
@@ -488,6 +494,9 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
}
case "conversation-created": {
// New conversation likely implies new unread inbound message(s)
refetchUnreadCount();
const conv = enrichConversation(
{ id: conversationId, job_conversations, ...fields, updated_at: updatedAt },
{ isoutbound: false, isSystem: false }
@@ -498,7 +507,9 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
case "conversation-unarchived":
case "conversation-archived": {
// Correct refetch usage: this refetches any ACTIVE watchers for these documents.
// Keep unread badge correct even if archiving affects counts
refetchUnreadCount();
await client.refetchQueries({
include: [CONVERSATION_LIST_QUERY, GET_CONVERSATION_DETAILS]
});
@@ -546,7 +557,6 @@ export const registerMessagingHandlers = ({ socket, client, currentUser, bodysho
}
default: {
// Safe partial updates to the conversation entity
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: Object.fromEntries(