From e15384d0bf61dd9121f067de227708698c72eaea Mon Sep 17 00:00:00 2001 From: Dave Richer Date: Wed, 20 Nov 2024 12:23:50 -0800 Subject: [PATCH] feature/IO-3000-messaging-sockets-migrations2 - Everything but tagging and labels works Signed-off-by: Dave Richer --- .../registerMessagingSocketHandlers.js | 198 +++++++++++------- .../chat-conversation.container.jsx | 11 +- server/sms/status.js | 119 ++++++----- 3 files changed, 186 insertions(+), 142 deletions(-) diff --git a/client/src/components/chat-affix/registerMessagingSocketHandlers.js b/client/src/components/chat-affix/registerMessagingSocketHandlers.js index c6cd66c0e..339acd9a6 100644 --- a/client/src/components/chat-affix/registerMessagingSocketHandlers.js +++ b/client/src/components/chat-affix/registerMessagingSocketHandlers.js @@ -4,127 +4,169 @@ export const registerMessagingHandlers = ({ socket, client }) => { if (!(socket && client)) return; const handleNewMessageSummary = (message) => { - console.log("🚀 ~ SUMMARY CONSOLE LOG:", message); + const { conversationId, newConversation, existingConversation, isoutbound } = message; - if (!message.isoutbound) { - //It's an inbound message. - if (!message.existingConversation) { - //Do a read query. - const queryResults = client.cache.readQuery({ - query: CONVERSATION_LIST_QUERY, - variables: {} - }); - // Do a write query. Assume 0 unread messages to utilize code below. - client.cache.writeQuery({ - query: CONVERSATION_LIST_QUERY, - variables: {}, - data: { - conversations: [ - { ...message.newConversation, messages_aggregate: { aggregate: { count: 0 } } }, - ...queryResults - ] - } - }); - } + if (!existingConversation && newConversation?.phone_num) { + const queryResults = client.cache.readQuery({ + query: CONVERSATION_LIST_QUERY, + variables: { offset: 0 } + }); + + const fullConversation = { + ...newConversation, + phone_num: newConversation.phone_num, + id: newConversation.id, + updated_at: newConversation.updated_at || new Date().toISOString(), + unreadcnt: newConversation.unreadcnt || 0, + archived: newConversation.archived || false, + label: newConversation.label || null, + job_conversations: newConversation.job_conversations || [], + messages_aggregate: newConversation.messages_aggregate || { + aggregate: { count: isoutbound ? 0 : 1 } + } + }; + + client.cache.writeQuery({ + query: CONVERSATION_LIST_QUERY, + variables: { offset: 0 }, + data: { + conversations: [fullConversation, ...(queryResults?.conversations || [])] + } + }); + } else { client.cache.modify({ id: client.cache.identify({ __typename: "conversations", - id: message.conversationId + id: conversationId }), fields: { updated_at: () => new Date(), messages_aggregate(cached) { - return { aggregate: { count: cached.aggregate.count + 1 } }; + // Increment unread count only if the message is inbound + if (!isoutbound) { + return { aggregate: { count: cached.aggregate.count + 1 } }; + } + return cached; } } }); - client.cache.modify({ - fields: { - conversations(existingConversations = [], { readField }) { - return [ - { __ref: `conversations:${message.conversationId}` }, // TODO: This throws the cache merging error in apollo. - ...existingConversations.filter((c) => c.__ref !== `conversations:${message.conversationId}`) - ]; - } - } - }); - - client.cache.modify({ - fields: { - messages_aggregate(cached) { - return { aggregate: { count: cached.aggregate.count + 1 } }; - } - } - }); - } else { - //It's an outbound message - //Update the last updated for conversations in the list. If it's new, add it in. - // If it isn't just update the last updated at. - client.cache.modify({ - id: client.cache.identify({ - __typename: "conversations", - id: message.conversationId - }), - fields: { - updated_at: () => message.newMessage.updated_at - } - }); } }; - const handleNewMessageDetailed = (message) => { - console.log("🚀 ~ DETAIL CONSOLE LOG:", message); - //They're looking at the conversation right now. Need to merge into the list of messages i.e. append to the end. - //Add the message to the overall cache. + const { conversationId, newMessage } = message; - //Handle outbound messages - if (message.newMessage.isoutbound) { - const queryResults = client.cache.readQuery({ - query: GET_CONVERSATION_DETAILS, - variables: { conversationId: message.newMessage.conversationid } - }); + // Append the new message to the conversation's message list + const queryResults = client.cache.readQuery({ + query: GET_CONVERSATION_DETAILS, + variables: { conversationId } + }); + + if (queryResults) { client.cache.writeQuery({ query: GET_CONVERSATION_DETAILS, - variables: { conversationId: message.newMessage.conversationid }, + variables: { conversationId }, data: { ...queryResults, conversations_by_pk: { ...queryResults.conversations_by_pk, - messages: [...queryResults.conversations_by_pk.messages, message.newMessage] + messages: [...queryResults.conversations_by_pk.messages, newMessage] } } }); } - // We got this as a receive. - else { - } }; const handleMessageChanged = (message) => { - //Find it in the cache, and just update it based on what was sent. + // Find the message in the cache and update all fields dynamically client.cache.modify({ id: client.cache.identify({ __typename: "messages", id: message.id }), fields: { - //TODO: see if there is a way to have this update all fields e.g. only spread in updates rather than prescribing - updated_at: () => new Date(), - status(cached) { - return message.status; - } + // Dynamically update all fields based on the incoming message object + __typename: (existingType) => existingType || "messages", // Ensure __typename is preserved + ...Object.fromEntries( + Object.entries(message).map(([key, value]) => [ + key, + (cached) => (value !== undefined ? value : cached) // Update with new value or keep existing + ]) + ) } }); }; - const handleConversationChanged = (conversation) => { - //If it was archived, marked unread, etc. + const handleConversationChanged = (data) => { + const { type, conversationId, jobId, label } = data; + + switch (type) { + case "conversation-marked-read": + client.cache.modify({ + id: client.cache.identify({ + __typename: "conversations", + id: conversationId + }), + fields: { + messages_aggregate: () => ({ aggregate: { count: 0 } }) + } + }); + + // Optionally, refetch queries if needed + // client.refetchQueries({ + // include: [CONVERSATION_LIST_QUERY, GET_CONVERSATION_DETAILS] + // }); + break; + case "tag-added": + client.cache.modify({ + id: client.cache.identify({ + __typename: "conversations", + id: conversationId + }), + fields: { + job_conversations(existingJobConversations = []) { + return [...existingJobConversations, { __ref: `jobs:${jobId}` }]; + } + } + }); + break; + + case "tag-removed": + client.cache.modify({ + id: client.cache.identify({ + __typename: "conversations", + id: conversationId + }), + fields: { + job_conversations(existingJobConversations = []) { + return existingJobConversations.filter((jobRef) => jobRef.__ref !== `jobs:${jobId}`); + } + } + }); + break; + + case "label-changed": + client.cache.modify({ + id: client.cache.identify({ + __typename: "conversations", + id: conversationId + }), + fields: { + label() { + return label; + } + } + }); + break; + + default: + console.warn(`Unhandled conversation change type: ${type}`); + } }; socket.on("new-message-summary", handleNewMessageSummary); socket.on("new-message-detailed", handleNewMessageDetailed); socket.on("message-changed", handleMessageChanged); - socket.on("conversation-changed", handleConversationChanged); //TODO: Unread, mark as read, archived, unarchive, etc. + socket.on("conversation-changed", handleConversationChanged); }; export const unregisterMessagingHandlers = ({ socket }) => { diff --git a/client/src/components/chat-conversation/chat-conversation.container.jsx b/client/src/components/chat-conversation/chat-conversation.container.jsx index 9a57a43e2..58107ad7c 100644 --- a/client/src/components/chat-conversation/chat-conversation.container.jsx +++ b/client/src/components/chat-conversation/chat-conversation.container.jsx @@ -30,6 +30,9 @@ export function ChatConversationContainer({ bodyshop, selectedConversation }) { const { socket } = useContext(SocketContext); useEffect(() => { + // Early gate, we have no socket, bail. + if (!socket || !socket.connected) return; + socket.emit("join-bodyshop-conversation", { bodyshopId: bodyshop.id, conversationId: selectedConversation @@ -43,10 +46,6 @@ export function ChatConversationContainer({ bodyshop, selectedConversation }) { }; }, [selectedConversation, bodyshop, socket]); - // const { loading, error, data } = useSubscription(CONVERSATION_SUBSCRIPTION_BY_PK, { - // variables: { conversationId: selectedConversation } - // }); - const [markingAsReadInProgress, setMarkingAsReadInProgress] = useState(false); const unreadCount = @@ -60,10 +59,10 @@ export function ChatConversationContainer({ bodyshop, selectedConversation }) { const handleMarkConversationAsRead = async () => { if (unreadCount > 0 && !!selectedConversation && !markingAsReadInProgress) { setMarkingAsReadInProgress(true); - // await markConversationRead({}); await axios.post("/sms/markConversationRead", { conversationid: selectedConversation, - imexshopid: bodyshop.imexshopid + imexshopid: bodyshop.imexshopid, + bodyshopid: bodyshop.id }); setMarkingAsReadInProgress(false); } diff --git a/server/sms/status.js b/server/sms/status.js index 63c45b33e..9527ed5f8 100644 --- a/server/sms/status.js +++ b/server/sms/status.js @@ -5,29 +5,31 @@ require("dotenv").config({ const client = require("../graphql-client/graphql-client").client; const queries = require("../graphql-client/queries"); -const { phone } = require("phone"); const logger = require("../utils/logger"); -const { admin } = require("../firebase/firebase-handler"); -exports.status = (req, res) => { +exports.status = async (req, res) => { const { SmsSid, SmsStatus } = req.body; const { ioRedis, ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom } } = req; - client - .request(queries.UPDATE_MESSAGE_STATUS, { + + try { + // Update message status in the database + const response = await client.request(queries.UPDATE_MESSAGE_STATUS, { msid: SmsSid, fields: { status: SmsStatus } - }) - .then((response) => { + }); + + const message = response.update_messages.returning[0]; + + if (message) { logger.log("sms-status-update", "DEBUG", "api", null, { msid: SmsSid, fields: { status: SmsStatus } }); - // TODO Verify - const message = response.update_messages.returning[0]; + // Emit WebSocket event to notify the change in message status const conversationRoom = getBodyshopConversationRoom({ bodyshopId: message.conversation.bodyshopid, conversationId: message.conversationid @@ -36,69 +38,70 @@ exports.status = (req, res) => { ioRedis.to(conversationRoom).emit("message-changed", { message }); - }) - .catch((error) => { - logger.log("sms-status-update-error", "ERROR", "api", null, { + } else { + logger.log("sms-status-update-warning", "WARN", "api", null, { msid: SmsSid, fields: { status: SmsStatus }, - error + warning: "No message returned from the database update." }); + } + res.sendStatus(200); + } catch (error) { + logger.log("sms-status-update-error", "ERROR", "api", null, { + msid: SmsSid, + fields: { status: SmsStatus }, + error }); - res.sendStatus(200); + res.status(500).json({ error: "Failed to update message status." }); + } }; exports.markConversationRead = async (req, res) => { - const { conversationid, imexshopid } = req.body; + const { conversationid, imexshopid, bodyshopid } = req.body; const { ioRedis, ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom } } = req; - //Server side, mark the conversation as read + try { + // Mark messages in the conversation as read + const response = await client.request(queries.MARK_MESSAGES_AS_READ, { + conversationId: conversationid + }); - //TODO: Convert this to run on server side. Stolen from chat-conversation.container.jsx - // const [markConversationRead] = useMutation(MARK_MESSAGES_AS_READ_BY_CONVERSATION, { - // variables: { conversationId: selectedConversation }, - // refetchQueries: ["UNREAD_CONVERSATION_COUNT"], - // - // update(cache) { - // cache.modify({ - // id: cache.identify({ - // __typename: "conversations", - // id: selectedConversation - // }), - // fields: { - // messages_aggregate(cached) { - // return { aggregate: { count: 0 } }; - // } - // } - // }); - // } - // }); + const updatedMessages = response.update_messages.affected_rows; - const broadcastRoom = getBodyshopRoom(r2.insert_messages.returning[0].conversation.bodyshop.id); + logger.log("conversation-mark-read", "DEBUG", "api", null, { + conversationid, + imexshopid, + bodyshopid, + updatedMessages + }); - ioRedis.to(broadcastRoom).emit("conversation-changed", { - //type: "conversation-marked-unread" //TODO: Flush out what this looks like. - // isoutbound: true, - // conversationId: conversationid, - // updated_at: r2.insert_messages.returning[0].updated_at, - // msid: message.sid, - // summary: true - }); + const broadcastRoom = getBodyshopRoom(bodyshopid); - res.send(200); + const conversationRoom = getBodyshopConversationRoom({ + bodyshopId: bodyshopid, + conversationId: conversationid + }); + + ioRedis.to(broadcastRoom).emit("conversation-changed", { + type: "conversation-marked-read", + conversationId: conversationid + }); + + ioRedis.to(conversationRoom).emit("message-changed", { + type: "all-messages-marked-read", + conversationId: conversationid + }); + + res.status(200).json({ success: true, message: "Conversation marked as read." }); + } catch (error) { + logger.log("conversation-mark-read-error", "ERROR", "api", null, { + conversationid, + imexshopid, + error + }); + res.status(500).json({ error: "Failed to mark conversation as read." }); + } }; - -// Inbound Sample -// { -// "SmsSid": "SM5205ea340e06437799d9345e7283457c", -// "SmsStatus": "queued", -// "MessageStatus": "queued", -// "To": "+16049992002", -// "MessagingServiceSid": "MG6e259e2add04ffa0d0aa355038670ee1", -// "MessageSid": "SM5205ea340e06437799d9345e7283457c", -// "AccountSid": "AC6c09d337d6b9c68ab6488c2052bd457c", -// "From": "+16043301606", -// "ApiVersion": "2010-04-01" -// }