const { admin } = require("../firebase/firebase-handler"); const { MARK_MESSAGES_AS_READ, GET_CONVERSATIONS, GET_CONVERSATION_DETAILS } = require("../graphql-client/queries"); const { phone } = require("phone"); const { client: gqlClient } = require("../graphql-client/graphql-client"); const queries = require("../graphql-client/queries"); const twilio = require("twilio"); const client = require("../graphql-client/graphql-client").client; const twilioClient = twilio(process.env.TWILIO_AUTH_TOKEN, process.env.TWILIO_AUTH_KEY); const redisSocketEvents = ({ io, redisHelpers: { setSessionData, clearSessionData }, // Note: Used if we persist user to Redis ioHelpers: { getBodyshopRoom }, logger }) => { // Logging helper functions const createLogEvent = (socket, level, message) => { logger.log("ioredis-log-event", level, socket?.user?.email, null, { wsmessage: message }); }; // Socket Auth Middleware const authMiddleware = (socket, next) => { try { if (socket.handshake.auth.token) { admin .auth() .verifyIdToken(socket.handshake.auth.token) .then((user) => { socket.user = user; // Note: if we ever want to capture user data across sockets // Uncomment the following line and then remove the next() to a second then() // return setSessionData(socket.id, "user", user); next(); }) .catch((error) => { next(new Error(`Authentication error: ${error.message}`)); }); } else { next(new Error("Authentication error - no authorization token.")); } } catch (error) { logger.log("websocket-connection-error", "error", null, null, { ...error }); next(new Error(`Authentication error ${error}`)); } }; // Register Socket Events const registerSocketEvents = (socket) => { // Uncomment for further testing // createLogEvent(socket, "debug", `Registering RedisIO Socket Events.`); // Token Update Events const registerUpdateEvents = (socket) => { const updateToken = async (newToken) => { try { // noinspection UnnecessaryLocalVariableJS const user = await admin.auth().verifyIdToken(newToken, true); socket.user = user; // If We ever want to persist user Data across workers // await setSessionData(socket.id, "user", user); // Uncomment for further testing // createLogEvent(socket, "debug", "Token updated successfully"); socket.emit("token-updated", { success: true }); } catch (error) { if (error.code === "auth/id-token-expired") { createLogEvent(socket, "warn", "Stale token received, waiting for new token"); socket.emit("token-updated", { success: false, error: "Stale token." }); } else { createLogEvent(socket, "error", `Token update failed: ${error.message}`); socket.emit("token-updated", { success: false, error: error.message }); // For any other errors, optionally disconnect the socket socket.disconnect(); } } }; socket.on("update-token", updateToken); }; // Room Broadcast Events const registerRoomAndBroadcastEvents = (socket) => { const joinBodyshopRoom = (bodyshopUUID) => { try { const room = getBodyshopRoom(bodyshopUUID); socket.join(room); // createLogEvent(socket, "debug", `Client joined bodyshop room: ${room}`); } catch (error) { createLogEvent(socket, "error", `Error joining room: ${error}`); } }; const leaveBodyshopRoom = (bodyshopUUID) => { try { const room = getBodyshopRoom(bodyshopUUID); socket.leave(room); createLogEvent(socket, "debug", `Client left bodyshop room: ${room}`); } catch (error) { createLogEvent(socket, "error", `Error joining room: ${error}`); } }; const broadcastToBodyshopRoom = (bodyshopUUID, message) => { try { const room = getBodyshopRoom(bodyshopUUID); io.to(room).emit("bodyshop-message", message); // We do not need this as these can be debugged live // createLogEvent(socket, "debug", `Broadcast message to bodyshop ${room}`); } catch (error) { createLogEvent(socket, "error", `Error getting room: ${error}`); } }; socket.on("join-bodyshop-room", joinBodyshopRoom); socket.on("leave-bodyshop-room", leaveBodyshopRoom); socket.on("broadcast-to-bodyshop", broadcastToBodyshopRoom); }; // Disconnect Events const registerDisconnectEvents = (socket) => { const disconnect = () => { // Uncomment for further testing // createLogEvent(socket, "debug", `User disconnected.`); const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id); for (const room of rooms) { socket.leave(room); } // If we ever want to persist the user across workers // clearSessionData(socket.id); }; socket.on("disconnect", disconnect); }; // Messaging Events const registerMessagingEvents = (socket) => { const openMessaging = async (bodyshopUUID) => { try { const conversations = await client.request(GET_CONVERSATIONS, { bodyshopId: bodyshopUUID }); socket.emit("messaging-list", { conversations }); } catch (error) { logger.log("error", "Failed to fetch conversations", error); socket.emit("error", { message: "Failed to fetch conversations" }); } }; const joinConversation = async (conversationId) => { try { const room = `conversation-${conversationId}`; socket.join(room); // Fetch conversation details and messages const data = await client.request(GET_CONVERSATION_DETAILS, { conversationId }); socket.emit("conversation-details", data); // Send data to the client } catch (error) { logger.log("error", "Failed to join conversation", error); socket.emit("error", { message: "Failed to join conversation" }); } }; const markAsRead = async ({ conversationId, userId, imexshopid, bodyshopId }) => { try { await client.request(MARK_MESSAGES_AS_READ, { conversationId, userId }); // Fetch the updated unread count for this conversation const conversations = await client.request(GET_CONVERSATIONS, { bodyshopId }); // Emit the updated unread count to all clients const room = `conversation-${conversationId}`; io.to(room).emit("messaging-list", { conversations }); admin.messaging().send({ topic: `${imexshopid}-messaging`, data: { type: "messaging-mark-conversation-read", conversationid: conversationId || "" } }); } catch (error) { logger.log("Failed to mark messages as read", "error", null, null, { message: error.message, stack: error.stack }); socket.emit("error", { message: "Failed to mark messages as read" }); } }; const sendMessage = (data) => { const { to, messagingServiceSid, body, conversationid, selectedMedia, imexshopid, user } = data; logger.log("sms-outbound", "DEBUG", user.email, null, { messagingServiceSid: messagingServiceSid, to: phone(to).phoneNumber, mediaUrl: selectedMedia.map((i) => i.src), text: body, conversationid, isoutbound: true, userid: user.email, image: selectedMedia?.length > 0, image_path: selectedMedia?.length > 0 ? selectedMedia.map((i) => i.src) : [] }); if (!!to && !!messagingServiceSid && (!!body || !!selectedMedia?.length > 0) && !!conversationid) { twilioClient.messages .create({ body: body, messagingServiceSid: messagingServiceSid, to: phone(to).phoneNumber, mediaUrl: selectedMedia.map((i) => i.src) }) .then((message) => { let newMessage = { msid: message.sid, text: body, conversationid, isoutbound: true, userid: user.email, image: selectedMedia?.length > 0, image_path: selectedMedia?.length > 0 ? selectedMedia.map((i) => i.src) : [] }; gqlClient .request(queries.INSERT_MESSAGE, { msg: newMessage, conversationid }) .then((r2) => { logger.log("sms-outbound-success", "DEBUG", user.email, null, { msid: message.sid, conversationid }); const data = { type: "messaging-outbound", conversationid: newMessage.conversationid || "" }; // TODO Verify // const messageData = response.insert_messages.returning[0]; // Broadcast new message to conversation room const room = `conversation-${conversationid}`; io.to(room).emit("new-message", newMessage); admin.messaging().send({ topic: `${imexshopid}-messaging`, data }); }) .catch((e2) => { logger.log("sms-outbound-error", "ERROR", user.email, null, { msid: message.sid, conversationid, error: e2 }); }); }) .catch((e1) => { logger.log("sms-outbound-error", "ERROR", user.email, null, { conversationid, error: e1 }); }); } else { logger.log("sms-outbound-error", "ERROR", user.email, null, { type: "missing-parameters", messagingServiceSid: messagingServiceSid, to: phone(to).phoneNumber, text: body, conversationid, isoutbound: true, userid: user.email, image: selectedMedia?.length > 0, image_path: selectedMedia?.length > 0 ? selectedMedia.map((i) => i.src) : [] }); } }; const leaveConversation = (conversationId) => { try { const room = `conversation-${conversationId}`; socket.leave(room); // Optionally notify the client socket.emit("conversation-left", { conversationId }); } catch (error) { socket.emit("error", { message: "Failed to leave conversation" }); } }; socket.on("leave-conversation", leaveConversation); socket.on("send-message", sendMessage); socket.on("mark-as-read", markAsRead); socket.on("join-conversation", joinConversation); socket.on("open-messaging", openMessaging); }; // Call Handlers registerRoomAndBroadcastEvents(socket); registerUpdateEvents(socket); registerMessagingEvents(socket); registerDisconnectEvents(socket); }; // Associate Middleware and Handlers io.use(authMiddleware); io.on("connection", registerSocketEvents); }; module.exports = { redisSocketEvents };