IO-3166-Global-Notifications-Part-2: getAwsClusterFix

This commit is contained in:
Dave Richer
2025-03-07 15:58:52 -05:00
parent 14a885b443
commit 1d84dd1a83

View File

@@ -28,7 +28,11 @@ const logger = require("./server/utils/logger");
const { applyRedisHelpers } = require("./server/utils/redisHelpers");
const { applyIOHelpers } = require("./server/utils/ioHelpers");
const { redisSocketEvents } = require("./server/web-sockets/redisSocketEvents");
const { ElastiCacheClient, DescribeCacheClustersCommand } = require("@aws-sdk/client-elasticache");
const {
ElastiCacheClient,
DescribeCacheClustersCommand,
DescribeReplicationGroupsCommand
} = require("@aws-sdk/client-elasticache");
const { InstanceRegion } = require("./server/utils/instanceMgr");
const StartStatusReporter = require("./server/utils/statusReporter");
const { registerCleanupTask, initializeCleanupManager } = require("./server/utils/cleanupManager");
@@ -126,39 +130,43 @@ const applyRoutes = ({ app }) => {
* @returns {Promise<string[]>}
*/
const getRedisNodesFromAWS = async () => {
const client = new ElastiCacheClient({
region: InstanceRegion()
});
const params = {
ReplicationGroupId: process.env.REDIS_CLUSTER_ID,
ShowCacheNodeInfo: true
};
const client = new ElastiCacheClient({ region: InstanceRegion() });
try {
// Fetch the cache clusters associated with the replication group
const command = new DescribeCacheClustersCommand(params);
const response = await client.send(command);
const cacheClusters = response.CacheClusters;
const describeReplicationGroupCommand = new DescribeReplicationGroupsCommand({
ReplicationGroupId: process.env.REDIS_CLUSTER_ID
});
const describeReplicationGroupResponse = await client.send(describeReplicationGroupCommand);
//TODO: add checking to make sure there's only 1.
const cacheClusterIds = describeReplicationGroupResponse.ReplicationGroups[0].MemberClusters;
// Ensure cacheClusters exists and is an array
if (!cacheClusters || !Array.isArray(cacheClusters) || cacheClusters.length === 0) {
if (!cacheClusterIds || !Array.isArray(cacheClusterIds) || cacheClusterIds.length === 0) {
logger.log(`No cache clusters found for cluster id ${process.env.REDIS_CLUSTER_ID}`, "ERROR", "redis", "api");
return [];
}
// Process each cluster
return cacheClusters.flatMap((cluster) => {
if (cluster.CacheNodes && Array.isArray(cluster.CacheNodes)) {
const nodeEndpointAddresses = [];
for (const cluster of cacheClusterIds) {
const params = { CacheClusterId: cluster, ShowCacheNodeInfo: true };
const command = new DescribeCacheClustersCommand(params);
const response = await client.send(command);
if (response.CacheClusters && Array.isArray(response.CacheClusters)) {
// Map nodes to address strings
const addresses = cluster.CacheNodes.map((node) => `${node.Endpoint.Address}:${node.Endpoint.Port}`);
//TODO: What happens if we have more shards?
const nodeAddress = `${response.CacheClusters[0].CacheNodes[0].Endpoint.Address}:${response.CacheClusters[0].CacheNodes[0].Endpoint.Port}`;
// Debug log node addresses
logger.log(`Cluster node addresses: ${addresses.join(", ")}`, "DEBUG", "redis", "api");
logger.log(`Cluster node addresses: ${nodeAddress}`, "DEBUG", "redis", "api");
// Return only those addresses that start with the current cluster id
return addresses.filter((address) => address.startsWith(process.env.REDIS_CLUSTER_ID));
nodeEndpointAddresses.push(nodeAddress);
}
return [];
});
}
return nodeEndpointAddresses;
// Process each cluster
} catch (err) {
logger.log(`Error fetching Redis nodes from AWS:`, "ERROR", "redis", "api", {
message: err?.message,