From 282e90cc1230d06642ba52c8753c51feddb03a62 Mon Sep 17 00:00:00 2001 From: xblack Date: Thu, 2 Jan 2025 17:40:25 +0530 Subject: [PATCH] fix: redis connection --- src/index.ts | 35 ++++++++++++++++++++--------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/src/index.ts b/src/index.ts index def78ae..b2f6802 100644 --- a/src/index.ts +++ b/src/index.ts @@ -664,7 +664,7 @@ spec: fs.writeFileSync(tempFile, jobTemplate) try { - await customExec("", "CREATE_JOB", "", `kubectl apply -f ${tempFile}`, true) + // await customExec("", "CREATE_JOB", "", `kubectl apply -f ${tempFile}`, true) } finally { fs.unlinkSync(tempFile) } @@ -1150,9 +1150,9 @@ const processJob = async () => { const serviceContext = path.join(gitRepoPath, service.servicePath) const dockerfilePath = path.join(serviceContext, 'Dockerfile') - const dockerBuildCommand = process.env.ENV === "production" - ? `${dockerBuildCli} bud --isolation chroot --platform=linux/amd64 --layers --cache-from ${localRegistryUrl}/${owner}/cache --cache-to ${localRegistryUrl}/${owner}/cache -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}` - : `${dockerBuildCli} build --platform=linux/amd64 -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}`; + const dockerBuildCommand = process.env.ENV === "production" + ? `${dockerBuildCli} bud --isolation chroot --platform=linux/amd64 --layers --cache-from ${localRegistryUrl}/${owner}/cache --cache-to ${localRegistryUrl}/${owner}/cache -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}` + : `${dockerBuildCli} build --platform=linux/amd64 -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}`; await customExec(deploymentRunId, "DOCKER_IMAGE_BUILD", serviceName, dockerBuildCommand) @@ -1616,7 +1616,7 @@ const processJob = async () => { } deploymentRunId = undefined - + process.exit(0) } @@ -1667,13 +1667,9 @@ process.on('unhandledRejection', async (reason, promise) => { // listen for messages in the redis stream if (process.env.PROCESS_JOB) { const subscriberClient = redis.createClient({ - url: `redis://:${process.env.REDIS_PASSWORD}@${process.env.REDIS_HOST}:${process.env.REDIS_PORT}` + url: `redis://:${process.env.REDIS_PASSWORD}@${process.env.REDIS_HOST}:${process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : 6379}` }) - subscriberClient.on('error', (err: any) => console.error('Subscriber error:', err)) - subscriberClient.on('ready', () => console.info(`[APP] Subscriber connected to Redis`)) - await subscriberClient.connect() - const streamKey = 'agent-job-stream' const consumerGroup = 'agent-job-processors' const consumer = `consumer-${process.env.DEPLOYMENT_RUN_ID}` @@ -1711,7 +1707,8 @@ if (process.env.PROCESS_JOB) { await processJob(); await subscriberClient.xAck(streamKey, consumerGroup, message.id); - return; + await subscriberClient.disconnect(); + process.exit(0); } // If not our message, acknowledge and continue to new messages await subscriberClient.xAck(streamKey, consumerGroup, message.id); @@ -1740,7 +1737,7 @@ if (process.env.PROCESS_JOB) { const message = messages[0].messages[0]; const { data, runId } = message.message; - console.log(`[APP] Checking message for runId: ${runId}`) + console.log(`[APP] Received message for runId: ${runId}`); if (runId === process.env.DEPLOYMENT_RUN_ID) { const parsedJobData = JSON.parse(Buffer.from(data, 'base64').toString()); @@ -1753,6 +1750,8 @@ if (process.env.PROCESS_JOB) { await processJob(); await subscriberClient.xAck(streamKey, consumerGroup, message.id); + await subscriberClient.disconnect(); + process.exit(0); return; } // If not our message, acknowledge and continue @@ -1760,10 +1759,16 @@ if (process.env.PROCESS_JOB) { } } catch (error) { console.error('Error processing message:', error); - throw error; + await subscriberClient.disconnect(); + process.exit(1); } } - // Start processing messages - processMessages().catch(console.error) + subscriberClient.on('error', (err: any) => console.error('Subscriber error:', err)) + subscriberClient.on('ready', async () => { + console.info(`[APP] Subscriber connected to Redis`) + processMessages().catch(console.error) + }) + + await subscriberClient.connect() } \ No newline at end of file