Skip to content

Commit

Permalink
postgres listener: fix semaphore key
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Nov 2, 2023
1 parent cb114a0 commit fed0291
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions codebase/app/postgres_worker/postgres_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"log"
"reflect"
"strconv"
"sync"

"github.com/golangid/candi/candihelper"
Expand Down Expand Up @@ -99,7 +98,7 @@ func NewWorker(service factory.ServiceFactory, opts ...OptionFunc) factory.AppSe

postgresSource.handlers[tableName] = handler

worker.semaphores[strconv.Itoa(postgresSource.workerIndex)+tableName] = make(chan struct{}, worker.opt.maxGoroutines)
worker.semaphores[tableName] = make(chan struct{}, worker.opt.maxGoroutines)
worker.opt.sources[sourceName] = postgresSource
logger.LogYellow(fmt.Sprintf(`[POSTGRES-LISTENER] (table): "%s"%s --> (module): "%s"`,
tableName, postgresSource.getLogForSourceName(), m.Name()))
Expand Down Expand Up @@ -141,10 +140,10 @@ func (p *postgresWorker) Serve() {
var payload EventPayload
json.Unmarshal([]byte(e.Extra), &payload)

p.semaphores[strconv.Itoa(chosen)+payload.Table] <- struct{}{}
p.semaphores[payload.Table] <- struct{}{}
p.wg.Add(1)
go func(data *EventPayload, workerIndex int) {
defer func() { p.wg.Done(); <-p.semaphores[strconv.Itoa(workerIndex)+data.Table] }()
defer func() { p.wg.Done(); <-p.semaphores[data.Table] }()

if p.ctx.Err() != nil {
logger.LogRed("postgres_listener > ctx root err: " + p.ctx.Err().Error())
Expand Down

0 comments on commit fed0291

Please sign in to comment.