feature/IO-3000-messaging-sockets-migrations2 -

- testing and edge cases

Signed-off-by: Dave Richer <dave@imexsystems.ca>
This commit is contained in:
Dave Richer
2024-11-21 18:44:38 -08:00
parent 8229e3593c
commit 38f13346e5
5 changed files with 270 additions and 271 deletions

View File

@@ -12,61 +12,164 @@ const logLocal = (message, ...args) => {
export const registerMessagingHandlers = ({ socket, client }) => {
if (!(socket && client)) return;
const handleNewMessageSummary = (message) => {
const handleNewMessageSummary = async (message) => {
const { conversationId, newConversation, existingConversation, isoutbound } = message;
logLocal("handleNewMessageSummary", message);
if (!existingConversation && newConversation?.phone_num) {
const queryResults = client.cache.readQuery({
query: CONVERSATION_LIST_QUERY,
variables: { offset: 0 }
});
const queryVariables = { offset: 0 };
client.cache.writeQuery({
query: CONVERSATION_LIST_QUERY,
variables: { offset: 0 },
data: {
conversations: [
{
...newConversation,
updated_at: newConversation.updated_at || new Date().toISOString(),
unreadcnt: newConversation.unreadcnt || 0,
archived: newConversation.archived || false,
label: newConversation.label || null,
job_conversations: newConversation.job_conversations || [],
messages_aggregate: newConversation.messages_aggregate || {
aggregate: { count: isoutbound ? 0 : 1 }
}
},
...(queryResults?.conversations || [])
]
}
});
} else {
client.cache.modify({
id: client.cache.identify({
__typename: "conversations",
id: conversationId
}),
fields: {
updated_at: () => new Date().toISOString(),
archived(cached) {
// Unarchive the conversation if it was previously marked as archived
if (cached) {
return false;
// Handle new conversation
if (!existingConversation && newConversation?.phone_num) {
try {
const queryResults = client.cache.readQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables
});
const enrichedConversation = {
...newConversation,
updated_at: newConversation.updated_at || new Date().toISOString(),
unreadcnt: newConversation.unreadcnt || 0,
archived: newConversation.archived || false,
label: newConversation.label || null,
job_conversations: newConversation.job_conversations || [],
messages_aggregate: newConversation.messages_aggregate || {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count: isoutbound ? 0 : 1
}
return cached;
},
messages_aggregate(cached) {
// Increment unread count only if the message is inbound
if (!isoutbound) {
return { aggregate: { count: cached.aggregate.count + 1 } };
}
return cached;
__typename: "conversations"
};
client.cache.writeQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables,
data: {
conversations: [enrichedConversation, ...(queryResults?.conversations || [])]
}
});
} catch (error) {
console.error("Error updating cache for new conversation:", error);
}
return;
}
// Handle existing conversation
if (existingConversation) {
let conversationDetails;
// Fetch or read the conversation details
try {
conversationDetails = client.cache.readFragment({
id: client.cache.identify({
__typename: "conversations",
id: conversationId
}),
fragment: gql`
fragment ExistingConversation on conversations {
id
phone_num
updated_at
archived
label
unreadcnt
job_conversations {
jobid
conversationid
}
messages_aggregate {
aggregate {
count
}
}
__typename
}
`
});
} catch (error) {
console.warn("Conversation not found in cache, querying server...");
}
if (!conversationDetails) {
try {
const { data } = await client.query({
query: GET_CONVERSATION_DETAILS,
variables: { conversationId },
fetchPolicy: "network-only"
});
conversationDetails = data?.conversations_by_pk;
} catch (error) {
console.error("Failed to fetch conversation details from server:", error);
return;
}
});
}
if (!conversationDetails) {
console.error("Unable to retrieve conversation details. Skipping cache update.");
return;
}
try {
const queryResults = client.cache.readQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables
});
const isAlreadyInCache = queryResults?.conversations.some((conv) => conv.id === conversationId);
if (!isAlreadyInCache) {
const enrichedConversation = {
...conversationDetails,
archived: false,
__typename: "conversations",
messages_aggregate: {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count:
conversationDetails.messages?.filter(
(message) => !message.read && !message.isoutbound // Count unread, inbound messages
).length || 0
}
}
};
client.cache.writeQuery({
query: CONVERSATION_LIST_QUERY,
variables: queryVariables,
data: {
conversations: [enrichedConversation, ...(queryResults?.conversations || [])]
}
});
}
// Update existing conversation fields
client.cache.modify({
id: client.cache.identify({
__typename: "conversations",
id: conversationId
}),
fields: {
updated_at: () => new Date().toISOString(),
archived: () => false,
messages_aggregate(cached) {
if (!isoutbound) {
return {
__typename: "messages_aggregate",
aggregate: {
__typename: "messages_aggregate_fields",
count: cached.aggregate.count + 1
}
};
}
return cached;
}
}
});
} catch (error) {
console.error("Error updating cache for existing conversation:", error);
}
}
};
@@ -300,7 +403,7 @@ export const registerMessagingHandlers = ({ socket, client }) => {
};
const handleNewMessage = ({ conversationId, message }) => {
if (!conversationId || !message.id || !message.text) {
if (!conversationId || !message?.id || !message?.text) {
return;
}
@@ -359,9 +462,9 @@ export const registerMessagingHandlers = ({ socket, client }) => {
});
};
socket.on("new-message", handleNewMessage);
socket.on("new-message-summary", handleNewMessageSummary);
socket.on("new-message-detailed", handleNewMessageDetailed);
socket.on("new-message", handleNewMessage);
socket.on("message-changed", handleMessageChanged);
socket.on("conversation-changed", handleConversationChanged);
};

