diff --git a/client/src/components/chat-affix/registerMessagingSocketHandlers.js b/client/src/components/chat-affix/registerMessagingSocketHandlers.js index 76cc2c741..5b4f34ea1 100644 --- a/client/src/components/chat-affix/registerMessagingSocketHandlers.js +++ b/client/src/components/chat-affix/registerMessagingSocketHandlers.js @@ -14,40 +14,46 @@ export const registerMessagingHandlers = ({ socket, client }) => { const handleNewMessageSummary = async (message) => { const { conversationId, newConversation, existingConversation, isoutbound } = message; - logLocal("handleNewMessageSummary", message); + logLocal("handleNewMessageSummary - Start", { message, isNew: !existingConversation }); const queryVariables = { offset: 0 }; + // Utility function to enrich conversation data + const enrichConversation = (conversation, isOutbound) => ({ + ...conversation, + updated_at: conversation.updated_at || new Date().toISOString(), + unreadcnt: conversation.unreadcnt || 0, + archived: conversation.archived || false, + label: conversation.label || null, + job_conversations: conversation.job_conversations || [], + messages_aggregate: conversation.messages_aggregate || { + __typename: "messages_aggregate", + aggregate: { + __typename: "messages_aggregate_fields", + count: isOutbound ? 0 : 1 + } + }, + __typename: "conversations" + }); + // Handle new conversation if (!existingConversation && newConversation?.phone_num) { + logLocal("handleNewMessageSummary - New Conversation", newConversation); + try { const queryResults = client.cache.readQuery({ query: CONVERSATION_LIST_QUERY, variables: queryVariables }); - const enrichedConversation = { - ...newConversation, - updated_at: newConversation.updated_at || new Date().toISOString(), - unreadcnt: newConversation.unreadcnt || 0, - archived: newConversation.archived || false, - label: newConversation.label || null, - job_conversations: newConversation.job_conversations || [], - messages_aggregate: newConversation.messages_aggregate || { - __typename: "messages_aggregate", - aggregate: { - __typename: "messages_aggregate_fields", - count: isoutbound ? 0 : 1 - } - }, - __typename: "conversations" - }; + const enrichedConversation = enrichConversation(newConversation, isoutbound); - client.cache.writeQuery({ - query: CONVERSATION_LIST_QUERY, - variables: queryVariables, - data: { - conversations: [enrichedConversation, ...(queryResults?.conversations || [])] + client.cache.modify({ + id: "ROOT_QUERY", + fields: { + conversations(existingConversations = []) { + return [enrichedConversation, ...existingConversations]; + } } }); } catch (error) { @@ -60,13 +66,10 @@ export const registerMessagingHandlers = ({ socket, client }) => { if (existingConversation) { let conversationDetails; - // Fetch or read the conversation details + // Attempt to read existing conversation details from cache try { conversationDetails = client.cache.readFragment({ - id: client.cache.identify({ - __typename: "conversations", - id: conversationId - }), + id: client.cache.identify({ __typename: "conversations", id: conversationId }), fragment: gql` fragment ExistingConversation on conversations { id @@ -89,9 +92,10 @@ export const registerMessagingHandlers = ({ socket, client }) => { ` }); } catch (error) { - console.warn("Conversation not found in cache, querying server..."); + logLocal("handleNewMessageSummary - Cache miss for conversation, fetching from server", { conversationId }); } + // Fetch conversation details from server if not in cache if (!conversationDetails) { try { const { data } = await client.query({ @@ -106,12 +110,14 @@ export const registerMessagingHandlers = ({ socket, client }) => { } } + // Validate that conversation details were retrieved if (!conversationDetails) { console.error("Unable to retrieve conversation details. Skipping cache update."); return; } try { + // Check if the conversation is already in the cache const queryResults = client.cache.readQuery({ query: CONVERSATION_LIST_QUERY, variables: queryVariables @@ -120,46 +126,32 @@ export const registerMessagingHandlers = ({ socket, client }) => { const isAlreadyInCache = queryResults?.conversations.some((conv) => conv.id === conversationId); if (!isAlreadyInCache) { - const enrichedConversation = { - ...conversationDetails, - archived: false, - __typename: "conversations", - messages_aggregate: { - __typename: "messages_aggregate", - aggregate: { - __typename: "messages_aggregate_fields", - count: - conversationDetails.messages?.filter( - (message) => !message.read && !message.isoutbound // Count unread, inbound messages - ).length || 0 - } - } - }; + const enrichedConversation = enrichConversation(conversationDetails, isoutbound); - client.cache.writeQuery({ - query: CONVERSATION_LIST_QUERY, - variables: queryVariables, - data: { - conversations: [enrichedConversation, ...(queryResults?.conversations || [])] + client.cache.modify({ + id: "ROOT_QUERY", + fields: { + conversations(existingConversations = []) { + return [enrichedConversation, ...existingConversations]; + } } }); } - // Update existing conversation fields + + // Update fields for the existing conversation in the cache client.cache.modify({ - id: client.cache.identify({ - __typename: "conversations", - id: conversationId - }), + id: client.cache.identify({ __typename: "conversations", id: conversationId }), fields: { updated_at: () => new Date().toISOString(), archived: () => false, - messages_aggregate(cached) { + messages_aggregate(cached = { aggregate: { count: 0 } }) { + const currentCount = cached.aggregate?.count || 0; if (!isoutbound) { return { __typename: "messages_aggregate", aggregate: { __typename: "messages_aggregate_fields", - count: cached.aggregate.count + 1 + count: currentCount + 1 } }; } @@ -176,80 +168,107 @@ export const registerMessagingHandlers = ({ socket, client }) => { const handleNewMessageDetailed = (message) => { const { conversationId, newMessage } = message; - logLocal("handleNewMessageDetailed", message); + logLocal("handleNewMessageDetailed - Start", message); - // Append the new message to the conversation's message list - const queryResults = client.cache.readQuery({ - query: GET_CONVERSATION_DETAILS, - variables: { conversationId } - }); - - if (queryResults) { - client.cache.writeQuery({ + try { + // Check if the conversation exists in the cache + const queryResults = client.cache.readQuery({ query: GET_CONVERSATION_DETAILS, - variables: { conversationId }, - data: { - ...queryResults, - conversations_by_pk: { - ...queryResults.conversations_by_pk, - messages: [...queryResults.conversations_by_pk.messages, newMessage] + variables: { conversationId } + }); + + if (!queryResults?.conversations_by_pk) { + console.warn("Conversation not found in cache:", { conversationId }); + return; + } + + // Append the new message to the conversation's message list using cache.modify + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + messages(existingMessages = []) { + return [...existingMessages, newMessage]; } } }); + + logLocal("handleNewMessageDetailed - Message appended successfully", { conversationId, newMessage }); + } catch (error) { + console.error("Error updating conversation messages in cache:", error); } }; const handleMessageChanged = (message) => { - if (!message) return; + if (!message) { + logLocal("handleMessageChanged - No message provided", message); + return; + } - logLocal("handleMessageChanged", message); + logLocal("handleMessageChanged - Start", message); - client.cache.modify({ - id: client.cache.identify({ __typename: "conversations", id: message.conversationid }), - fields: { - ...(message.type === "status-changed" && { - messages(existing = [], { readField }) { - return existing.map((messageRef) => { - // Match the message by ID + try { + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: message.conversationid }), + fields: { + messages(existingMessages = [], { readField }) { + return existingMessages.map((messageRef) => { + // Check if this is the message to update if (readField("id", messageRef) === message.id) { const currentStatus = readField("status", messageRef); - // Prevent overwriting if the current status is already "delivered" - if (currentStatus === "delivered") { - return messageRef; - } - - // Update the existing message fields - return client.cache.writeFragment({ - id: messageRef.__ref, - fragment: gql` - fragment UpdatedMessage on messages { - id - status - conversationid - __typename + // Handle known types of message changes + switch (message.type) { + case "status-changed": + // Prevent overwriting if the current status is already "delivered" + if (currentStatus === "delivered") { + logLocal("handleMessageChanged - Status already delivered, skipping update", { + messageId: message.id + }); + return messageRef; } - `, - data: { - __typename: "messages", - ...message // Only update the fields provided in the message object - } - }); + + // Update the status field + return { + ...messageRef, + status: message.status + }; + + case "text-updated": + // Handle changes to the message text + return { + ...messageRef, + text: message.text + }; + + // Add cases for other known message types as needed + + default: + // Log a warning for unhandled message types + logLocal("handleMessageChanged - Unhandled message type", { type: message.type }); + return messageRef; + } } return messageRef; // Keep other messages unchanged }); } - }) - } - }); + } + }); + + logLocal("handleMessageChanged - Message updated successfully", { messageId: message.id, type: message.type }); + } catch (error) { + console.error("handleMessageChanged - Error modifying cache:", error); + } }; const handleConversationChanged = async (data) => { - if (!data) return; + if (!data) { + logLocal("handleConversationChanged - No data provided", data); + return; + } - const { conversationId, type, job_conversations, ...fields } = data; - logLocal("handleConversationChanged", data); + const { conversationId, type, job_conversations, messageIds, ...fields } = data; + logLocal("handleConversationChanged - Start", data); const updatedAt = new Date().toISOString(); @@ -263,9 +282,7 @@ export const registerMessagingHandlers = ({ socket, client }) => { const updatedList = existingList?.conversations ? [ newConversation, - ...existingList.conversations.filter( - (conv) => conv.id !== newConversation.id // Prevent duplicates - ) + ...existingList.conversations.filter((conv) => conv.id !== newConversation.id) // Prevent duplicates ] : [newConversation]; @@ -276,129 +293,149 @@ export const registerMessagingHandlers = ({ socket, client }) => { conversations: updatedList } }); + + logLocal("handleConversationChanged - Conversation list updated successfully", newConversation); } catch (error) { console.error("Error updating conversation list in the cache:", error); } }; - if (type === "conversation-created") { - updateConversationList({ ...fields, job_conversations, updated_at: updatedAt }); - return; - } + // Handle specific types + try { + switch (type) { + case "conversation-marked-read": + if (conversationId && messageIds?.length > 0) { + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + messages(existingMessages = [], { readField }) { + return existingMessages.map((message) => { + if (messageIds.includes(readField("id", message))) { + return { ...message, read: true }; + } + return message; + }); + }, + messages_aggregate: () => ({ + __typename: "messages_aggregate", + aggregate: { __typename: "messages_aggregate_fields", count: 0 } + }) + } + }); + } + break; - const cacheId = client.cache.identify({ - __typename: "conversations", - id: conversationId - }); + case "conversation-created": + updateConversationList({ ...fields, job_conversations, updated_at: updatedAt }); + break; - if (!cacheId) { - console.error(`Could not find conversation with id: ${conversationId}`); - return; - } + case "conversation-unarchived": + case "conversation-archived": + // Would like to someday figure out how to get this working without refetch queries, + // But I have but a solid 4 hours into it, and there are just too many weird occurrences + try { + const listQueryVariables = { offset: 0 }; + const detailsQueryVariables = { conversationId }; - if (type === "conversation-unarchived" || type === "conversation-archived") { - try { - const listQueryVariables = { offset: 0 }; - const detailsQueryVariables = { conversationId }; + // Refetch conversation list and details + await client.refetchQueries({ + include: [CONVERSATION_LIST_QUERY, GET_CONVERSATION_DETAILS], + variables: [ + { query: CONVERSATION_LIST_QUERY, variables: listQueryVariables }, + { query: GET_CONVERSATION_DETAILS, variables: detailsQueryVariables } + ] + }); - // Refetch the conversation list and details queries - await client.refetchQueries({ - include: [CONVERSATION_LIST_QUERY, GET_CONVERSATION_DETAILS], - variables: [ - { query: CONVERSATION_LIST_QUERY, variables: listQueryVariables }, - { query: GET_CONVERSATION_DETAILS, variables: detailsQueryVariables } - ] - }); + logLocal("handleConversationChanged - Refetched queries after state change", { conversationId, type }); + } catch (error) { + console.error("Error refetching queries after conversation state change:", error); + } + break; - console.log("Refetched conversation list and details after state change."); - } catch (error) { - console.error("Error refetching queries after conversation state change:", error); + case "tag-added": + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + job_conversations: (existing = []) => [...existing, ...job_conversations] + } + }); + break; + + case "tag-removed": + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + job_conversations: (existing = [], { readField }) => + existing.filter((jobRef) => readField("jobid", jobRef) !== fields.jobId) + } + }); + break; + + default: + logLocal("handleConversationChanged - Unhandled type", { type }); + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + ...Object.fromEntries( + Object.entries(fields).map(([key, value]) => [key, (cached) => (value !== undefined ? value : cached)]) + ) + } + }); } - - return; + } catch (error) { + console.error("Error handling conversation changes:", { type, error }); } - // Handle other types of updates (e.g., marked read, tags added/removed) - client.cache.modify({ - id: cacheId, - fields: { - ...Object.fromEntries( - Object.entries(fields).map(([key, value]) => [key, (cached) => (value !== undefined ? value : cached)]) - ), - ...(type === "conversation-marked-read" && { - messages_aggregate: () => ({ - __typename: "messages_aggregate", - aggregate: { __typename: "messages_aggregate_fields", count: 0 } - }) - }), - ...(type === "tag-added" && { - job_conversations: (existing = []) => [...existing, ...job_conversations] - }), - ...(type === "tag-removed" && { - job_conversations: (existing = [], { readField }) => - existing.filter((jobRef) => readField("jobid", jobRef) !== data.jobId) - }) - } - }); }; const handleNewMessage = ({ conversationId, message }) => { if (!conversationId || !message?.id || !message?.text) { + logLocal("handleNewMessage - Missing conversationId or message details", { conversationId, message }); return; } - logLocal("handleNewMessage", { conversationId, message }); + logLocal("handleNewMessage - Start", { conversationId, message }); - client.cache.modify({ - id: client.cache.identify({ __typename: "conversations", id: conversationId }), - fields: { - messages(existing = []) { - // Ensure that the `message` object matches the schema - const newMessageRef = client.cache.writeFragment({ - data: { - __typename: "messages", - id: message.id, - body: message.text, - selectedMedia: message.image_path || [], - imexshopid: message.userid, - status: message.status, - created_at: message.created_at, - read: message.read - }, - fragment: gql` - fragment NewMessage on messages { - id - body - selectedMedia - imexshopid - status - created_at - read - } - ` - }); + try { + // Add the new message to the cache + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + messages(existing = []) { + // Write the new message to the cache + const newMessageRef = client.cache.writeFragment({ + data: { + __typename: "messages", + id: message.id, + text: message.text, + selectedMedia: message.image_path || [], + imexshopid: message.userid, + status: message.status, + created_at: message.created_at, + read: message.read + }, + fragment: gql` + fragment NewMessage on messages { + id + text + selectedMedia + imexshopid + status + created_at + read + } + ` + }); - // Prevent duplicates by checking if the message already exists - const isDuplicate = existing.some( - (msgRef) => - client.cache.readFragment({ - id: msgRef.__ref, - fragment: gql` - fragment CheckMessage on messages { - id - } - ` - })?.id === message.id - ); - - // We already have it, so return the existing list - if (isDuplicate) { - return existing; + // The merge function defined in the cache will handle deduplication + return [...existing, newMessageRef]; } - - return [...existing, newMessageRef]; // Add the new message reference } - } - }); + }); + + logLocal("handleNewMessage - Message added to cache", { conversationId, messageId: message.id }); + } catch (error) { + console.error("handleNewMessage - Error modifying cache:", error); + } }; socket.on("new-message-summary", handleNewMessageSummary); diff --git a/client/src/components/chat-conversation/chat-conversation.container.jsx b/client/src/components/chat-conversation/chat-conversation.container.jsx index 613d96288..3b7f8385b 100644 --- a/client/src/components/chat-conversation/chat-conversation.container.jsx +++ b/client/src/components/chat-conversation/chat-conversation.container.jsx @@ -1,4 +1,4 @@ -import { useQuery } from "@apollo/client"; +import { useApolloClient, useQuery } from "@apollo/client"; import axios from "axios"; import React, { useContext, useEffect, useState } from "react"; import { connect } from "react-redux"; @@ -17,19 +17,73 @@ const mapStateToProps = createStructuredSelector({ export default connect(mapStateToProps, null)(ChatConversationContainer); export function ChatConversationContainer({ bodyshop, selectedConversation }) { + const client = useApolloClient(); + const { socket } = useContext(SocketContext); + const [markingAsReadInProgress, setMarkingAsReadInProgress] = useState(false); + const { loading: convoLoading, error: convoError, data: convoData } = useQuery(GET_CONVERSATION_DETAILS, { variables: { conversationId: selectedConversation }, - fetchPolicy: "network-only", - nextFetchPolicy: "network-only" + fetchPolicy: "network-only" }); - const { socket } = useContext(SocketContext); + // Utility to update Apollo cache + const updateCacheWithReadMessages = (conversationId, messageIds) => { + if (!conversationId || !messageIds || messageIds.length === 0) return; + + messageIds.forEach((messageId) => { + client.cache.modify({ + id: client.cache.identify({ __typename: "messages", id: messageId }), + fields: { + read() { + return true; // Mark message as read + } + } + }); + }); + + // Optionally update aggregate unread count + client.cache.modify({ + id: client.cache.identify({ __typename: "conversations", id: conversationId }), + fields: { + messages_aggregate(existingAggregate) { + const updatedAggregate = { + ...existingAggregate, + aggregate: { + ...existingAggregate.aggregate, + count: 0 // No unread messages remaining + } + }; + return updatedAggregate; + } + } + }); + }; + + // Handle WebSocket events + useEffect(() => { + if (!socket || !socket.connected) return; + + const handleConversationChange = (data) => { + if (data.type === "conversation-marked-read") { + const { conversationId, messageIds } = data; + console.log("Conversation change received:", data); + updateCacheWithReadMessages(conversationId, messageIds); + } + }; + + socket.on("conversation-changed", handleConversationChange); + + return () => { + socket.off("conversation-changed", handleConversationChange); + }; + }, [socket, client]); + + // Handle joining/leaving conversation useEffect(() => { - // Early gate, we have no socket, bail. if (!socket || !socket.connected) return; socket.emit("join-bodyshop-conversation", { @@ -45,25 +99,41 @@ export function ChatConversationContainer({ bodyshop, selectedConversation }) { }; }, [selectedConversation, bodyshop, socket]); - const [markingAsReadInProgress, setMarkingAsReadInProgress] = useState(false); - - const unreadCount = - convoData && - convoData.conversations_by_pk && - convoData.conversations_by_pk.messages && - convoData.conversations_by_pk.messages.reduce((acc, val) => { - return !val.read && !val.isoutbound ? acc + 1 : acc; - }, 0); - + // Handle marking conversation as read const handleMarkConversationAsRead = async () => { - if (unreadCount > 0 && !!selectedConversation && !markingAsReadInProgress) { + if (!convoData || !selectedConversation || markingAsReadInProgress) return; + + const conversation = convoData.conversations_by_pk; + if (!conversation) { + console.warn(`No data found for conversation ID: ${selectedConversation}`); + return; + } + + const unreadMessageIds = conversation.messages + ?.filter((message) => !message.read && !message.isoutbound) + .map((message) => message.id); + + if (unreadMessageIds?.length > 0) { setMarkingAsReadInProgress(true); - await axios.post("/sms/markConversationRead", { - conversationid: selectedConversation, - imexshopid: bodyshop.imexshopid, - bodyshopid: bodyshop.id - }); - setMarkingAsReadInProgress(false); + + try { + const payload = { + conversation, + imexshopid: bodyshop?.imexshopid, + bodyshopid: bodyshop?.id + }; + + console.log("Marking conversation as read:", payload); + + await axios.post("/sms/markConversationRead", payload); + + // Update local cache + updateCacheWithReadMessages(selectedConversation, unreadMessageIds); + } catch (error) { + console.error("Error marking conversation as read:", error.response?.data || error.message); + } finally { + setMarkingAsReadInProgress(false); + } } }; diff --git a/client/src/utils/GraphQLClient.js b/client/src/utils/GraphQLClient.js index e0d409bd7..423c38e54 100644 --- a/client/src/utils/GraphQLClient.js +++ b/client/src/utils/GraphQLClient.js @@ -162,6 +162,22 @@ const cache = new InMemoryCache({ (incomingItem) => !existing.some((existingItem) => existingItem.__ref === incomingItem.__ref) ) ]; + return merged; + } + }, + messages: { + keyArgs: false, // Ignore arguments when determining uniqueness (like `order_by`). + merge(existing = [], incoming = [], { readField }) { + const existingIds = new Set(existing.map((message) => readField("id", message))); + + // Merge incoming messages, avoiding duplicates + const merged = [...existing]; + incoming.forEach((message) => { + if (!existingIds.has(readField("id", message))) { + merged.push(message); + } + }); + return merged; } } diff --git a/server/graphql-client/queries.js b/server/graphql-client/queries.js index 72224dfb6..e70144775 100644 --- a/server/graphql-client/queries.js +++ b/server/graphql-client/queries.js @@ -2569,6 +2569,9 @@ exports.GET_JOBS_BY_PKS = `query GET_JOBS_BY_PKS($ids: [uuid!]!) { exports.MARK_MESSAGES_AS_READ = `mutation MARK_MESSAGES_AS_READ($conversationId: uuid!) { update_messages(where: { conversationid: { _eq: $conversationId } }, _set: { read: true }) { + returning { + id + } affected_rows } } diff --git a/server/sms/status.js b/server/sms/status.js index 51d2f7ecb..0e29bbf8f 100644 --- a/server/sms/status.js +++ b/server/sms/status.js @@ -58,48 +58,49 @@ exports.status = async (req, res) => { logger.log("sms-status-update-error", "ERROR", "api", null, { msid: SmsSid, fields: { status: SmsStatus }, - error + stack: error.stack, + message: error.message }); res.status(500).json({ error: "Failed to update message status." }); } }; exports.markConversationRead = async (req, res) => { - const { conversationid, imexshopid, bodyshopid } = req.body; const { ioRedis, ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom } } = req; + const { conversation, imexshopid, bodyshopid } = req.body; + + // Alternatively, support both payload formats + const conversationId = conversation?.id || req.body.conversationId; + + if (!conversationId || !imexshopid || !bodyshopid) { + return res.status(400).json({ error: "Invalid conversation data provided." }); + } try { - // Mark messages in the conversation as read const response = await client.request(queries.MARK_MESSAGES_AS_READ, { - conversationId: conversationid + conversationId }); - const updatedMessages = response.update_messages.affected_rows; - - logger.log("conversation-mark-read", "DEBUG", "api", null, { - conversationid, - imexshopid, - bodyshopid, - updatedMessages - }); + const updatedMessageIds = response.update_messages.returning.map((message) => message.id); const broadcastRoom = getBodyshopRoom(bodyshopid); ioRedis.to(broadcastRoom).emit("conversation-changed", { type: "conversation-marked-read", - conversationId: conversationid + conversationId, + affectedMessages: response.update_messages.affected_rows, + messageIds: updatedMessageIds }); - res.status(200).json({ success: true, message: "Conversation marked as read." }); - } catch (error) { - logger.log("conversation-mark-read-error", "ERROR", "api", null, { - conversationid, - imexshopid, - error + res.status(200).json({ + success: true, + message: "Conversation marked as read." }); + } catch (error) { + console.error("Error marking conversation as read:", error); res.status(500).json({ error: "Failed to mark conversation as read." }); } };