Commit
chore: clean up unused variables
cisse21 committed Nov 3, 2022
2 parents 4390c3f + 58c69b4 commit f030706
Showing 3 changed files with 41 additions and 50 deletions.
1 change: 0 additions & 1 deletion app/options.go
@@ -36,7 +36,6 @@ func LoadOptions(args []string) *Options {

    // Ignore errors; flagSet is set for ExitOnError.
    _ = flagSet.Parse(args[1:])
-   flag.Parse()

    return &Options{
        NormalMode: *normalMode,
6 changes: 3 additions & 3 deletions router/failed-events-manager.go
@@ -19,9 +19,9 @@ type FailedEventRowT struct {

var (
    failedKeysTablePrefix = "failed_keys"
-   failedKeysExpire = 48 * time.Hour
-   failedKeysCleanUpSleep = 24 * time.Hour
-   failedKeysEnabled = false
+   failedKeysExpire time.Duration
+   failedKeysCleanUpSleep time.Duration
+   failedKeysEnabled bool
)

type FailedEventsManagerI interface {
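
Note: the three failed-keys variables above lose their hard-coded defaults and are left zero-valued, so they now rely on config registration to get their values. The failedKeysExpire registration is visible at the end of the loadConfig hunk in router/router.go below; the other two keys, their defaults, and the wrapper function name in this sketch are assumptions, not part of the commit.

    // Sketch (assumed names and keys) of how the zero-valued variables above
    // would be populated, mirroring the registration style of loadConfig below.
    func loadFailedKeysConfig() {
        config.RegisterDurationConfigVariable(48, &failedKeysExpire, true, time.Hour, "Router.failedKeysExpire")
        config.RegisterDurationConfigVariable(24, &failedKeysCleanUpSleep, true, time.Hour, "Router.failedKeysCleanUpSleep")
        config.RegisterBoolConfigVariable(false, &failedKeysEnabled, true, "Router.failedKeysEnabled")
    }
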
84 changes: 38 additions & 46 deletions router/router.go
@@ -64,6 +64,14 @@ type HandleDestOAuthRespParamsT struct {
    secret json.RawMessage
}

+type DiagnosticT struct {
+   requestsMetricLock sync.RWMutex
+   failureMetricLock sync.RWMutex
+   diagnosisTicker *time.Ticker
+   requestsMetric []requestMetric
+   failuresMetric map[string]map[string]int
+}
+
// HandleT is the handle to this module.
type HandleT struct {
    responseQ chan jobResponseT
@@ -74,11 +82,7 @@ type HandleT struct {
    destName string
    destinationId string
    workers []*workerT
-   requestsMetricLock sync.RWMutex
-   failureMetricLock sync.RWMutex
-   diagnosisTicker *time.Ticker
-   requestsMetric []requestMetric
-   failuresMetric map[string]map[string]int
+   telemetry *DiagnosticT
    customDestinationManager customDestinationManager.DestinationManager
    throttler throttler.Throttler
    guaranteeUserEventOrder bool
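
The five telemetry fields removed from HandleT here are the same ones added to the new DiagnosticT struct above, now reached through the telemetry pointer. The Setup hunk further down writes to rt.telemetry.failuresMetric and rt.telemetry.diagnosisTicker directly, so the pointer has to be allocated before Setup runs; that allocation is not visible in this diff. A minimal sketch of such an initialization, stated as an assumption rather than something taken from the commit:

    // Assumed initialization (not shown in this diff): telemetry must be
    // non-nil before Setup dereferences its fields.
    rt := &HandleT{
        telemetry: &DiagnosticT{},
    }
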
@@ -194,15 +198,14 @@ type destJobCountsT struct {
}

var (
-   jobQueryBatchSize, updateStatusBatchSize, noOfJobsPerChannel int
-   failedEventsCacheSize int
-   readSleep, minSleep, maxStatusUpdateWait, diagnosisTickerTime time.Duration
-   minRetryBackoff, maxRetryBackoff, jobsBatchTimeout time.Duration
-   pkgLogger logger.Logger
-   Diagnostics diagnostics.DiagnosticsI
-   fixedLoopSleep time.Duration
-   toAbortDestinationIDs string
-   disableEgress bool
+   jobQueryBatchSize, updateStatusBatchSize, noOfJobsPerChannel int
+   readSleep, maxStatusUpdateWait, diagnosisTickerTime time.Duration
+   minRetryBackoff, maxRetryBackoff, jobsBatchTimeout time.Duration
+   pkgLogger logger.Logger
+   Diagnostics diagnostics.DiagnosticsI
+   fixedLoopSleep time.Duration
+   toAbortDestinationIDs string
+   disableEgress bool
)

var jsonfast = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -231,15 +234,13 @@ func loadConfig() {
    config.RegisterDurationConfigVariable(1000, &readSleep, true, time.Millisecond, []string{"Router.readSleep", "Router.readSleepInMS"}...)
    config.RegisterIntConfigVariable(1000, &noOfJobsPerChannel, false, 1, "Router.noOfJobsPerChannel")
    config.RegisterDurationConfigVariable(5, &jobsBatchTimeout, true, time.Second, []string{"Router.jobsBatchTimeout", "Router.jobsBatchTimeoutInSec"}...)
-   config.RegisterDurationConfigVariable(0, &minSleep, false, time.Second, []string{"Router.minSleep", "Router.minSleepInS"}...)
    config.RegisterDurationConfigVariable(5, &maxStatusUpdateWait, true, time.Second, []string{"Router.maxStatusUpdateWait", "Router.maxStatusUpdateWaitInS"}...)
    config.RegisterBoolConfigVariable(false, &disableEgress, false, "disableEgress")
    // Time period for diagnosis ticker
    config.RegisterDurationConfigVariable(60, &diagnosisTickerTime, false, time.Second, []string{"Diagnostics.routerTimePeriod", "Diagnostics.routerTimePeriodInS"}...)
    config.RegisterDurationConfigVariable(10, &minRetryBackoff, true, time.Second, []string{"Router.minRetryBackoff", "Router.minRetryBackoffInS"}...)
    config.RegisterDurationConfigVariable(300, &maxRetryBackoff, true, time.Second, []string{"Router.maxRetryBackoff", "Router.maxRetryBackoffInS"}...)
    config.RegisterDurationConfigVariable(0, &fixedLoopSleep, true, time.Millisecond, []string{"Router.fixedLoopSleep", "Router.fixedLoopSleepInMS"}...)
-   config.RegisterIntConfigVariable(10, &failedEventsCacheSize, false, 1, "Router.failedEventsCacheSize")
    config.RegisterStringConfigVariable("", &toAbortDestinationIDs, true, "Router.toAbortDestinationIDs")
    // sources failed keys config
    config.RegisterDurationConfigVariable(48, &failedKeysExpire, true, time.Hour, "Router.failedKeysExpire")
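
For readers unfamiliar with the config helpers used throughout loadConfig, the parameter roles can be read off the call sites above. The annotation below is an inference from those calls, not a statement of the config package's documented API:

    // Inferred parameter roles, using the failedKeysExpire registration above:
    config.RegisterDurationConfigVariable(
        48,                        // default value, expressed in the unit below
        &failedKeysExpire,         // destination variable the value is written into
        true,                      // whether the value is hot-reloadable
        time.Hour,                 // unit applied to the default
        "Router.failedKeysExpire", // config key; other calls pass a []string{...} key list
    )
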
@@ -1224,9 +1225,9 @@ func durationBeforeNextAttempt(attempt int) (d time.Duration) {

func (rt *HandleT) trackRequestMetrics(reqMetric requestMetric) {
    if diagnostics.EnableRouterMetric {
-       rt.requestsMetricLock.Lock()
-       rt.requestsMetric = append(rt.requestsMetric, reqMetric)
-       rt.requestsMetricLock.Unlock()
+       rt.telemetry.requestsMetricLock.Lock()
+       rt.telemetry.requestsMetric = append(rt.telemetry.requestsMetric, reqMetric)
+       rt.telemetry.requestsMetricLock.Unlock()
    }
}
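
With the request-metric slice and its lock now living together on DiagnosticT, the lock-append-unlock pattern above could also be expressed as a method on that struct. The sketch below only illustrates that option; it is not something this commit does:

    // Illustrative only (not part of this commit): the same pattern as a
    // DiagnosticT method, keeping the lock and the slice it guards together.
    func (d *DiagnosticT) trackRequest(m requestMetric) {
        d.requestsMetricLock.Lock()
        d.requestsMetric = append(d.requestsMetric, m)
        d.requestsMetricLock.Unlock()
    }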

@@ -1247,7 +1248,6 @@ func (rt *HandleT) initWorkers() {
            retryForJobMap: make(map[int64]time.Time),
            workerID: i,
            failedJobs: 0,
-           sleepTime: minSleep,
            routerJobs: make([]types.RouterJobT, 0),
            destinationJobs: make([]types.DestinationJobT, 0),
            rt: rt,
@@ -1370,13 +1370,6 @@ func (rt *HandleT) shouldThrottle(destID, userID string, throttledAtTime time.Ti
    return limitReached
}

-// ResetSleep this makes the workers reset their sleep
-func (rt *HandleT) ResetSleep() {
-   for _, w := range rt.workers {
-       w.sleepTime = minSleep
-   }
-}
-
func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
    reportMetrics := make([]*utilTypes.PUReportedMetric, 0)
    connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails)
@@ -1455,12 +1448,12 @@ func (rt *HandleT) commitStatusList(responseList *[]jobResponseT) {
                event = diagnostics.RouterAborted
            }

-           rt.failureMetricLock.Lock()
-           if _, ok := rt.failuresMetric[event][string(resp.status.ErrorResponse)]; !ok {
-               rt.failuresMetric[event] = make(map[string]int)
+           rt.telemetry.failureMetricLock.Lock()
+           if _, ok := rt.telemetry.failuresMetric[event][string(resp.status.ErrorResponse)]; !ok {
+               rt.telemetry.failuresMetric[event] = make(map[string]int)
            }
-           rt.failuresMetric[event][string(resp.status.ErrorResponse)] += 1
-           rt.failureMetricLock.Unlock()
+           rt.telemetry.failuresMetric[event][string(resp.status.ErrorResponse)] += 1
+           rt.telemetry.failureMetricLock.Unlock()
        }
    }
}
@@ -1616,40 +1609,40 @@ func (rt *HandleT) collectMetrics(ctx context.Context) {
        case <-ctx.Done():
            rt.logger.Debugf("[%v Router] :: collectMetrics exiting", rt.destName)
            return
-       case <-rt.diagnosisTicker.C:
+       case <-rt.telemetry.diagnosisTicker.C:
        }
-       rt.requestsMetricLock.RLock()
+       rt.telemetry.requestsMetricLock.RLock()
        var diagnosisProperties map[string]interface{}
        retries := 0
        aborted := 0
        success := 0
        var compTime time.Duration
-       for _, reqMetric := range rt.requestsMetric {
+       for _, reqMetric := range rt.telemetry.requestsMetric {
            retries += reqMetric.RequestRetries
            aborted += reqMetric.RequestAborted
            success += reqMetric.RequestSuccess
            compTime += reqMetric.RequestCompletedTime
        }
-       if len(rt.requestsMetric) > 0 {
+       if len(rt.telemetry.requestsMetric) > 0 {
            diagnosisProperties = map[string]interface{}{
                rt.destName: map[string]interface{}{
                    diagnostics.RouterAborted: aborted,
                    diagnostics.RouterRetries: retries,
                    diagnostics.RouterSuccess: success,
-                   diagnostics.RouterCompletedTime: (compTime / time.Duration(len(rt.requestsMetric))) / time.Millisecond,
+                   diagnostics.RouterCompletedTime: (compTime / time.Duration(len(rt.telemetry.requestsMetric))) / time.Millisecond,
                },
            }

            Diagnostics.Track(diagnostics.RouterEvents, diagnosisProperties)
        }

-       rt.requestsMetric = nil
-       rt.requestsMetricLock.RUnlock()
+       rt.telemetry.requestsMetric = nil
+       rt.telemetry.requestsMetricLock.RUnlock()

        // This lock will ensure we don't send out Track Request while filling up the
        // failureMetric struct
-       rt.failureMetricLock.Lock()
-       for key, value := range rt.failuresMetric {
+       rt.telemetry.failureMetricLock.Lock()
+       for key, value := range rt.telemetry.failuresMetric {
            var err error
            stringValueBytes, err := jsonfast.Marshal(value)
            if err != nil {
@@ -1662,8 +1655,8 @@ func (rt *HandleT) collectMetrics(ctx context.Context) {
                diagnostics.ErrorCountMap: string(stringValueBytes),
            })
        }
-       rt.failuresMetric = make(map[string]map[string]int)
-       rt.failureMetricLock.Unlock()
+       rt.telemetry.failuresMetric = make(map[string]map[string]int)
+       rt.telemetry.failureMetricLock.Unlock()
    }
}

@@ -1888,7 +1881,6 @@ func (rt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB jobsd
        return
    }

-   rt.diagnosisTicker = time.NewTicker(diagnosisTickerTime)
    rt.jobsDB = jobsDB
    rt.errorDB = errorDB
    rt.destName = destName
@@ -1900,7 +1892,6 @@ func (rt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB jobsd
    config.RegisterIntConfigVariable(3, &rt.jobdDBMaxRetries, true, 1, []string{"JobsDB." + "Router." + "MaxRetries", "JobsDB." + "MaxRetries"}...)
    rt.crashRecover()
    rt.responseQ = make(chan jobResponseT, jobQueryBatchSize)
-
    if rt.netHandle == nil {
        netHandle := &NetHandleT{}
        netHandle.logger = rt.logger.Child("network")
@@ -1911,7 +1902,8 @@ func (rt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB jobsd
    rt.customDestinationManager = customDestinationManager.New(destName, customDestinationManager.Opts{
        Timeout: rt.netClientTimeout,
    })
-   rt.failuresMetric = make(map[string]map[string]int)
+   rt.telemetry.failuresMetric = make(map[string]map[string]int)
+   rt.telemetry.diagnosisTicker = time.NewTicker(diagnosisTickerTime)

    rt.destinationResponseHandler = New(destinationConfig.responseRules)
    if value, ok := destinationConfig.config["saveDestinationResponse"].(bool); ok {
