-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathcontext.go
155 lines (136 loc) · 4.17 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package gohalt
import (
"context"
"time"
)
type ghctxid string
const (
ghctxpriority ghctxid = "gohalt_context_priority"
ghctxkey ghctxid = "gohalt_context_key"
ghctxmessage ghctxid = "gohalt_context_message"
ghctxtimestamp ghctxid = "gohalt_context_timestamp"
ghctxmarshaler ghctxid = "gohalt_context_marshaler"
)
// WithTimestamp adds the provided timestamp to the provided context
// to determine latency between `Acquire` and `Release`.
// Resulted context is used by: `latency` and `percentile` throtttlers.
func WithTimestamp(ctx context.Context, ts time.Time) context.Context {
return context.WithValue(ctx, ghctxtimestamp, ts)
}
func ctxTimestamp(ctx context.Context) time.Time {
if val := ctx.Value(ghctxtimestamp); val != nil {
if timestamp, ok := val.(time.Time); ok {
return timestamp.UTC()
}
}
return time.Now().UTC()
}
// WithPriority adds the provided priority to the provided context
// to differ `Acquire` priority levels.
// Resulted context is used by: `priority` throtttler.
func WithPriority(ctx context.Context, priority uint8) context.Context {
return context.WithValue(ctx, ghctxpriority, priority)
}
func ctxPriority(ctx context.Context, limit uint8) uint8 {
if val := ctx.Value(ghctxpriority); val != nil {
if priority, ok := val.(uint8); ok && priority > 0 && priority <= limit {
return priority
}
}
return 1
}
// WithKey adds the provided key to the provided context
// to add additional call identifier to context.
// Resulted context is used by: `pattern` throtttler.
func WithKey(ctx context.Context, key string) context.Context {
return context.WithValue(ctx, ghctxkey, key)
}
func ctxKey(ctx context.Context) string {
if val, ok := ctx.Value(ghctxkey).(string); ok {
return val
}
return ""
}
// WithMessage adds the provided message to the provided context
// to add additional message that need to be used to context.
// Resulted context is used by: `enqueue` throtttler.
// Used in pair with `WithMarshaler`.
func WithMessage(ctx context.Context, message interface{}) context.Context {
return context.WithValue(ctx, ghctxmessage, message)
}
func ctxMessage(ctx context.Context) interface{} {
return ctx.Value(ghctxmessage)
}
// WithMarshaler adds the provided marshaler to the provided context
// to add additional message marshaler that need to be used to context.
// Resulted context is used by: `enqueue` throtttler.
// Used in pair with `WithMessage`.
func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context {
return context.WithValue(ctx, ghctxmarshaler, mrsh)
}
func ctxMarshaler(ctx context.Context) Marshaler {
if val := ctx.Value(ghctxmarshaler); val != nil {
if marshaler, ok := val.(Marshaler); ok {
return marshaler
}
}
return DefaultMarshaler
}
// WithParams facade call that respectively calls:
// - `WithTimestamp`
// - `WithPriority`
// - `WithKey`
// - `WithMessage`
// - `WithMarshaler`
func WithParams(
ctx context.Context,
ts time.Time,
priority uint8,
key string,
message interface{},
marshaler Marshaler,
) context.Context {
ctx = WithTimestamp(ctx, ts)
ctx = WithPriority(ctx, priority)
ctx = WithKey(ctx, key)
ctx = WithMessage(ctx, message)
ctx = WithMarshaler(ctx, marshaler)
return ctx
}
type ctxthr struct {
context.Context
thr Throttler
freq time.Duration
}
// WithThrottler adds the provided thr to the provided context
// and defines context implementation that uses parrent context plus throttler internally
// that closes context done chanel if internal throttler throttles.
func WithThrottler(ctx context.Context, thr Throttler, freq time.Duration) context.Context {
return ctxthr{Context: ctx, thr: thr, freq: freq}
}
func (ctx ctxthr) Done() <-chan struct{} {
ch := make(chan struct{})
// proactively test throttler once
if err := ctx.Err(); err != nil {
close(ch)
return ch
}
// run long throttler error pooling
gorun(ctx, loop(ctx.freq, func(ctx context.Context) error {
err := ctx.Err()
if err != nil {
close(ch)
log("context is canceled due: %v", err)
}
return err
}))
return ch
}
func (ctx ctxthr) Err() (err error) {
r := NewRunnerSync(ctx.Context, ctx.thr)
r.Run(nope)
return r.Result()
}
func (ctx ctxthr) Throttler() Throttler {
return ctx.thr
}