Skip to content

Commit

Permalink
fix: redis connection
Browse files Browse the repository at this point in the history
  • Loading branch information
code-crusher committed Jan 2, 2025
1 parent 9af4943 commit 282e90c
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ spec:
fs.writeFileSync(tempFile, jobTemplate)

try {
await customExec("", "CREATE_JOB", "", `kubectl apply -f ${tempFile}`, true)
// await customExec("", "CREATE_JOB", "", `kubectl apply -f ${tempFile}`, true)
} finally {
fs.unlinkSync(tempFile)
}
Expand Down Expand Up @@ -1150,9 +1150,9 @@ const processJob = async () => {
const serviceContext = path.join(gitRepoPath, service.servicePath)
const dockerfilePath = path.join(serviceContext, 'Dockerfile')

const dockerBuildCommand = process.env.ENV === "production"
? `${dockerBuildCli} bud --isolation chroot --platform=linux/amd64 --layers --cache-from ${localRegistryUrl}/${owner}/cache --cache-to ${localRegistryUrl}/${owner}/cache -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}`
: `${dockerBuildCli} build --platform=linux/amd64 -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}`;
const dockerBuildCommand = process.env.ENV === "production"
? `${dockerBuildCli} bud --isolation chroot --platform=linux/amd64 --layers --cache-from ${localRegistryUrl}/${owner}/cache --cache-to ${localRegistryUrl}/${owner}/cache -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}`
: `${dockerBuildCli} build --platform=linux/amd64 -t ${owner}/${serviceName}:latest -f ${dockerfilePath} ${serviceContext}`;

await customExec(deploymentRunId, "DOCKER_IMAGE_BUILD", serviceName, dockerBuildCommand)

Expand Down Expand Up @@ -1616,7 +1616,7 @@ const processJob = async () => {
}

deploymentRunId = undefined

process.exit(0)
}

Expand Down Expand Up @@ -1667,13 +1667,9 @@ process.on('unhandledRejection', async (reason, promise) => {
// listen for messages in the redis stream
if (process.env.PROCESS_JOB) {
const subscriberClient = redis.createClient({
url: `redis://:${process.env.REDIS_PASSWORD}@${process.env.REDIS_HOST}:${process.env.REDIS_PORT}`
url: `redis://:${process.env.REDIS_PASSWORD}@${process.env.REDIS_HOST}:${process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : 6379}`
})

subscriberClient.on('error', (err: any) => console.error('Subscriber error:', err))
subscriberClient.on('ready', () => console.info(`[APP] Subscriber connected to Redis`))
await subscriberClient.connect()

const streamKey = 'agent-job-stream'
const consumerGroup = 'agent-job-processors'
const consumer = `consumer-${process.env.DEPLOYMENT_RUN_ID}`
Expand Down Expand Up @@ -1711,7 +1707,8 @@ if (process.env.PROCESS_JOB) {

await processJob();
await subscriberClient.xAck(streamKey, consumerGroup, message.id);
return;
await subscriberClient.disconnect();
process.exit(0);
}
// If not our message, acknowledge and continue to new messages
await subscriberClient.xAck(streamKey, consumerGroup, message.id);
Expand Down Expand Up @@ -1740,7 +1737,7 @@ if (process.env.PROCESS_JOB) {
const message = messages[0].messages[0];
const { data, runId } = message.message;

console.log(`[APP] Checking message for runId: ${runId}`)
console.log(`[APP] Received message for runId: ${runId}`);

if (runId === process.env.DEPLOYMENT_RUN_ID) {
const parsedJobData = JSON.parse(Buffer.from(data, 'base64').toString());
Expand All @@ -1753,17 +1750,25 @@ if (process.env.PROCESS_JOB) {

await processJob();
await subscriberClient.xAck(streamKey, consumerGroup, message.id);
await subscriberClient.disconnect();
process.exit(0);
return;
}
// If not our message, acknowledge and continue
await subscriberClient.xAck(streamKey, consumerGroup, message.id);
}
} catch (error) {
console.error('Error processing message:', error);
throw error;
await subscriberClient.disconnect();
process.exit(1);
}
}

// Start processing messages
processMessages().catch(console.error)
subscriberClient.on('error', (err: any) => console.error('Subscriber error:', err))
subscriberClient.on('ready', async () => {
console.info(`[APP] Subscriber connected to Redis`)
processMessages().catch(console.error)
})

await subscriberClient.connect()
}

0 comments on commit 282e90c

Please sign in to comment.