Skip to content

Commit

Permalink
update worker event context buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Mar 7, 2023
1 parent 4204665 commit 4cc1f6a
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 54 deletions.
26 changes: 11 additions & 15 deletions candishared/event_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@ type EventContext struct {
key string
err error

buff *bytes.Buffer
messageBuff *bytes.Buffer
}

// NewEventContext event context constructor
func NewEventContext(buff *bytes.Buffer) *EventContext {
return &EventContext{
messageBuff: buff,
}
}

// SetContext setter
Expand Down Expand Up @@ -73,7 +80,7 @@ func (e *EventContext) Key() string {

// Message get context
func (e *EventContext) Message() []byte {
return e.buff.Bytes()
return e.messageBuff.Bytes()
}

// Err get error
Expand All @@ -83,21 +90,10 @@ func (e *EventContext) Err() error {

// Read implement io.Reader
func (e *EventContext) Read(p []byte) (n int, err error) {
return e.buff.Read(p)
return e.messageBuff.Read(p)
}

// Write implement io.Writer
func (e *EventContext) Write(p []byte) (n int, err error) {
if e.buff == nil {
e.buff = &bytes.Buffer{}
}
return e.buff.Write(p)
}

// WriteString method
func (e *EventContext) WriteString(s string) (n int, err error) {
if e.buff == nil {
e.buff = &bytes.Buffer{}
}
return e.buff.WriteString(s)
return e.messageBuff.Write(p)
}
26 changes: 3 additions & 23 deletions candiutils/cronparser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -47,28 +48,6 @@ var (
)

var (
numberTokens = map[string]int{
"0": 0, "1": 1, "2": 2, "3": 3, "4": 4, "5": 5, "6": 6, "7": 7, "8": 8, "9": 9,
"00": 0, "01": 1, "02": 2, "03": 3, "04": 4, "05": 5, "06": 6, "07": 7, "08": 8, "09": 9,
"10": 10, "11": 11, "12": 12, "13": 13, "14": 14, "15": 15, "16": 16, "17": 17, "18": 18, "19": 19,
"20": 20, "21": 21, "22": 22, "23": 23, "24": 24, "25": 25, "26": 26, "27": 27, "28": 28, "29": 29,
"30": 30, "31": 31, "32": 32, "33": 33, "34": 34, "35": 35, "36": 36, "37": 37, "38": 38, "39": 39,
"40": 40, "41": 41, "42": 42, "43": 43, "44": 44, "45": 45, "46": 46, "47": 47, "48": 48, "49": 49,
"50": 50, "51": 51, "52": 52, "53": 53, "54": 54, "55": 55, "56": 56, "57": 57, "58": 58, "59": 59,
"1970": 1970, "1971": 1971, "1972": 1972, "1973": 1973, "1974": 1974, "1975": 1975, "1976": 1976, "1977": 1977, "1978": 1978, "1979": 1979,
"1980": 1980, "1981": 1981, "1982": 1982, "1983": 1983, "1984": 1984, "1985": 1985, "1986": 1986, "1987": 1987, "1988": 1988, "1989": 1989,
"1990": 1990, "1991": 1991, "1992": 1992, "1993": 1993, "1994": 1994, "1995": 1995, "1996": 1996, "1997": 1997, "1998": 1998, "1999": 1999,
"2000": 2000, "2001": 2001, "2002": 2002, "2003": 2003, "2004": 2004, "2005": 2005, "2006": 2006, "2007": 2007, "2008": 2008, "2009": 2009,
"2010": 2010, "2011": 2011, "2012": 2012, "2013": 2013, "2014": 2014, "2015": 2015, "2016": 2016, "2017": 2017, "2018": 2018, "2019": 2019,
"2020": 2020, "2021": 2021, "2022": 2022, "2023": 2023, "2024": 2024, "2025": 2025, "2026": 2026, "2027": 2027, "2028": 2028, "2029": 2029,
"2030": 2030, "2031": 2031, "2032": 2032, "2033": 2033, "2034": 2034, "2035": 2035, "2036": 2036, "2037": 2037, "2038": 2038, "2039": 2039,
"2040": 2040, "2041": 2041, "2042": 2042, "2043": 2043, "2044": 2044, "2045": 2045, "2046": 2046, "2047": 2047, "2048": 2048, "2049": 2049,
"2050": 2050, "2051": 2051, "2052": 2052, "2053": 2053, "2054": 2054, "2055": 2055, "2056": 2056, "2057": 2057, "2058": 2058, "2059": 2059,
"2060": 2060, "2061": 2061, "2062": 2062, "2063": 2063, "2064": 2064, "2065": 2065, "2066": 2066, "2067": 2067, "2068": 2068, "2069": 2069,
"2070": 2070, "2071": 2071, "2072": 2072, "2073": 2073, "2074": 2074, "2075": 2075, "2076": 2076, "2077": 2077, "2078": 2078, "2079": 2079,
"2080": 2080, "2081": 2081, "2082": 2082, "2083": 2083, "2084": 2084, "2085": 2085, "2086": 2086, "2087": 2087, "2088": 2088, "2089": 2089,
"2090": 2090, "2091": 2091, "2092": 2092, "2093": 2093, "2094": 2094, "2095": 2095, "2096": 2096, "2097": 2097, "2098": 2098, "2099": 2099,
}
monthTokens = map[string]int{
`1`: 1, `jan`: 1, `january`: 1,
`2`: 2, `feb`: 2, `february`: 2,
Expand Down Expand Up @@ -96,7 +75,8 @@ var (
)

func atoi(s string) int {
return numberTokens[s]
i, _ := strconv.Atoi(s)
return i
}

type fieldDescriptor struct {
Expand Down
11 changes: 6 additions & 5 deletions codebase/app/cron_worker/cron_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cronworker
// cron scheduler worker, create with 100% pure internal go library (using reflect select channel)

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -181,28 +182,28 @@ func (c *cronWorker) processJob(job *Job) {
trace.SetError(fmt.Errorf("%v", r))
trace.Log("stacktrace", string(debug.Stack()))
}
logger.LogGreen("cron scheduler > trace_url: " + tracer.GetTraceURL(ctx))
logger.LogGreen("cron_scheduler > trace_url: " + tracer.GetTraceURL(ctx))
trace.SetTag("trace_id", tracer.GetTraceID(ctx))
trace.Finish()
}()
trace.SetTag("job_name", job.HandlerName)
trace.SetTag("job_param", job.Params)
trace.Log("job_param", job.Params)

if c.opt.debugMode {
log.Printf("\x1b[35;3mCron Scheduler: executing task '%s' (interval: %s)\x1b[0m", job.HandlerName, job.Interval)
}

var eventContext candishared.EventContext
eventContext := candishared.NewEventContext(bytes.NewBuffer(make([]byte, 256)))
eventContext.SetContext(ctx)
eventContext.SetWorkerType(string(types.Scheduler))
eventContext.SetHandlerRoute(job.HandlerName)
eventContext.SetHeader(map[string]string{
"interval": job.Interval,
})
eventContext.WriteString(job.Params)
eventContext.Write([]byte(job.Params))

for _, handlerFunc := range job.Handler.HandlerFuncs {
if err := handlerFunc(&eventContext); err != nil {
if err := handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
Expand Down
5 changes: 3 additions & 2 deletions codebase/app/kafka_worker/default_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafkaworker

import (
"bytes"
"fmt"
"log"
"runtime/debug"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, me
log.Printf("\x1b[35;3mKafka Consumer: message consumed, timestamp = %v, topic = %s\x1b[0m", message.Timestamp, message.Topic)
}

var eventContext candishared.EventContext
eventContext := candishared.NewEventContext(bytes.NewBuffer(make([]byte, 256)))
eventContext.SetContext(ctx)
eventContext.SetWorkerType(string(types.Kafka))
eventContext.SetHandlerRoute(message.Topic)
Expand All @@ -99,7 +100,7 @@ func (c *consumerHandler) processMessage(session sarama.ConsumerGroupSession, me
eventContext.Write(message.Value)

for _, handlerFunc := range handler.HandlerFuncs {
if err := handlerFunc(&eventContext); err != nil {
if err := handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
Expand Down
9 changes: 5 additions & 4 deletions codebase/app/postgres_worker/postgres_worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgresworker

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -238,13 +239,13 @@ func (p *postgresWorker) execEvent(workerIndex int, data *EventPayload) {
}
}

message, _ := json.Marshal(data)

var eventContext candishared.EventContext
eventContext := candishared.NewEventContext(bytes.NewBuffer(make([]byte, 256)))
eventContext.SetContext(ctx)
eventContext.SetWorkerType(string(types.PostgresListener))
eventContext.SetHandlerRoute(data.Table)
eventContext.SetKey(data.EventID)

message, _ := json.Marshal(data)
eventContext.Write(message)

if source.name != "" {
Expand All @@ -259,7 +260,7 @@ func (p *postgresWorker) execEvent(workerIndex int, data *EventPayload) {
trace.Log("payload", data)

for _, handlerFunc := range handler.HandlerFuncs {
if err := handlerFunc(&eventContext); err != nil {
if err := handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
Expand Down
5 changes: 3 additions & 2 deletions codebase/app/rabbitmq_worker/rabbitmq_worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rabbitmqworker

import (
"bytes"
"context"
"fmt"
"log"
Expand Down Expand Up @@ -173,7 +174,7 @@ func (r *rabbitmqWorker) processMessage(message amqp.Delivery) {
log.Printf("\x1b[35;3mRabbitMQ Consumer: message consumed, topic = %s\x1b[0m", message.RoutingKey)
}

var eventContext candishared.EventContext
eventContext := candishared.NewEventContext(bytes.NewBuffer(make([]byte, 256)))
eventContext.SetContext(ctx)
eventContext.SetWorkerType(string(types.RabbitMQ))
eventContext.SetHandlerRoute(message.RoutingKey)
Expand All @@ -182,7 +183,7 @@ func (r *rabbitmqWorker) processMessage(message amqp.Delivery) {
eventContext.Write(message.Body)

for _, handlerFunc := range selectedHandler.HandlerFuncs {
if err := handlerFunc(&eventContext); err != nil {
if err := handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
Expand Down
7 changes: 4 additions & 3 deletions codebase/app/redis_worker/redis_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redisworker
// Redis subscriber worker codebase

import (
"bytes"
"context"
"fmt"
"log"
Expand Down Expand Up @@ -202,15 +203,15 @@ func (r *redisWorker) processMessage(param RedisMessage) {
trace.SetTag("event_id", param.EventID)
trace.Log("message", param.Message)

var eventContext candishared.EventContext
eventContext := candishared.NewEventContext(bytes.NewBuffer(make([]byte, 256)))
eventContext.SetContext(ctx)
eventContext.SetWorkerType(string(types.RedisSubscriber))
eventContext.SetHandlerRoute(param.HandlerName)
eventContext.SetKey(param.EventID)
eventContext.WriteString(param.Message)
eventContext.Write([]byte(param.Message))

for _, handlerFunc := range selectedHandler.HandlerFuncs {
if err := handlerFunc(&eventContext); err != nil {
if err := handlerFunc(eventContext); err != nil {
eventContext.SetError(err)
trace.SetError(err)
}
Expand Down

0 comments on commit 4cc1f6a

Please sign in to comment.