feature/IO-3000-Migrate-MSG-to-Sockets - Major Progress

Signed-off-by: Dave Richer <dave@imexsystems.ca>
This commit is contained in:
Dave Richer
2024-11-14 21:15:59 -08:00
parent e9e1e820a7
commit 1309d8ff65
15 changed files with 337 additions and 109 deletions

View File

@@ -12,6 +12,8 @@ export default function ChatConversationComponent({ subState, conversation, mess
if (loading) return <LoadingSkeleton />;
if (error) return <AlertComponent message={error.message} type="error" />;
console.dir(conversation);
return (
<div
className="chat-conversation"

View File

@@ -1,81 +1,81 @@
import { useMutation, useQuery, useSubscription } from "@apollo/client";
import React, { useState } from "react";
import React, { useContext, useEffect, useState } from "react";
import { connect } from "react-redux";
import { createStructuredSelector } from "reselect";
import { CONVERSATION_SUBSCRIPTION_BY_PK, GET_CONVERSATION_DETAILS } from "../../graphql/conversations.queries";
import { MARK_MESSAGES_AS_READ_BY_CONVERSATION } from "../../graphql/messages.queries";
import { selectSelectedConversation } from "../../redux/messaging/messaging.selectors";
import { selectBodyshop } from "../../redux/user/user.selectors";
import ChatConversationComponent from "./chat-conversation.component";
import axios from "axios";
import { selectBodyshop } from "../../redux/user/user.selectors";
import SocketContext from "../../contexts/SocketIO/socketContext.jsx";
const mapStateToProps = createStructuredSelector({
selectedConversation: selectSelectedConversation,
bodyshop: selectBodyshop
});
export default connect(mapStateToProps, null)(ChatConversationContainer);
export function ChatConversationContainer({ bodyshop, selectedConversation }) {
const {
loading: convoLoading,
error: convoError,
data: convoData
} = useQuery(GET_CONVERSATION_DETAILS, {
variables: { conversationId: selectedConversation },
fetchPolicy: "network-only",
nextFetchPolicy: "network-only"
});
const { loading, error, data } = useSubscription(CONVERSATION_SUBSCRIPTION_BY_PK, {
variables: { conversationId: selectedConversation }
});
const { socket, clientId } = useContext(SocketContext);
const [conversationDetails, setConversationDetails] = useState({});
const [messages, setMessages] = useState([]);
const [markingAsReadInProgress, setMarkingAsReadInProgress] = useState(false);
const [loading, setLoading] = useState(false);
const [error, setError] = useState(null);
const [markConversationRead] = useMutation(MARK_MESSAGES_AS_READ_BY_CONVERSATION, {
variables: { conversationId: selectedConversation },
refetchQueries: ["UNREAD_CONVERSATION_COUNT"],
update(cache) {
cache.modify({
id: cache.identify({
__typename: "conversations",
id: selectedConversation
}),
fields: {
messages_aggregate(cached) {
return { aggregate: { count: 0 } };
}
}
// Fetch conversation details and messages when a conversation is selected
useEffect(() => {
if (socket && selectedConversation) {
setLoading(true);
socket.emit("join-conversation", selectedConversation);
socket.on("conversation-details", (data) => {
setConversationDetails(data.conversation);
console.log("HIT HIT HIT");
console.dir(data);
setMessages(data.messages);
setLoading(false);
});
socket.on("new-message", (message) => {
setMessages((prevMessages) => [...prevMessages, message]);
});
return () => {
socket.emit("leave-conversation", selectedConversation);
socket.off("conversation-details");
socket.off("new-message");
};
}
});
const unreadCount =
data &&
data.messages &&
data.messages.reduce((acc, val) => {
return !val.read && !val.isoutbound ? acc + 1 : acc;
}, 0);
}, [socket, selectedConversation]);
// Mark messages as read
const handleMarkConversationAsRead = async () => {
if (unreadCount > 0 && !!selectedConversation && !markingAsReadInProgress) {
if (messages.some((msg) => !msg.read) && !markingAsReadInProgress) {
setMarkingAsReadInProgress(true);
await markConversationRead({});
// Emit a WebSocket event to mark messages as read
socket.emit("mark-as-read", { conversationId: selectedConversation });
// Fallback to an API call to update the read status in the database
await axios.post("/sms/markConversationRead", {
conversationid: selectedConversation,
imexshopid: bodyshop.imexshopid
});
setMarkingAsReadInProgress(false);
}
};
if (!messages || !conversationDetails || !messages.length) {
return null;
}
return (
<ChatConversationComponent
subState={[loading || convoLoading, error || convoError]}
conversation={convoData ? convoData.conversations_by_pk : {}}
messages={data ? data.messages : []}
subState={[loading, error]}
conversation={conversationDetails}
messages={messages}
handleMarkConversationAsRead={handleMarkConversationAsRead}
/>
);
}
export default connect(mapStateToProps, null)(ChatConversationContainer);

View File

@@ -1,73 +1,59 @@
import { InfoCircleOutlined, MessageOutlined, ShrinkOutlined, SyncOutlined } from "@ant-design/icons";
import { useLazyQuery, useQuery } from "@apollo/client";
import { Badge, Card, Col, Row, Space, Tag, Tooltip, Typography } from "antd";
import React, { useCallback, useEffect, useState } from "react";
import { InfoCircleOutlined, MessageOutlined, ShrinkOutlined } from "@ant-design/icons";
import { Badge, Card, Col, Row, Space, Tooltip, Typography } from "antd";
import React, { useCallback, useContext, useEffect } from "react";
import { useTranslation } from "react-i18next";
import { connect } from "react-redux";
import { createStructuredSelector } from "reselect";
import { CONVERSATION_LIST_QUERY, UNREAD_CONVERSATION_COUNT } from "../../graphql/conversations.queries";
import { toggleChatVisible } from "../../redux/messaging/messaging.actions";
import { selectChatVisible, selectSelectedConversation } from "../../redux/messaging/messaging.selectors";
import {
selectChatVisible,
selectConversations,
selectSelectedConversation,
selectUnreadCount
} from "../../redux/messaging/messaging.selectors";
import ChatConversationListComponent from "../chat-conversation-list/chat-conversation-list.component";
import ChatConversationContainer from "../chat-conversation/chat-conversation.container";
import ChatNewConversation from "../chat-new-conversation/chat-new-conversation.component";
import LoadingSpinner from "../loading-spinner/loading-spinner.component";
import "./chat-popup.styles.scss";
import { selectBodyshop } from "../../redux/user/user.selectors";
import SocketContext from "../../contexts/SocketIO/socketContext";
const mapStateToProps = createStructuredSelector({
selectedConversation: selectSelectedConversation,
chatVisible: selectChatVisible
chatVisible: selectChatVisible,
bodyshop: selectBodyshop,
conversations: selectConversations,
unreadCount: selectUnreadCount
});
const mapDispatchToProps = (dispatch) => ({
toggleChatVisible: () => dispatch(toggleChatVisible())
});
export function ChatPopupComponent({ chatVisible, selectedConversation, toggleChatVisible }) {
export function ChatPopupComponent({
chatVisible,
selectedConversation,
toggleChatVisible,
bodyshop,
conversations,
unreadCount
}) {
const { t } = useTranslation();
const [pollInterval, setpollInterval] = useState(0);
const { data: unreadData } = useQuery(UNREAD_CONVERSATION_COUNT, {
fetchPolicy: "network-only",
nextFetchPolicy: "network-only",
...(pollInterval > 0 ? { pollInterval } : {})
});
const [getConversations, { loading, data, refetch, fetchMore }] = useLazyQuery(CONVERSATION_LIST_QUERY, {
fetchPolicy: "network-only",
nextFetchPolicy: "network-only",
skip: !chatVisible,
...(pollInterval > 0 ? { pollInterval } : {})
});
const fcmToken = sessionStorage.getItem("fcmtoken");
const { socket, clientId } = useContext(SocketContext);
// Emit event to open messaging when chat becomes visible
useEffect(() => {
if (fcmToken) {
setpollInterval(0);
} else {
setpollInterval(90000);
if (chatVisible && socket && bodyshop?.id) {
socket.emit("open-messaging", bodyshop.id);
}
}, [fcmToken]);
useEffect(() => {
if (chatVisible)
getConversations({
variables: {
offset: 0
}
});
}, [chatVisible, getConversations]);
}, [chatVisible, socket, bodyshop?.id]);
// Handle loading more conversations
const loadMoreConversations = useCallback(() => {
if (data)
fetchMore({
variables: {
offset: data.conversations.length
}
});
}, [data, fetchMore]);
const unreadCount = unreadData?.messages_aggregate.aggregate.count || 0;
if (socket) {
socket.emit("load-more-conversations", { offset: conversations.length });
}
}, [socket, conversations.length]);
return (
<Badge count={unreadCount}>
@@ -80,21 +66,19 @@ export function ChatPopupComponent({ chatVisible, selectedConversation, toggleCh
<Tooltip title={t("messaging.labels.recentonly")}>
<InfoCircleOutlined />
</Tooltip>
<SyncOutlined style={{ cursor: "pointer" }} onClick={() => refetch()} />
{pollInterval > 0 && <Tag color="yellow">{t("messaging.labels.nopush")}</Tag>}
<ShrinkOutlined
onClick={() => toggleChatVisible()}
style={{ position: "absolute", right: ".5rem", top: ".5rem" }}
/>
</Space>
<ShrinkOutlined
onClick={() => toggleChatVisible()}
style={{ position: "absolute", right: ".5rem", top: ".5rem" }}
/>
<Row gutter={[8, 8]} className="chat-popup-content">
<Col span={8}>
{loading ? (
{conversations && conversations.length === 0 ? (
<LoadingSpinner />
) : (
<ChatConversationListComponent
conversationList={data ? data.conversations : []}
conversationList={conversations.conversations}
loadMoreConversations={loadMoreConversations}
/>
)}

View File

@@ -3,10 +3,12 @@ import SocketIO from "socket.io-client";
import { auth } from "../../firebase/firebase.utils";
import { store } from "../../redux/store";
import { addAlerts, setWssStatus } from "../../redux/application/application.actions";
import { useDispatch } from "react-redux";
const useSocket = (bodyshop) => {
const socketRef = useRef(null);
const [clientId, setClientId] = useState(null);
const dispatch = useDispatch();
useEffect(() => {
const unsubscribe = auth.onIdTokenChanged(async (user) => {
@@ -66,6 +68,21 @@ const useSocket = (bodyshop) => {
store.dispatch(setWssStatus("disconnected"));
};
const handleMessagingList = (data) => {
dispatch({ type: "SET_CONVERSATIONS", payload: data.conversations });
};
const handleNewMessage = (message) => {
dispatch({ type: "ADD_MESSAGE", payload: message });
};
const handleReadUpdated = ({ conversationId }) => {
dispatch({ type: "UPDATE_UNREAD_COUNT", payload: conversationId });
};
socketInstance.on("messaging-list", handleMessagingList);
socketInstance.on("new-message", handleNewMessage);
socketInstance.on("read-updated", handleReadUpdated);
socketInstance.on("connect", handleConnect);
socketInstance.on("reconnect", handleReconnect);
socketInstance.on("connect_error", handleConnectionError);
@@ -89,7 +106,7 @@ const useSocket = (bodyshop) => {
socketRef.current = null;
}
};
}, [bodyshop]);
}, [bodyshop, dispatch]);
return { socket: socketRef.current, clientId };
};

View File

@@ -114,3 +114,22 @@ export const UPDATE_CONVERSATION_LABEL = gql`
}
}
`;
export const GET_CONVERSATION_MESSAGES = gql`
query GET_CONVERSATION_MESSAGES($conversationId: uuid!) {
conversation: conversations_by_pk(id: $conversationId) {
id
phone_num
updated_at
label
}
messages: messages(where: { conversationid: { _eq: $conversationId } }, order_by: { created_at: asc }) {
id
text
created_at
read
isoutbound
userid
}
}
`;

View File

@@ -647,7 +647,7 @@ export function Manage({ conflict, bodyshop, alerts, setAlerts }) {
return (
<>
{import.meta.env.PROD && <ChatAffixContainer bodyshop={bodyshop} chatVisible={chatVisible} />}
{true && <ChatAffixContainer bodyshop={bodyshop} chatVisible={chatVisible} />}
<Layout style={{ minHeight: "100vh" }} className="layout-container">
<UpdateAlert />
<HeaderContainer />

View File

@@ -2,9 +2,9 @@ import MessagingActionTypes from "./messaging.types";
export const toggleChatVisible = () => ({
type: MessagingActionTypes.TOGGLE_CHAT_VISIBLE
//payload: user
});
// Action to handle when a message is sent via WebSocket
export const sendMessage = (message) => ({
type: MessagingActionTypes.SEND_MESSAGE,
payload: message
@@ -19,17 +19,39 @@ export const sendMessageFailure = (error) => ({
type: MessagingActionTypes.SEND_MESSAGE_FAILURE,
payload: error
});
// Set the selected conversation by ID
export const setSelectedConversation = (conversationId) => ({
type: MessagingActionTypes.SET_SELECTED_CONVERSATION,
payload: conversationId
});
// Open chat by phone number (if theres a need to search for a conversation)
export const openChatByPhone = (phoneNumber) => ({
type: MessagingActionTypes.OPEN_CHAT_BY_PHONE,
payload: phoneNumber
});
// Set an individual message (e.g., from a new message event)
export const setMessage = (message) => ({
type: MessagingActionTypes.SET_MESSAGE,
payload: message
});
// Set the list of conversations received from WebSocket
export const setConversations = (conversations) => ({
type: MessagingActionTypes.SET_CONVERSATIONS,
payload: conversations
});
// Add a message to the conversation messages
export const addMessage = (message) => ({
type: MessagingActionTypes.ADD_MESSAGE,
payload: message
});
// Update unread count for a conversation (e.g., after marking messages as read)
export const updateUnreadCount = (conversationId) => ({
type: MessagingActionTypes.UPDATE_UNREAD_COUNT,
payload: conversationId
});

View File

@@ -6,6 +6,9 @@ const INITIAL_STATE = {
isSending: false,
error: null,
message: null,
conversations: [], // Holds the list of conversations
messages: [], // Holds the list of messages for the selected conversation
unreadCount: 0,
searchingForConversation: false
};
@@ -13,40 +16,76 @@ const messagingReducer = (state = INITIAL_STATE, action) => {
switch (action.type) {
case MessagingActionTypes.SET_MESSAGE:
return { ...state, message: action.payload };
case MessagingActionTypes.TOGGLE_CHAT_VISIBLE:
return {
...state,
open: !state.open
};
case MessagingActionTypes.OPEN_CHAT_BY_PHONE:
return {
...state,
searchingForConversation: true
};
case MessagingActionTypes.SET_SELECTED_CONVERSATION:
return {
...state,
open: true,
searchingForConversation: false,
selectedConversationId: action.payload
selectedConversationId: action.payload,
messages: [] // Reset messages when a new conversation is selected
};
case MessagingActionTypes.SEND_MESSAGE:
return {
...state,
error: null,
isSending: true
};
case MessagingActionTypes.SEND_MESSAGE_SUCCESS:
return {
...state,
message: "",
isSending: false
};
case MessagingActionTypes.SEND_MESSAGE_FAILURE:
return {
...state,
error: action.payload
error: action.payload,
isSending: false
};
case MessagingActionTypes.SET_CONVERSATIONS:
return {
...state,
conversations: action.payload
};
case MessagingActionTypes.ADD_MESSAGE:
return {
...state,
messages: [...state.messages, action.payload]
};
case MessagingActionTypes.UPDATE_UNREAD_COUNT:
console.log("SKL");
console.dir({ action, state });
return {
...state,
conversations: Array.isArray(state.conversations)
? state.conversations.map((conversation) =>
conversation.id === action.payload
? { ...conversation, unreadCount: 0 } // Reset unread count for the selected conversation
: conversation
)
: state.conversations,
unreadCount: Math.max(state.unreadCount - 1, 0) // Ensure unreadCount does not go below zero
};
default:
return state;
}

View File

@@ -19,3 +19,8 @@ export const searchingForConversation = createSelector(
[selectMessaging],
(messaging) => messaging.searchingForConversation
);
// New selectors for conversations and unread count
export const selectConversations = createSelector([selectMessaging], (messaging) => messaging.conversations);
export const selectUnreadCount = createSelector([selectMessaging], (messaging) => messaging.unreadCount);

View File

@@ -5,6 +5,9 @@ const MessagingActionTypes = {
SEND_MESSAGE_FAILURE: "SEND_MESSAGE_FAILURE",
SET_SELECTED_CONVERSATION: "SET_SELECTED_CONVERSATION",
OPEN_CHAT_BY_PHONE: "OPEN_CHAT_BY_PHONE",
SET_MESSAGE: "SET_MESSAGE"
SET_MESSAGE: "SET_MESSAGE",
ADD_MESSAGE: "ADD_MESSAGE",
UPDATE_UNREAD_COUNT: "UPDATE_UNREAD_COUNT",
SET_CONVERSATIONS: "SET_CONVERSATIONS"
};
export default MessagingActionTypes;

View File

@@ -2544,3 +2544,69 @@ exports.GET_JOBS_BY_PKS = `query GET_JOBS_BY_PKS($ids: [uuid!]!) {
}
}
`;
exports.GET_CONVERSATIONS = `query GET_CONVERSATIONS($bodyshopId: uuid!) {
conversations(
where: { bodyshopid: { _eq: $bodyshopId }, archived: { _eq: false } },
order_by: { updated_at: desc },
limit: 50
) {
phone_num
id
updated_at
unreadcnt
archived
label
messages_aggregate(where: { read: { _eq: false }, isoutbound: { _eq: false } }) {
aggregate {
count
}
}
job_conversations {
job {
id
ro_number
ownr_fn
ownr_ln
ownr_co_nm
}
}
}
}
`;
exports.MARK_MESSAGES_AS_READ = `mutation MARK_MESSAGES_AS_READ($conversationId: uuid!) {
update_messages(where: { conversationid: { _eq: $conversationId } }, _set: { read: true }) {
affected_rows
}
}
`;
exports.GET_CONVERSATION_DETAILS = `
query GET_CONVERSATION_DETAILS($conversationId: uuid!) {
conversation: conversations_by_pk(id: $conversationId) {
id
phone_num
updated_at
label
job_conversations {
job {
id
ro_number
ownr_fn
ownr_ln
ownr_co_nm
}
}
}
messages: messages(where: { conversationid: { _eq: $conversationId } }, order_by: { created_at: asc }) {
id
text
created_at
read
isoutbound
userid
image_path
}
}
`;

View File

@@ -12,6 +12,7 @@ const InstanceManager = require("../utils/instanceMgr").default;
exports.receive = async (req, res) => {
//Perform request validation
const { ioRedis } = req;
logger.log("sms-inbound", "DEBUG", "api", null, {
msid: req.body.SmsMessageSid,
@@ -108,6 +109,11 @@ exports.receive = async (req, res) => {
newMessage,
fcmresp
});
// Broadcast new message to the conversation room
const room = `conversation-${newMessage.conversationid}`;
ioRedis.to(room).emit("new-message", newMessage);
res.status(200).send("");
} catch (e2) {
logger.log("sms-inbound-error", "ERROR", "api", null, {

View File

@@ -14,6 +14,7 @@ const gqlClient = require("../graphql-client/graphql-client").client;
exports.send = (req, res) => {
const { to, messagingServiceSid, body, conversationid, selectedMedia, imexshopid } = req.body;
const { ioRedis } = req;
logger.log("sms-outbound", "DEBUG", req.user.email, null, {
messagingServiceSid: messagingServiceSid,
@@ -59,6 +60,13 @@ exports.send = (req, res) => {
conversationid: newMessage.conversationid || ""
};
// TODO Verify
// const messageData = response.insert_messages.returning[0];
// Broadcast new message to conversation room
const room = `conversation-${conversationid}`;
ioRedis.to(room).emit("new-message", newMessage);
admin.messaging().send({
topic: `${imexshopid}-messaging`,
data

View File

@@ -11,6 +11,7 @@ const { admin } = require("../firebase/firebase-handler");
exports.status = (req, res) => {
const { SmsSid, SmsStatus } = req.body;
const { ioRedis } = req;
client
.request(queries.UPDATE_MESSAGE_STATUS, {
msid: SmsSid,
@@ -21,6 +22,12 @@ exports.status = (req, res) => {
msid: SmsSid,
fields: { status: SmsStatus }
});
// TODO Verify
const conversationId = response.update_messages.returning[0].conversationid;
ioRedis.to(`conversation-${conversationId}`).emit("message-status-updated", {
messageId: SmsSid,
status: SmsStatus
});
})
.catch((error) => {
logger.log("sms-status-update-error", "ERROR", "api", null, {

View File

@@ -1,4 +1,6 @@
const { admin } = require("../firebase/firebase-handler");
const { MARK_MESSAGES_AS_READ, GET_CONVERSATIONS, GET_CONVERSATION_DETAILS } = require("../graphql-client/queries");
const client = require("../graphql-client/graphql-client").client;
const redisSocketEvents = ({
io,
@@ -113,6 +115,7 @@ const redisSocketEvents = ({
socket.on("leave-bodyshop-room", leaveBodyshopRoom);
socket.on("broadcast-to-bodyshop", broadcastToBodyshopRoom);
};
// Disconnect Events
const registerDisconnectEvents = (socket) => {
const disconnect = () => {
@@ -129,10 +132,57 @@ const redisSocketEvents = ({
socket.on("disconnect", disconnect);
};
// Messaging Events
const registerMessagingEvents = (socket) => {
const broadcastNewMessage = async (message) => {
const room = `conversation-${message.conversationId}`;
io.to(room).emit("new-message", message);
};
const openMessaging = async (bodyshopUUID) => {
try {
const conversations = await client.request(GET_CONVERSATIONS, { bodyshopId: bodyshopUUID });
socket.emit("messaging-list", { conversations });
} catch (error) {
console.dir(error);
logger.log("error", "Failed to fetch conversations", error);
socket.emit("error", { message: "Failed to fetch conversations" });
}
};
const joinConversation = async (conversationId) => {
try {
const room = `conversation-${conversationId}`;
socket.join(room);
// Fetch conversation details and messages
const data = await client.request(GET_CONVERSATION_DETAILS, { conversationId });
socket.emit("conversation-details", data); // Send data to the client
} catch (error) {
logger.log("error", "Failed to join conversation", error);
socket.emit("error", { message: "Failed to join conversation" });
}
};
const markAsRead = async ({ conversationId, userId }) => {
try {
await client.request(MARK_MESSAGES_AS_READ, { conversationId, userId });
io.to(`conversation-${conversationId}`).emit("read-updated", { conversationId });
} catch (error) {
logger.log("error", "Failed to mark messages as read", error);
socket.emit("error", { message: "Failed to mark messages as read" });
}
};
// Mark Messages as Read
socket.on("mark-as-read", markAsRead);
socket.on("join-conversation", joinConversation);
socket.on("open-messaging", openMessaging);
};
// Call Handlers
registerRoomAndBroadcastEvents(socket);
registerUpdateEvents(socket);
registerMessagingEvents(socket);
registerDisconnectEvents(socket);
};