View File

@@ -19,12 +19,7 @@ const mapDispatchToProps = (dispatch) => ({
setSelectedConversation: (conversationId) => dispatch(setSelectedConversation(conversationId))
});
function ChatConversationListComponent({
conversationList,
selectedConversation,
setSelectedConversation,
loadMoreConversations
}) {
function ChatConversationListComponent({ conversationList, selectedConversation, setSelectedConversation }) {
const renderConversation = (index) => {
const item = conversationList[index];
const cardContentRight = <TimeAgoFormatter>{item.updated_at}</TimeAgoFormatter>;
@@ -69,13 +64,15 @@ function ChatConversationListComponent({
);
};
// TODO: Can go back into virtuoso for additional fetch
// endReached={loadMoreConversations} // Calls loadMoreConversations when scrolled to the bottom
return (
<div className="chat-list-container">
<Virtuoso
data={conversationList}
itemContent={(index) => renderConversation(index)}
style={{ height: "100%", width: "100%" }}
endReached={loadMoreConversations} // Calls loadMoreConversations when scrolled to the bottom
/>
</div>
);

View File

@@ -77,17 +77,6 @@ export function ChatPopupComponent({ chatVisible, selectedConversation, toggleCh
});
}, [chatVisible, getConversations]);
const loadMoreConversations = useCallback(() => {
if (data)
fetchMore({
variables: {
offset: data.conversations.length
}
}).catch((err) => {
console.error(`Error fetching more conversations: ${(err, err.message || "")}`);
});
}, [data, fetchMore]);
const unreadCount = unreadData?.messages_aggregate?.aggregate?.count || 0;
return (
@@ -114,10 +103,7 @@ export function ChatPopupComponent({ chatVisible, selectedConversation, toggleCh
{loading ? (
<LoadingSpinner />
) : (
<ChatConversationListComponent
conversationList={data ? data.conversations : []}
loadMoreConversations={loadMoreConversations}
/>
<ChatConversationListComponent conversationList={data ? data.conversations : []} />
)}
</Col>
<Col span={16}>{selectedConversation ? <ChatConversationContainer /> : null}</Col>

View File

@@ -1515,7 +1515,8 @@ exports.GET_JOB_BY_PK = `query GET_JOB_BY_PK($id: uuid!) {
}`;
//TODO:AIO The above query used to have parts order lines in it. Validate that this doesn't need it.
exports.QUERY_JOB_COSTING_DETAILS = ` query QUERY_JOB_COSTING_DETAILS($id: uuid!) {
exports.QUERY_JOB_COSTING_DETAILS = `
query QUERY_JOB_COSTING_DETAILS($id: uuid!) {
jobs_by_pk(id: $id) {
ro_number
clm_total
@@ -2566,68 +2567,9 @@ exports.GET_JOBS_BY_PKS = `query GET_JOBS_BY_PKS($ids: [uuid!]!) {
}
`;
exports.GET_CONVERSATIONS = `query GET_CONVERSATIONS($bodyshopId: uuid!) {
conversations(
where: { bodyshopid: { _eq: $bodyshopId }, archived: { _eq: false } },
order_by: { updated_at: desc },
limit: 50
) {
phone_num
id
updated_at
unreadcnt
archived
label
messages_aggregate(where: { read: { _eq: false }, isoutbound: { _eq: false } }) {
aggregate {
count
}
}
job_conversations {
job {
id
ro_number
ownr_fn
ownr_ln
ownr_co_nm
}
}
}
}
`;
exports.MARK_MESSAGES_AS_READ = `mutation MARK_MESSAGES_AS_READ($conversationId: uuid!) {
update_messages(where: { conversationid: { _eq: $conversationId } }, _set: { read: true }) {
affected_rows
}
}
`;
exports.GET_CONVERSATION_DETAILS = `
query GET_CONVERSATION_DETAILS($conversationId: uuid!) {
conversation: conversations_by_pk(id: $conversationId) {
id
phone_num
updated_at
label
job_conversations {
job {
id
ro_number
ownr_fn
ownr_ln
ownr_co_nm
}
}
}
messages: messages(where: { conversationid: { _eq: $conversationId } }, order_by: { created_at: asc }) {
id
text
created_at
read
isoutbound
userid
image_path
}
}
`;

View File

@@ -16,23 +16,21 @@ exports.receive = async (req, res) => {
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom }
} = req;
logger.log("sms-inbound", "DEBUG", "api", null, {
const loggerData = {
msid: req.body.SmsMessageSid,
text: req.body.Body,
image: !!req.body.MediaUrl0,
image_path: generateMediaArray(req.body)
});
};
logger.log("sms-inbound", "DEBUG", "api", null, loggerData);
if (!req.body || !req.body.MessagingServiceSid || !req.body.SmsMessageSid) {
logger.log("sms-inbound-error", "ERROR", "api", null, {
msid: req.body.SmsMessageSid,
text: req.body.Body,
image: !!req.body.MediaUrl0,
image_path: generateMediaArray(req.body),
...loggerData,
type: "malformed-request"
});
res.status(400).json({ success: false, error: "Malformed Request" });
return;
return res.status(400).json({ success: false, error: "Malformed Request" });
}
try {
@@ -41,6 +39,14 @@ exports.receive = async (req, res) => {
phone: phone(req.body.From).phoneNumber
});
if (!response.bodyshops[0]) {
return res.status(400).json({ success: false, error: "No matching bodyshop" });
}
const bodyshop = response.bodyshops[0];
const isNewConversation = bodyshop.conversations.length === 0;
const isDuplicate = bodyshop.conversations.length > 1;
let newMessage = {
msid: req.body.SmsMessageSid,
text: req.body.Body,
@@ -48,140 +54,105 @@ exports.receive = async (req, res) => {
image_path: generateMediaArray(req.body)
};
if (response.bodyshops[0]) {
const bodyshop = response.bodyshops[0];
if (bodyshop.conversations.length === 0) {
newMessage.conversation = {
data: {
bodyshopid: bodyshop.id,
phone_num: phone(req.body.From).phoneNumber,
archived: false
}
};
try {
const insertresp = await client.request(queries.RECEIVE_MESSAGE, { msg: newMessage });
const createdConversation = insertresp?.insert_messages?.returning?.[0]?.conversation || null;
const message = insertresp?.insert_messages?.returning?.[0];
if (!createdConversation) {
throw new Error("Conversation data is missing from the response.");
}
const broadcastRoom = getBodyshopRoom(createdConversation.bodyshop.id);
const conversationRoom = getBodyshopConversationRoom({
bodyshopId: message.conversation.bodyshop.id,
conversationId: message.conversation.id
});
ioRedis.to(broadcastRoom).emit("new-message-summary", {
isoutbound: false,
existingConversation: false,
newConversation: createdConversation,
conversationId: createdConversation.id,
updated_at: message.updated_at,
msid: message.sid,
summary: true
});
ioRedis.to(conversationRoom).emit("new-message-detailed", {
newMessage: message,
isoutbound: false,
newConversation: createdConversation,
existingConversation: false,
conversationId: createdConversation.id,
summary: false
});
logger.log("sms-inbound-success", "DEBUG", "api", null, {
newMessage,
createdConversation
});
res.status(200).send("");
return;
} catch (e) {
handleError(req, e, res, "RECEIVE_MESSAGE");
return;
}
} else if (bodyshop.conversations.length === 1) {
newMessage.conversationid = bodyshop.conversations[0].id;
} else {
logger.log("sms-inbound-error", "ERROR", "api", null, {
msid: req.body.SmsMessageSid,
text: req.body.Body,
image: !!req.body.MediaUrl0,
image_path: generateMediaArray(req.body),
messagingServiceSid: req.body.MessagingServiceSid,
type: "duplicate-phone"
});
res.status(400).json({ success: false, error: "Duplicate phone number" });
return;
}
try {
const insertresp = await client.request(queries.INSERT_MESSAGE, {
msg: newMessage,
conversationid: newMessage.conversationid
});
const message = insertresp.insert_messages.returning[0];
const data = {
type: "messaging-inbound",
conversationid: message.conversationid || "",
text: message.text || "",
messageid: message.id || "",
phone_num: message.conversation.phone_num || ""
};
const fcmresp = await admin.messaging().send({
topic: `${message.conversation.bodyshop.imexshopid}-messaging`,
notification: {
title: InstanceManager({
imex: `ImEX Online Message - ${data.phone_num}`,
rome: `Rome Online Message - ${data.phone_num}`,
promanager: `ProManager Message - ${data.phone_num}`
}),
body: message.image_path ? `Image ${message.text}` : message.text
},
data
});
logger.log("sms-inbound-success", "DEBUG", "api", null, {
newMessage,
fcmresp
});
const broadcastRoom = getBodyshopRoom(message.conversation.bodyshop.id);
const conversationRoom = getBodyshopConversationRoom({
bodyshopId: message.conversation.bodyshop.id,
conversationId: message.conversation.id
});
ioRedis.to(broadcastRoom).emit("new-message-summary", {
isoutbound: false,
existingConversation: true,
conversationId: message.conversationid,
updated_at: message.updated_at,
msid: message.sid,
summary: true
});
ioRedis.to(conversationRoom).emit("new-message-detailed", {
newMessage: message,
isoutbound: false,
existingConversation: true,
conversationId: message.conversationid,
summary: false
});
res.status(200).send("");
} catch (e) {
handleError(req, e, res, "INSERT_MESSAGE");
}
if (isDuplicate) {
logger.log("sms-inbound-error", "ERROR", "api", null, {
...loggerData,
messagingServiceSid: req.body.MessagingServiceSid,
type: "duplicate-phone"
});
return res.status(400).json({ success: false, error: "Duplicate phone number" });
}
if (isNewConversation) {
newMessage.conversation = {
data: {
bodyshopid: bodyshop.id,
phone_num: phone(req.body.From).phoneNumber,
archived: false
}
};
} else {
const existingConversation = bodyshop.conversations[0];
// Update the conversation to unarchive it
if (existingConversation.archived) {
await client.request(queries.UNARCHIVE_CONVERSATION, {
id: existingConversation.id,
archived: false
});
}
newMessage.conversationid = existingConversation.id;
}
const query = isNewConversation ? queries.RECEIVE_MESSAGE : queries.INSERT_MESSAGE;
const variables = isNewConversation
? { msg: newMessage }
: { msg: newMessage, conversationid: newMessage.conversationid };
const insertresp = await client.request(query, variables);
const message = insertresp?.insert_messages?.returning?.[0];
const conversation = message?.conversation || null;
if (!conversation) {
throw new Error("Conversation data is missing from the response.");
}
const broadcastRoom = getBodyshopRoom(conversation.bodyshop.id);
const conversationRoom = getBodyshopConversationRoom({
bodyshopId: conversation.bodyshop.id,
conversationId: conversation.id
});
const commonPayload = {
isoutbound: false,
conversationId: conversation.id,
updated_at: message.updated_at,
msid: message.sid
};
ioRedis.to(broadcastRoom).emit("new-message-summary", {
...commonPayload,
existingConversation: !isNewConversation,
newConversation: isNewConversation ? conversation : null,
summary: true
});
ioRedis.to(conversationRoom).emit("new-message-detailed", {
newMessage: message,
...commonPayload,
newConversation: isNewConversation ? conversation : null,
existingConversation: !isNewConversation,
summary: false
});
const fcmresp = await admin.messaging().send({
topic: `${message.conversation.bodyshop.imexshopid}-messaging`,
notification: {
title: InstanceManager({
imex: `ImEX Online Message - ${message.conversation.phone_num}`,
rome: `Rome Online Message - ${message.conversation.phone_num}`,
promanager: `ProManager Message - ${message.conversation.phone_num}`
}),
body: message.image_path ? `Image ${message.text}` : message.text
},
data: {
type: "messaging-inbound",
conversationid: message.conversationid || "",
text: message.text || "",
messageid: message.id || "",
phone_num: message.conversation.phone_num || ""
}
});
logger.log("sms-inbound-success", "DEBUG", "api", null, {
newMessage,
fcmresp
});
res.status(200).send("");
} catch (e) {
handleError(req, e, res, "FIND_BODYSHOP_BY_MESSAGING_SERVICE_SID");
handleError(req, e, res, "RECEIVE_MESSAGE");
}
};