From fed02913ff7a334208a9ce2480142621a988d454 Mon Sep 17 00:00:00 2001 From: agungdwiprasetyo Date: Thu, 2 Nov 2023 14:04:41 +0700 Subject: [PATCH] postgres listener: fix semaphore key --- codebase/app/postgres_worker/postgres_worker.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/codebase/app/postgres_worker/postgres_worker.go b/codebase/app/postgres_worker/postgres_worker.go index 36751ca..7ff3c41 100644 --- a/codebase/app/postgres_worker/postgres_worker.go +++ b/codebase/app/postgres_worker/postgres_worker.go @@ -7,7 +7,6 @@ import ( "fmt" "log" "reflect" - "strconv" "sync" "github.com/golangid/candi/candihelper" @@ -99,7 +98,7 @@ func NewWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppSe postgresSource.handlers[tableName] = handler - worker.semaphores[strconv.Itoa(postgresSource.workerIndex)+tableName] = make(chan struct{}, worker.opt.maxGoroutines) + worker.semaphores[tableName] = make(chan struct{}, worker.opt.maxGoroutines) worker.opt.sources[sourceName] = postgresSource logger.LogYellow(fmt.Sprintf(`[POSTGRES-LISTENER] (table): "%s"%s --> (module): "%s"`, tableName, postgresSource.getLogForSourceName(), m.Name())) @@ -141,10 +140,10 @@ func (p *postgresWorker) Serve() { var payload EventPayload json.Unmarshal([]byte(e.Extra), &payload) - p.semaphores[strconv.Itoa(chosen)+payload.Table] <- struct{}{} + p.semaphores[payload.Table] <- struct{}{} p.wg.Add(1) go func(data *EventPayload, workerIndex int) { - defer func() { p.wg.Done(); <-p.semaphores[strconv.Itoa(workerIndex)+data.Table] }() + defer func() { p.wg.Done(); <-p.semaphores[data.Table] }() if p.ctx.Err() != nil { logger.LogRed("postgres_listener > ctx root err: " + p.ctx.Err().Error())