feature/IO-3000-messaging-sockets-migrations2 -

- A lot of testing...

Signed-off-by: Dave Richer <dave@imexsystems.ca>
This commit is contained in:
Dave Richer
2024-11-21 22:14:39 -08:00
parent 141deff41e
commit 6504b27eca
5 changed files with 376 additions and 249 deletions

View File

@@ -14,40 +14,46 @@ export const registerMessagingHandlers = ({ socket, client }) => {
const handleNewMessageSummary = async (message) => {
const { conversationId, newConversation, existingConversation, isoutbound } = message;
logLocal("handleNewMessageSummary", message);
logLocal("handleNewMessageSummary - Start", { message, isNew: !existingConversation });
const queryVariables = { offset: 0 };
// Utility function to enrich conversation data
const enrichConversation = (conversation, isOutbound) => ({
...conversation,
updated_at: conversation.updated_at || new Date().toISOString(),
unreadcnt: conversation.unreadcnt || 0,
archived: conversation.archived || false,
label: conversation.label || null,
job_conversations: conversation.job_conversations || [],
messages_aggregate: conversation.messages_aggregate || {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count: isOutbound ? 0 : 1
}
},
__typename: "conversations"
});
// Handle new conversation
if (!existingConversation && newConversation?.phone_num) {
logLocal("handleNewMessageSummary - New Conversation", newConversation);
try {
const queryResults = client.cache.readQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables
});
const enrichedConversation = {
...newConversation,
updated_at: newConversation.updated_at || new Date().toISOString(),
unreadcnt: newConversation.unreadcnt || 0,
archived: newConversation.archived || false,
label: newConversation.label || null,
job_conversations: newConversation.job_conversations || [],
messages_aggregate: newConversation.messages_aggregate || {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count: isoutbound ? 0 : 1
}
},
__typename: "conversations"
};
const enrichedConversation = enrichConversation(newConversation, isoutbound);
client.cache.writeQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables,
data: {
conversations: [enrichedConversation, ...(queryResults?.conversations || [])]
client.cache.modify({
id: "ROOT_QUERY",
fields: {
conversations(existingConversations = []) {
return [enrichedConversation, ...existingConversations];
}
}
});
} catch (error) {
@@ -60,13 +66,10 @@ export const registerMessagingHandlers = ({ socket, client }) => {
if (existingConversation) {
let conversationDetails;
// Fetch or read the conversation details
// Attempt to read existing conversation details from cache
try {
conversationDetails = client.cache.readFragment({
id: client.cache.identify({
__typename: "conversations",
id: conversationId
}),
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fragment: gql`
fragment ExistingConversation on conversations {
id
@@ -89,9 +92,10 @@ export const registerMessagingHandlers = ({ socket, client }) => {
`
});
} catch (error) {
console.warn("Conversation not found in cache, querying server...");
logLocal("handleNewMessageSummary - Cache miss for conversation, fetching from server", { conversationId });
}
// Fetch conversation details from server if not in cache
if (!conversationDetails) {
try {
const { data } = await client.query({
@@ -106,12 +110,14 @@ export const registerMessagingHandlers = ({ socket, client }) => {
}
}
// Validate that conversation details were retrieved
if (!conversationDetails) {
console.error("Unable to retrieve conversation details. Skipping cache update.");
return;
}
try {
// Check if the conversation is already in the cache
const queryResults = client.cache.readQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables
@@ -120,46 +126,32 @@ export const registerMessagingHandlers = ({ socket, client }) => {
const isAlreadyInCache = queryResults?.conversations.some((conv) => conv.id === conversationId);
if (!isAlreadyInCache) {
const enrichedConversation = {
...conversationDetails,
archived: false,
__typename: "conversations",
messages_aggregate: {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count:
conversationDetails.messages?.filter(
(message) => !message.read && !message.isoutbound // Count unread, inbound messages
).length || 0
}
}
};
const enrichedConversation = enrichConversation(conversationDetails, isoutbound);
client.cache.writeQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables,
data: {
conversations: [enrichedConversation, ...(queryResults?.conversations || [])]
client.cache.modify({
id: "ROOT_QUERY",
fields: {
conversations(existingConversations = []) {
return [enrichedConversation, ...existingConversations];
}
}
});
}
// Update existing conversation fields
// Update fields for the existing conversation in the cache
client.cache.modify({
id: client.cache.identify({
__typename: "conversations",
id: conversationId
}),
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
updated_at: () => new Date().toISOString(),
archived: () => false,
messages_aggregate(cached) {
messages_aggregate(cached = { aggregate: { count: 0 } }) {
const currentCount = cached.aggregate?.count || 0;
if (!isoutbound) {
return {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count: cached.aggregate.count + 1
count: currentCount + 1
}
};
}
@@ -176,80 +168,107 @@ export const registerMessagingHandlers = ({ socket, client }) => {
const handleNewMessageDetailed = (message) => {
const { conversationId, newMessage } = message;
logLocal("handleNewMessageDetailed", message);
logLocal("handleNewMessageDetailed - Start", message);
// Append the new message to the conversation's message list
const queryResults = client.cache.readQuery({
query: GET_CONVERSATION_DETAILS,
variables: { conversationId }
});
if (queryResults) {
client.cache.writeQuery({
try {
// Check if the conversation exists in the cache
const queryResults = client.cache.readQuery({
query: GET_CONVERSATION_DETAILS,
variables: { conversationId },
data: {
...queryResults,
conversations_by_pk: {
...queryResults.conversations_by_pk,
messages: [...queryResults.conversations_by_pk.messages, newMessage]
variables: { conversationId }
});
if (!queryResults?.conversations_by_pk) {
console.warn("Conversation not found in cache:", { conversationId });
return;
}
// Append the new message to the conversation's message list using cache.modify
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
messages(existingMessages = []) {
return [...existingMessages, newMessage];
}
}
});
logLocal("handleNewMessageDetailed - Message appended successfully", { conversationId, newMessage });
} catch (error) {
console.error("Error updating conversation messages in cache:", error);
}
};
const handleMessageChanged = (message) => {
if (!message) return;
if (!message) {
logLocal("handleMessageChanged - No message provided", message);
return;
}
logLocal("handleMessageChanged", message);
logLocal("handleMessageChanged - Start", message);
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: message.conversationid }),
fields: {
...(message.type === "status-changed" && {
messages(existing = [], { readField }) {
return existing.map((messageRef) => {
// Match the message by ID
try {
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: message.conversationid }),
fields: {
messages(existingMessages = [], { readField }) {
return existingMessages.map((messageRef) => {
// Check if this is the message to update
if (readField("id", messageRef) === message.id) {
const currentStatus = readField("status", messageRef);
// Prevent overwriting if the current status is already "delivered"
if (currentStatus === "delivered") {
return messageRef;
}
// Update the existing message fields
return client.cache.writeFragment({
id: messageRef.__ref,
fragment: gql`
fragment UpdatedMessage on messages {
id
status
conversationid
__typename
// Handle known types of message changes
switch (message.type) {
case "status-changed":
// Prevent overwriting if the current status is already "delivered"
if (currentStatus === "delivered") {
logLocal("handleMessageChanged - Status already delivered, skipping update", {
messageId: message.id
});
return messageRef;
}
`,
data: {
__typename: "messages",
...message // Only update the fields provided in the message object
}
});
// Update the status field
return {
...messageRef,
status: message.status
};
case "text-updated":
// Handle changes to the message text
return {
...messageRef,
text: message.text
};
// Add cases for other known message types as needed
default:
// Log a warning for unhandled message types
logLocal("handleMessageChanged - Unhandled message type", { type: message.type });
return messageRef;
}
}
return messageRef; // Keep other messages unchanged
});
}
})
}
});
}
});
logLocal("handleMessageChanged - Message updated successfully", { messageId: message.id, type: message.type });
} catch (error) {
console.error("handleMessageChanged - Error modifying cache:", error);
}
};
const handleConversationChanged = async (data) => {
if (!data) return;
if (!data) {
logLocal("handleConversationChanged - No data provided", data);
return;
}
const { conversationId, type, job_conversations, ...fields } = data;
logLocal("handleConversationChanged", data);
const { conversationId, type, job_conversations, messageIds, ...fields } = data;
logLocal("handleConversationChanged - Start", data);
const updatedAt = new Date().toISOString();
@@ -263,9 +282,7 @@ export const registerMessagingHandlers = ({ socket, client }) => {
const updatedList = existingList?.conversations
? [
newConversation,
...existingList.conversations.filter(
(conv) => conv.id !== newConversation.id // Prevent duplicates
)
...existingList.conversations.filter((conv) => conv.id !== newConversation.id) // Prevent duplicates
]
: [newConversation];
@@ -276,129 +293,149 @@ export const registerMessagingHandlers = ({ socket, client }) => {
conversations: updatedList
}
});
logLocal("handleConversationChanged - Conversation list updated successfully", newConversation);
} catch (error) {
console.error("Error updating conversation list in the cache:", error);
}
};
if (type === "conversation-created") {
updateConversationList({ ...fields, job_conversations, updated_at: updatedAt });
return;
}
// Handle specific types
try {
switch (type) {
case "conversation-marked-read":
if (conversationId && messageIds?.length > 0) {
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
messages(existingMessages = [], { readField }) {
return existingMessages.map((message) => {
if (messageIds.includes(readField("id", message))) {
return { ...message, read: true };
}
return message;
});
},
messages_aggregate: () => ({
__typename: "messages_aggregate",
aggregate: { __typename: "messages_aggregate_fields", count: 0 }
})
}
});
}
break;
const cacheId = client.cache.identify({
__typename: "conversations",
id: conversationId
});
case "conversation-created":
updateConversationList({ ...fields, job_conversations, updated_at: updatedAt });
break;
if (!cacheId) {
console.error(`Could not find conversation with id: ${conversationId}`);
return;
}
case "conversation-unarchived":
case "conversation-archived":
// Would like to someday figure out how to get this working without refetch queries,
// But I have but a solid 4 hours into it, and there are just too many weird occurrences
try {
const listQueryVariables = { offset: 0 };
const detailsQueryVariables = { conversationId };
if (type === "conversation-unarchived" || type === "conversation-archived") {
try {
const listQueryVariables = { offset: 0 };
const detailsQueryVariables = { conversationId };
// Refetch conversation list and details
await client.refetchQueries({
include: [CONVERSATION_LIST_QUERY, GET_CONVERSATION_DETAILS],
variables: [
{ query: CONVERSATION_LIST_QUERY, variables: listQueryVariables },
{ query: GET_CONVERSATION_DETAILS, variables: detailsQueryVariables }
]
});
// Refetch the conversation list and details queries
await client.refetchQueries({
include: [CONVERSATION_LIST_QUERY, GET_CONVERSATION_DETAILS],
variables: [
{ query: CONVERSATION_LIST_QUERY, variables: listQueryVariables },
{ query: GET_CONVERSATION_DETAILS, variables: detailsQueryVariables }
]
});
logLocal("handleConversationChanged - Refetched queries after state change", { conversationId, type });
} catch (error) {
console.error("Error refetching queries after conversation state change:", error);
}
break;
console.log("Refetched conversation list and details after state change.");
} catch (error) {
console.error("Error refetching queries after conversation state change:", error);
case "tag-added":
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
job_conversations: (existing = []) => [...existing, ...job_conversations]
}
});
break;
case "tag-removed":
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
job_conversations: (existing = [], { readField }) =>
existing.filter((jobRef) => readField("jobid", jobRef) !== fields.jobId)
}
});
break;
default:
logLocal("handleConversationChanged - Unhandled type", { type });
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
...Object.fromEntries(
Object.entries(fields).map(([key, value]) => [key, (cached) => (value !== undefined ? value : cached)])
)
}
});
}
return;
} catch (error) {
console.error("Error handling conversation changes:", { type, error });
}
// Handle other types of updates (e.g., marked read, tags added/removed)
client.cache.modify({
id: cacheId,
fields: {
...Object.fromEntries(
Object.entries(fields).map(([key, value]) => [key, (cached) => (value !== undefined ? value : cached)])
),
...(type === "conversation-marked-read" && {
messages_aggregate: () => ({
__typename: "messages_aggregate",
aggregate: { __typename: "messages_aggregate_fields", count: 0 }
})
}),
...(type === "tag-added" && {
job_conversations: (existing = []) => [...existing, ...job_conversations]
}),
...(type === "tag-removed" && {
job_conversations: (existing = [], { readField }) =>
existing.filter((jobRef) => readField("jobid", jobRef) !== data.jobId)
})
}
});
};
const handleNewMessage = ({ conversationId, message }) => {
if (!conversationId || !message?.id || !message?.text) {
logLocal("handleNewMessage - Missing conversationId or message details", { conversationId, message });
return;
}
logLocal("handleNewMessage", { conversationId, message });
logLocal("handleNewMessage - Start", { conversationId, message });
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
messages(existing = []) {
// Ensure that the `message` object matches the schema
const newMessageRef = client.cache.writeFragment({
data: {
__typename: "messages",
id: message.id,
body: message.text,
selectedMedia: message.image_path || [],
imexshopid: message.userid,
status: message.status,
created_at: message.created_at,
read: message.read
},
fragment: gql`
fragment NewMessage on messages {
id
body
selectedMedia
imexshopid
status
created_at
read
}
`
});
try {
// Add the new message to the cache
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
messages(existing = []) {
// Write the new message to the cache
const newMessageRef = client.cache.writeFragment({
data: {
__typename: "messages",
id: message.id,
text: message.text,
selectedMedia: message.image_path || [],
imexshopid: message.userid,
status: message.status,
created_at: message.created_at,
read: message.read
},
fragment: gql`
fragment NewMessage on messages {
id
text
selectedMedia
imexshopid
status
created_at
read
}
`
});
// Prevent duplicates by checking if the message already exists
const isDuplicate = existing.some(
(msgRef) =>
client.cache.readFragment({
id: msgRef.__ref,
fragment: gql`
fragment CheckMessage on messages {
id
}
`
})?.id === message.id
);
// We already have it, so return the existing list
if (isDuplicate) {
return existing;
// The merge function defined in the cache will handle deduplication
return [...existing, newMessageRef];
}
return [...existing, newMessageRef]; // Add the new message reference
}
}
});
});
logLocal("handleNewMessage - Message added to cache", { conversationId, messageId: message.id });
} catch (error) {
console.error("handleNewMessage - Error modifying cache:", error);
}
};
socket.on("new-message-summary", handleNewMessageSummary);

