feature/IO-3096-GlobalNotifications - Check-point
This commit is contained in:
@@ -6122,9 +6122,13 @@
|
|||||||
columns: '*'
|
columns: '*'
|
||||||
update:
|
update:
|
||||||
columns:
|
columns:
|
||||||
|
- joblineid
|
||||||
- assigned_to
|
- assigned_to
|
||||||
|
- partsorderid
|
||||||
- completed
|
- completed
|
||||||
- description
|
- description
|
||||||
|
- billid
|
||||||
|
- priority
|
||||||
retry_conf:
|
retry_conf:
|
||||||
interval_sec: 10
|
interval_sec: 10
|
||||||
num_retries: 0
|
num_retries: 0
|
||||||
@@ -6134,12 +6138,6 @@
|
|||||||
- name: event-secret
|
- name: event-secret
|
||||||
value_from_env: EVENT_SECRET
|
value_from_env: EVENT_SECRET
|
||||||
request_transform:
|
request_transform:
|
||||||
body:
|
|
||||||
action: transform
|
|
||||||
template: |-
|
|
||||||
{
|
|
||||||
"success": true
|
|
||||||
}
|
|
||||||
method: POST
|
method: POST
|
||||||
query_params: {}
|
query_params: {}
|
||||||
template_engine: Kriti
|
template_engine: Kriti
|
||||||
|
|||||||
@@ -1,5 +1,34 @@
|
|||||||
const handleTasksChange = (req, res) => {
|
const changeParser = require("../utils/changeParser");
|
||||||
|
const { hasScenarios } = require("../utils/scenarioMapperr");
|
||||||
|
const handleTasksChange = async (req, res) => {
|
||||||
|
try {
|
||||||
|
// Step 1: Parse the changes
|
||||||
|
const changes = await changeParser({
|
||||||
|
newData: req?.body?.event?.data?.new,
|
||||||
|
oldData: req?.body?.event?.data?.old,
|
||||||
|
trigger: req?.body?.trigger,
|
||||||
|
table: req?.body?.table
|
||||||
|
});
|
||||||
|
|
||||||
|
console.dir(changes, { depth: null });
|
||||||
|
|
||||||
|
const scenarios = hasScenarios({
|
||||||
|
table: changes.table.name,
|
||||||
|
keys: changes.changedFieldNames,
|
||||||
|
onNew: changes.isNew
|
||||||
|
});
|
||||||
|
|
||||||
|
console.dir(scenarios, { depth: null });
|
||||||
|
// Step 2: See if any scenarios match the changes
|
||||||
|
// Step 3: Handle the scenario
|
||||||
|
} catch (error) {
|
||||||
|
console.error("Error handling tasks change:", error);
|
||||||
|
return res.status(500).json({ message: "Error handling tasks change." });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get Bodyshop from hasura user id,
|
||||||
|
|
||||||
return res.status(200).json({ message: "Tasks change handled." });
|
return res.status(200).json({ message: "Tasks change handled." });
|
||||||
};
|
};
|
||||||
|
//
|
||||||
module.exports = handleTasksChange;
|
module.exports = handleTasksChange;
|
||||||
|
|||||||
@@ -0,0 +1,3 @@
|
|||||||
|
const tasksUpdatedCreatedBuilder = async () => {};
|
||||||
|
|
||||||
|
module.exports = tasksUpdatedCreatedBuilder;
|
||||||
43
server/notifications/utils/changeParser.js
Normal file
43
server/notifications/utils/changeParser.js
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
const changeParser = async ({ oldData, newData, trigger, table }) => {
|
||||||
|
const isNew = !oldData;
|
||||||
|
let changedFields = {};
|
||||||
|
let changedFieldNames = [];
|
||||||
|
|
||||||
|
if (isNew) {
|
||||||
|
// If there's no old data, every field in newData is considered changed (new)
|
||||||
|
changedFields = { ...newData };
|
||||||
|
changedFieldNames = Object.keys(newData);
|
||||||
|
} else {
|
||||||
|
// Compare oldData with newData for changes
|
||||||
|
for (const key in newData) {
|
||||||
|
if (Object.prototype.hasOwnProperty.call(newData, key)) {
|
||||||
|
// Check if the key exists in oldData and if values differ
|
||||||
|
if (
|
||||||
|
!Object.prototype.hasOwnProperty.call(oldData, key) ||
|
||||||
|
JSON.stringify(oldData[key]) !== JSON.stringify(newData[key])
|
||||||
|
) {
|
||||||
|
changedFields[key] = newData[key];
|
||||||
|
changedFieldNames.push(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Check for fields that were removed
|
||||||
|
for (const key in oldData) {
|
||||||
|
if (Object.prototype.hasOwnProperty.call(oldData, key) && !Object.prototype.hasOwnProperty.call(newData, key)) {
|
||||||
|
changedFields[key] = null; // Indicate field was removed
|
||||||
|
changedFieldNames.push(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
changedFieldNames,
|
||||||
|
changedFields,
|
||||||
|
isNew,
|
||||||
|
data: newData,
|
||||||
|
trigger,
|
||||||
|
table
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = changeParser;
|
||||||
52
server/notifications/utils/scenarioMapperr.js
Normal file
52
server/notifications/utils/scenarioMapperr.js
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
const tasksUpdatedCreatedBuilder = require("../scenarioBuilders/tasksUpdatedCreatedBuilder");
|
||||||
|
|
||||||
|
const notificationScenarios = [
|
||||||
|
{ key: "job-assigned-to-me", table: "jobs" },
|
||||||
|
{ key: "bill-posted", table: "bills" },
|
||||||
|
{ key: "critical-parts-status-changed" },
|
||||||
|
{ key: "part-marked-back-ordered" },
|
||||||
|
{ key: "new-note-added", table: "notes" },
|
||||||
|
{ key: "supplement-imported" },
|
||||||
|
{ key: "schedule-dates-changed", table: "jobs" },
|
||||||
|
{
|
||||||
|
key: "tasks-updated-created",
|
||||||
|
table: "tasks",
|
||||||
|
fields: ["updated_at"],
|
||||||
|
onNew: false,
|
||||||
|
builder: tasksUpdatedCreatedBuilder
|
||||||
|
},
|
||||||
|
{ key: "new-media-added-reassigned" },
|
||||||
|
{ key: "new-time-ticket-posted" },
|
||||||
|
{ key: "intake-delivery-checklist-completed" },
|
||||||
|
{ key: "job-added-to-production", table: "jobs" },
|
||||||
|
{ key: "job-status-change", table: "jobs" },
|
||||||
|
{ key: "payment-collected-completed" },
|
||||||
|
{ key: "alternate-transport-changed" }
|
||||||
|
];
|
||||||
|
|
||||||
|
// Helper function to find a scenario based on multiple criteria
|
||||||
|
function hasScenarios({ table, keys, onNew }) {
|
||||||
|
return (
|
||||||
|
notificationScenarios.find((scenario) => {
|
||||||
|
// Check if table matches if provided
|
||||||
|
if (table && scenario.table !== table) return false;
|
||||||
|
|
||||||
|
// Check if key matches if provided
|
||||||
|
if (keys && !keys.some((key) => scenario.key === key)) return false;
|
||||||
|
|
||||||
|
// Check if onNew matches if provided
|
||||||
|
if (onNew !== undefined && scenario.onNew !== onNew) return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}) || null
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Example usage:
|
||||||
|
// console.log(hasScenarios({ table: 'jobs', keys: ['job-assigned-to-me'], onNew: false }));
|
||||||
|
// console.log(hasScenarios({ onNew: true, keys: ['tasks-updated-created'] }));
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
notificationScenarios,
|
||||||
|
hasScenarios
|
||||||
|
};
|
||||||
7
server/utils/consoleDir.js
Normal file
7
server/utils/consoleDir.js
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
const { inspect } = require("node:util");
|
||||||
|
|
||||||
|
const consoleDir = (data) => {
|
||||||
|
console.log(inspect(data, { showHidden: false, depth: null, colors: true }));
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = consoleDir;
|
||||||
@@ -121,6 +121,27 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const addUserSocketMapping = async (email, socketId) => {
|
||||||
|
// Using a Redis set allows a user to have multiple active socket ids.
|
||||||
|
console.log(`Adding socket ${socketId} to user ${email}`);
|
||||||
|
return pubClient.sadd(`user:${email}:sockets`, socketId);
|
||||||
|
};
|
||||||
|
|
||||||
|
const removeUserSocketMapping = async (email, socketId) => {
|
||||||
|
console.log(`Removing socket ${socketId} from user ${email}`);
|
||||||
|
return pubClient.srem(`user:${email}:sockets`, socketId);
|
||||||
|
};
|
||||||
|
|
||||||
|
const getUserSocketMapping = async (email) => {
|
||||||
|
const key = `user:${email}:sockets`;
|
||||||
|
try {
|
||||||
|
return await pubClient.smembers(key);
|
||||||
|
} catch (error) {
|
||||||
|
console.error(`Error retrieving socket IDs for ${email}:`, error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
const api = {
|
const api = {
|
||||||
setSessionData,
|
setSessionData,
|
||||||
getSessionData,
|
getSessionData,
|
||||||
@@ -133,7 +154,10 @@ const applyRedisHelpers = ({ pubClient, app, logger }) => {
|
|||||||
clearList,
|
clearList,
|
||||||
addUserToRoom,
|
addUserToRoom,
|
||||||
removeUserFromRoom,
|
removeUserFromRoom,
|
||||||
getUsersInRoom
|
getUsersInRoom,
|
||||||
|
addUserSocketMapping,
|
||||||
|
removeUserSocketMapping,
|
||||||
|
getUserSocketMapping
|
||||||
};
|
};
|
||||||
|
|
||||||
Object.assign(module.exports, api);
|
Object.assign(module.exports, api);
|
||||||
|
|||||||
@@ -2,7 +2,13 @@ const { admin } = require("../firebase/firebase-handler");
|
|||||||
|
|
||||||
const redisSocketEvents = ({
|
const redisSocketEvents = ({
|
||||||
io,
|
io,
|
||||||
redisHelpers: { setSessionData, clearSessionData }, // Note: Used if we persist user to Redis
|
redisHelpers: {
|
||||||
|
setSessionData,
|
||||||
|
clearSessionData,
|
||||||
|
addUserSocketMapping,
|
||||||
|
removeUserSocketMapping,
|
||||||
|
getUserSocketMapping
|
||||||
|
}, // Note: Used if we persist user to Redis
|
||||||
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
|
ioHelpers: { getBodyshopRoom, getBodyshopConversationRoom },
|
||||||
logger
|
logger
|
||||||
}) => {
|
}) => {
|
||||||
@@ -12,30 +18,20 @@ const redisSocketEvents = ({
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Socket Auth Middleware
|
// Socket Auth Middleware
|
||||||
const authMiddleware = (socket, next) => {
|
const authMiddleware = async (socket, next) => {
|
||||||
|
if (!socket.handshake.auth.token) {
|
||||||
|
return next(new Error("Authentication error - no authorization token."));
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
if (socket.handshake.auth.token) {
|
const user = await admin.auth().verifyIdToken(socket.handshake.auth.token);
|
||||||
admin
|
socket.user = user;
|
||||||
.auth()
|
// Persist the user data in Redis for this socket
|
||||||
.verifyIdToken(socket.handshake.auth.token)
|
await setSessionData(socket.id, "user", user);
|
||||||
.then((user) => {
|
// Store a mapping from the user's email to the socket id
|
||||||
socket.user = user;
|
// await addUserSocketMapping(user.email, socket.id);
|
||||||
// Note: if we ever want to capture user data across sockets
|
next();
|
||||||
// Uncomment the following line and then remove the next() to a second then()
|
|
||||||
// return setSessionData(socket.id, "user", user);
|
|
||||||
next();
|
|
||||||
})
|
|
||||||
.catch((error) => {
|
|
||||||
next(new Error(`Authentication error: ${error.message}`));
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
next(new Error("Authentication error - no authorization token."));
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.log("websocket-connection-error", "error", null, null, {
|
next(new Error(`Authentication error: ${error.message}`));
|
||||||
...error
|
|
||||||
});
|
|
||||||
next(new Error(`Authentication error ${error}`));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -44,6 +40,10 @@ const redisSocketEvents = ({
|
|||||||
// Uncomment for further testing
|
// Uncomment for further testing
|
||||||
// createLogEvent(socket, "debug", `Registering RedisIO Socket Events.`);
|
// createLogEvent(socket, "debug", `Registering RedisIO Socket Events.`);
|
||||||
|
|
||||||
|
getUserSocketMapping(socket.user.email).then((socketIds) => {
|
||||||
|
console.log(socketIds);
|
||||||
|
});
|
||||||
|
|
||||||
// Token Update Events
|
// Token Update Events
|
||||||
const registerUpdateEvents = (socket) => {
|
const registerUpdateEvents = (socket) => {
|
||||||
let latestTokenTimestamp = 0;
|
let latestTokenTimestamp = 0;
|
||||||
@@ -53,37 +53,29 @@ const redisSocketEvents = ({
|
|||||||
latestTokenTimestamp = currentTimestamp;
|
latestTokenTimestamp = currentTimestamp;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Verify token with Firebase Admin SDK
|
|
||||||
const user = await admin.auth().verifyIdToken(newToken, true);
|
const user = await admin.auth().verifyIdToken(newToken, true);
|
||||||
|
|
||||||
// Skip outdated token validations
|
|
||||||
if (currentTimestamp < latestTokenTimestamp) {
|
if (currentTimestamp < latestTokenTimestamp) {
|
||||||
createLogEvent(socket, "warn", "Outdated token validation skipped.");
|
createLogEvent(socket, "warn", "Outdated token validation skipped.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
socket.user = user;
|
socket.user = user;
|
||||||
|
// Update the session data in Redis with the new token info
|
||||||
|
// await setSessionData(socket.id, "user", user);
|
||||||
|
// Update the mapping with the user's email
|
||||||
|
// await addUserSocketMapping(user.email, socket.id);
|
||||||
createLogEvent(socket, "debug", `Token updated successfully for socket ID: ${socket.id}`);
|
createLogEvent(socket, "debug", `Token updated successfully for socket ID: ${socket.id}`);
|
||||||
socket.emit("token-updated", { success: true });
|
socket.emit("token-updated", { success: true });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (error.code === "auth/id-token-expired") {
|
if (error.code === "auth/id-token-expired") {
|
||||||
createLogEvent(socket, "warn", "Stale token received, waiting for new token");
|
createLogEvent(socket, "warn", "Stale token received, waiting for new token");
|
||||||
socket.emit("token-updated", {
|
socket.emit("token-updated", { success: false, error: "Stale token." });
|
||||||
success: false,
|
return;
|
||||||
error: "Stale token."
|
|
||||||
});
|
|
||||||
return; // Avoid disconnecting for expired tokens
|
|
||||||
}
|
}
|
||||||
|
|
||||||
createLogEvent(socket, "error", `Token update failed for socket ID: ${socket.id}, Error: ${error.message}`);
|
createLogEvent(socket, "error", `Token update failed for socket ID: ${socket.id}, Error: ${error.message}`);
|
||||||
socket.emit("token-updated", { success: false, error: error.message });
|
socket.emit("token-updated", { success: false, error: error.message });
|
||||||
|
|
||||||
// Optionally disconnect for invalid tokens or other errors
|
|
||||||
socket.disconnect();
|
socket.disconnect();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.on("update-token", updateToken);
|
socket.on("update-token", updateToken);
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -127,16 +119,20 @@ const redisSocketEvents = ({
|
|||||||
|
|
||||||
// Disconnect Events
|
// Disconnect Events
|
||||||
const registerDisconnectEvents = (socket) => {
|
const registerDisconnectEvents = (socket) => {
|
||||||
const disconnect = () => {
|
const disconnect = async () => {
|
||||||
// Uncomment for further testing
|
// Remove session data from Redis
|
||||||
// createLogEvent(socket, "debug", `User disconnected.`);
|
// await clearSessionData(socket.id);
|
||||||
|
|
||||||
|
// Remove the mapping from user email to this socket id, if available
|
||||||
|
// if (socket.user?.email) {
|
||||||
|
// await removeUserSocketMapping(socket.user.email, socket.id);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// Leave all joined rooms
|
||||||
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
|
const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id);
|
||||||
for (const room of rooms) {
|
for (const room of rooms) {
|
||||||
socket.leave(room);
|
socket.leave(room);
|
||||||
}
|
}
|
||||||
// If we ever want to persist the user across workers
|
|
||||||
// clearSessionData(socket.id);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
socket.on("disconnect", disconnect);
|
socket.on("disconnect", disconnect);
|
||||||
|
|||||||
Reference in New Issue
Block a user