View File

@@ -1,4 +1,4 @@
import { useQuery } from "@apollo/client";
import { useApolloClient, useQuery } from "@apollo/client";
import axios from "axios";
import React, { useContext, useEffect, useState } from "react";
import { connect } from "react-redux";
@@ -17,19 +17,73 @@ const mapStateToProps = createStructuredSelector({
export default connect(mapStateToProps, null)(ChatConversationContainer);
export function ChatConversationContainer({ bodyshop, selectedConversation }) {
const client = useApolloClient();
const { socket } = useContext(SocketContext);
const [markingAsReadInProgress, setMarkingAsReadInProgress] = useState(false);
const {
loading: convoLoading,
error: convoError,
data: convoData
} = useQuery(GET_CONVERSATION_DETAILS, {
variables: { conversationId: selectedConversation },
fetchPolicy: "network-only",
nextFetchPolicy: "network-only"
fetchPolicy: "network-only"
});
const { socket } = useContext(SocketContext);
// Utility to update Apollo cache
const updateCacheWithReadMessages = (conversationId, messageIds) => {
if (!conversationId || !messageIds || messageIds.length === 0) return;
messageIds.forEach((messageId) => {
client.cache.modify({
id: client.cache.identify({ __typename: "messages", id: messageId }),
fields: {
read() {
return true; // Mark message as read
}
}
});
});
// Optionally update aggregate unread count
client.cache.modify({
id: client.cache.identify({ __typename: "conversations", id: conversationId }),
fields: {
messages_aggregate(existingAggregate) {
const updatedAggregate = {
...existingAggregate,
aggregate: {
...existingAggregate.aggregate,
count: 0 // No unread messages remaining
}
};
return updatedAggregate;
}
}
});
};
// Handle WebSocket events
useEffect(() => {
if (!socket || !socket.connected) return;
const handleConversationChange = (data) => {
if (data.type === "conversation-marked-read") {
const { conversationId, messageIds } = data;
console.log("Conversation change received:", data);
updateCacheWithReadMessages(conversationId, messageIds);
}
};
socket.on("conversation-changed", handleConversationChange);
return () => {
socket.off("conversation-changed", handleConversationChange);
};
}, [socket, client]);
// Handle joining/leaving conversation
useEffect(() => {
// Early gate, we have no socket, bail.
if (!socket || !socket.connected) return;
socket.emit("join-bodyshop-conversation", {
@@ -45,25 +99,41 @@ export function ChatConversationContainer({ bodyshop, selectedConversation }) {
};
}, [selectedConversation, bodyshop, socket]);
const [markingAsReadInProgress, setMarkingAsReadInProgress] = useState(false);
const unreadCount =
convoData &&
convoData.conversations_by_pk &&
convoData.conversations_by_pk.messages &&
convoData.conversations_by_pk.messages.reduce((acc, val) => {
return !val.read && !val.isoutbound ? acc + 1 : acc;
}, 0);
// Handle marking conversation as read
const handleMarkConversationAsRead = async () => {
if (unreadCount > 0 && !!selectedConversation && !markingAsReadInProgress) {
if (!convoData || !selectedConversation || markingAsReadInProgress) return;
const conversation = convoData.conversations_by_pk;
if (!conversation) {
console.warn(`No data found for conversation ID: ${selectedConversation}`);
return;
}
const unreadMessageIds = conversation.messages
?.filter((message) => !message.read && !message.isoutbound)
.map((message) => message.id);
if (unreadMessageIds?.length > 0) {
setMarkingAsReadInProgress(true);
await axios.post("/sms/markConversationRead", {
conversationid: selectedConversation,
imexshopid: bodyshop.imexshopid,
bodyshopid: bodyshop.id
});
setMarkingAsReadInProgress(false);
try {
const payload = {
conversation,
imexshopid: bodyshop?.imexshopid,
bodyshopid: bodyshop?.id
};
console.log("Marking conversation as read:", payload);
await axios.post("/sms/markConversationRead", payload);
// Update local cache
updateCacheWithReadMessages(selectedConversation, unreadMessageIds);
} catch (error) {
console.error("Error marking conversation as read:", error.response?.data || error.message);
} finally {
setMarkingAsReadInProgress(false);
}
}
};

View File

@@ -162,6 +162,22 @@ const cache = new InMemoryCache({
(incomingItem) => !existing.some((existingItem) => existingItem.__ref === incomingItem.__ref)
)
];
return merged;
}
},
messages: {
keyArgs: false, // Ignore arguments when determining uniqueness (like `order_by`).
merge(existing = [], incoming = [], { readField }) {
const existingIds = new Set(existing.map((message) => readField("id", message)));
// Merge incoming messages, avoiding duplicates
const merged = [...existing];
incoming.forEach((message) => {
if (!existingIds.has(readField("id", message))) {
merged.push(message);
}
});
return merged;
}
}

View File

@@ -2569,6 +2569,9 @@ exports.GET_JOBS_BY_PKS = `query GET_JOBS_BY_PKS($ids: [uuid!]!) {
exports.MARK_MESSAGES_AS_READ = `mutation MARK_MESSAGES_AS_READ($conversationId: uuid!) {
update_messages(where: { conversationid: { _eq: $conversationId } }, _set: { read: true }) {
returning {
id
}
affected_rows
}
}

View File

@@ -58,48 +58,49 @@ exports.status = async (req, res) => {
logger.log("sms-status-update-error", "ERROR", "api", null, {
msid: SmsSid,
fields: { status: SmsStatus },
error
stack: error.stack,
message: error.message
});
res.status(500).json({ error: "Failed to update message status." });
}
};
exports.markConversationRead = async (req, res) => {
const { conversationid, imexshopid, bodyshopid } = req.body;
const {
ioRedis,
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom }
} = req;
const { conversation, imexshopid, bodyshopid } = req.body;
// Alternatively, support both payload formats
const conversationId = conversation?.id || req.body.conversationId;
if (!conversationId || !imexshopid || !bodyshopid) {
return res.status(400).json({ error: "Invalid conversation data provided." });
}
try {
// Mark messages in the conversation as read
const response = await client.request(queries.MARK_MESSAGES_AS_READ, {
conversationId: conversationid
conversationId
});
const updatedMessages = response.update_messages.affected_rows;
logger.log("conversation-mark-read", "DEBUG", "api", null, {
conversationid,
imexshopid,
bodyshopid,
updatedMessages
});
const updatedMessageIds = response.update_messages.returning.map((message) => message.id);
const broadcastRoom = getBodyshopRoom(bodyshopid);
ioRedis.to(broadcastRoom).emit("conversation-changed", {
type: "conversation-marked-read",
conversationId: conversationid
conversationId,
affectedMessages: response.update_messages.affected_rows,
messageIds: updatedMessageIds
});
res.status(200).json({ success: true, message: "Conversation marked as read." });
} catch (error) {
logger.log("conversation-mark-read-error", "ERROR", "api", null, {
conversationid,
imexshopid,
error
res.status(200).json({
success: true,
message: "Conversation marked as read."
});
} catch (error) {
console.error("Error marking conversation as read:", error);
res.status(500).json({ error: "Failed to mark conversation as read." });
}
};