// kcl.go

package input

import (
	"bytes"
	"fmt"
	"math"
	"os"
	"regexp"
	"sync/atomic"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
	"github.com/vmware/vmware-go-kcl/clientlibrary/config"
	"github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
	"github.com/vmware/vmware-go-kcl/clientlibrary/worker"
	"github.com/vmware/vmware-go-kcl/logger"

	"github.com/AdRoll/baker"
)

// KCLDesc describes the KCL input.
var KCLDesc = baker.InputDesc{
Name: "KCL",
New: NewKCL,
Config: &KCLConfig{},
Help: "This input fetches records from Kinesis with KCL. It consumes a specified stream, and\n" +
"processes all shards in that stream. It never exits.\n" +
"Multiple baker instances can consume the same stream, in that case the KCL will take care of\n" +
"balancing the shards between workers. Careful (shard stealing is not implemented yet).\n" +
"Resharding on the producer side is automatically handled by the KCL that will distribute\n" +
"the shards among KCL workers.",
}
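
// A minimal TOML fragment enabling this input might look as follows. This is
// an illustrative sketch (region, stream and app names are hypothetical),
// following Baker's usual [input]/[input.config] layout:
//
//	[input]
//	name = "KCL"
//
//	    [input.config]
//	    AwsRegion = "us-east-1"
//	    Stream = "my-stream"
//	    AppName = "my_app"
//	    InitialPosition = "TRIM_HORIZON"
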
// KCLConfig is the configuration for the KCL input.
type KCLConfig struct {
AwsRegion string `help:"AWS region to connect to" default:"us-west-2"`
Stream string `help:"Name of Kinesis stream" required:"true"`
	AppName string `help:"Used by the KCL to allow multiple apps to consume the same stream." required:"true"`
MaxShards int `help:"Max shards this Worker can handle at a time" default:"32767"`
ShardSync time.Duration `help:"Time between tasks to sync leases and Kinesis shards" default:"60s"`
	LeaseDuration time.Duration `help:"Time after which a worker that hasn't renewed its shard leases is no longer considered their owner" default:"60s"`
InitialPosition string `help:"Position in the stream where a new application should start from. Values: LATEST or TRIM_HORIZON" default:"LATEST"`
initialPosition config.InitialPositionInStream
}

var appNameRx = regexp.MustCompile(`^[a-zA-Z_0-9]+$`)

func (cfg *KCLConfig) validate() error {
if !appNameRx.MatchString(cfg.AppName) {
return fmt.Errorf("invalid 'AppName' '%s', accepts only [A-Za-z0-9_]+", cfg.AppName)
}
if cfg.InitialPosition != "LATEST" && cfg.InitialPosition != "TRIM_HORIZON" {
return fmt.Errorf("invalid 'InitialPosition' '%s', accepts only 'LATEST' or 'TRIM_HORIZON'", cfg.InitialPosition)
}
return nil
}

func (cfg *KCLConfig) fillDefaults() {
if cfg.AwsRegion == "" {
cfg.AwsRegion = "us-west-2"
}
switch cfg.InitialPosition {
case "", "LATEST":
cfg.initialPosition = config.LATEST
case "TRIM_HORIZON":
cfg.initialPosition = config.TRIM_HORIZON
}
if cfg.MaxShards == 0 {
cfg.MaxShards = math.MaxInt16
}
if cfg.ShardSync == 0 {
cfg.ShardSync = time.Minute
}
if cfg.LeaseDuration == 0 {
cfg.LeaseDuration = time.Minute
}
}

// KCL is a Baker input reading from Kinesis with the KCL (Kinesis Client Library).
type KCL struct {
	// atomically accessed (keep at the top of the struct for 64-bit alignment)
nlines, nshards int64 // counters
done chan struct{} // signal shutdown request from baker to the KCL worker
inch chan<- *baker.Data
streamShards int // Number of shards (ACTIVE+CLOSED) in the KCL stream
cfg *KCLConfig
workerCfg *config.KinesisClientLibConfiguration
metrics kclDatadogMetrics
}

// generateWorkerID generates a unique ID for the current worker, based on the hostname and a UUID.
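// The resulting ID has the form "<hostname>-<uuid>".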
func generateWorkerID() (string, error) {
host, err := os.Hostname()
if err != nil {
return "", fmt.Errorf("can't generate workerID: %v", err)
}
uid, err := uuid.NewUUID()
if err != nil {
return "", fmt.Errorf("can't generate workerID: %v", err)
}
return fmt.Sprintf("%s-%s", host, uid), nil
}

// NewKCL creates a new KCL.
func NewKCL(cfg baker.InputParams) (baker.Input, error) {
// Read and validate KCL configuration
dcfg := cfg.DecodedConfig.(*KCLConfig)
dcfg.fillDefaults()
if err := dcfg.validate(); err != nil {
return nil, fmt.Errorf("can't create KCL input: %v", err)
}
	// Generate the worker ID.
workerID, err := generateWorkerID()
if err != nil {
return nil, fmt.Errorf("can't create KCL input: %v", err)
}
const (
// Period before the end of lease during which a lease is refreshed by the owner
leaseRefreshPeriod = 20 * time.Second
// Max records to read per Kinesis getRecords() call
maxRecords = 10000
)
kcl := &KCL{
cfg: dcfg,
done: make(chan struct{}),
metrics: kclDatadogMetrics{metricsClient: cfg.Metrics},
}
kcl.workerCfg = config.NewKinesisClientLibConfig(dcfg.AppName, dcfg.Stream, dcfg.AwsRegion, workerID).
WithMaxRecords(maxRecords).
WithMaxLeasesForWorker(dcfg.MaxShards).
WithShardSyncIntervalMillis(int(dcfg.ShardSync / time.Millisecond)).
WithFailoverTimeMillis(int(dcfg.LeaseDuration / time.Millisecond)).
WithLeaseRefreshPeriodMillis(int(leaseRefreshPeriod / time.Millisecond)).
WithInitialPositionInStream(dcfg.initialPosition).
WithMonitoringService(&kcl.metrics).
WithLogger(logger.NewLogrusLogger(log.StandardLogger()))
streamShards, err := kcl.totalShards()
if err != nil {
return kcl, err
}
kcl.streamShards = streamShards
log.Infof("Total shards for the stream: %d", streamShards)
kcl.metrics.nshards = &kcl.nshards
return kcl, nil
}

func (k *KCL) totalShards() (int, error) {
s, err := session.NewSession(&aws.Config{
Region: aws.String(k.cfg.AwsRegion),
Endpoint: aws.String(k.workerCfg.KinesisEndpoint),
Credentials: k.workerCfg.KinesisCredentials,
})
if err != nil {
return 0, err
}
kc := kinesis.New(s)
var totalShards int
args := &kinesis.ListShardsInput{StreamName: aws.String(k.cfg.Stream)}
for {
resp, err := kc.ListShards(args)
if err != nil {
log.Errorf("Error in ListShards: %s Error: %+v Request: %s", k.cfg.Stream, err, args)
return 0, err
}
totalShards += len(resp.Shards)
if resp.NextToken == nil {
break
}
// The use of NextToken requires StreamName to be absent
args = &kinesis.ListShardsInput{NextToken: resp.NextToken}
}
return totalShards, nil
}

// Stop implements baker.Input.
func (k *KCL) Stop() {
close(k.done)
}

// Run implements baker.Input.
func (k *KCL) Run(inch chan<- *baker.Data) error {
k.inch = inch
wk := worker.NewWorker(k, k.workerCfg)
if err := wk.Start(); err != nil {
return fmt.Errorf("input: kcl: can't start the worker with: %v", err)
}
defer wk.Shutdown()
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
	// Run a loop that periodically checks the number of shards available in
	// the stream, quitting Baker as soon as that number changes.
for {
select {
case <-k.done:
return nil
case <-ticker.C:
log.Debug("Refreshing shards number")
n, err := k.totalShards()
if err != nil {
log.Errorf("Error refreshing the shards number: %v", err)
continue
}
log.Debugf("Total shards: %d, new shards count: %d", k.streamShards, n)
if k.streamShards != n {
log.Info("Shard number has changed, shutting down")
return nil
}
}
}
}

// Stats implements baker.Input.
func (k *KCL) Stats() baker.InputStats {
bag := make(baker.MetricsBag)
bag.AddGauge("kcl.shards", float64(atomic.LoadInt64(&k.nshards)))
return baker.InputStats{
NumProcessedLines: atomic.LoadInt64(&k.nlines),
Metrics: bag,
}
}

// FreeMem implements baker.Input.
func (k *KCL) FreeMem(data *baker.Data) {
// Because of the way the AWS SDK works, we can't reuse
// the buffer for a further call, as each call to GetRecords()
// will return freshly allocated memory anyway.
	// So there's nothing to do here.
}

// CreateProcessor implements interfaces.IRecordProcessorFactory.
func (k *KCL) CreateProcessor() interfaces.IRecordProcessor {
return &recordProcessor{
inch: k.inch,
metrics: &k.metrics,
nlines: &k.nlines,
}
}

// recordProcessor implements the KCL interfaces.IRecordProcessor.
type recordProcessor struct {
nlines *int64 // per-worker number of processed lines (exposed via baker stats)
inch chan<- *baker.Data
metrics *kclDatadogMetrics
shardID string // ID of the shard this processor consumes
tags []string // tags for metrics to associate with this record processor
}

// Shutdown is invoked by the Amazon Kinesis Client Library to indicate it will
// no longer send data records to this RecordProcessor instance.
func (p *recordProcessor) Shutdown(input *interfaces.ShutdownInput) {
log.WithFields(log.Fields{
"shard": p.shardID,
"reason": aws.StringValue(interfaces.ShutdownReasonMessage(input.ShutdownReason)),
}).Info("Shutting down a KCL record processor")
// The shard is closed and completely read, so we checkpoint the nil value that informs
// vmware-go-kcl about that fact
if input.ShutdownReason == interfaces.TERMINATE {
if err := input.Checkpointer.Checkpoint(nil); err != nil {
log.Errorf("Error checkpointing nil: %v", err)
}
}
}

// Initialize is invoked by the Amazon Kinesis Client Library before data
// records are delivered to the RecordProcessor instance (via processRecords).
func (p *recordProcessor) Initialize(input *interfaces.InitializationInput) {
p.shardID = input.ShardId
p.tags = []string{fmt.Sprintf("shard:%s", p.shardID)}
log.WithFields(log.Fields{
"shard": input.ShardId,
"checkpoint": aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber)}).
Info("Initializing a new RecordProcessor")
}

// ProcessRecords processes data records. vmware-go-kcl invokes this method to
// deliver data records.
//
// Upon failover, the new instance will get records with sequence numbers greater
// than the checkpoint position for each partition key.
//
// 'input' provides the records to be processed as well as information and
// capabilities related to them (e.g. checkpointing).
func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {
// Control read throughput to prevent throttling.
//
// Kinesis imposes limits on GetRecords, see
// https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
//
	// Each shard supports a maximum total data read rate of 2 MiB per second
	// via GetRecords. If a call to GetRecords returns 10 MiB, the maximum
	// size GetRecords is allowed to return, subsequent calls made within the
	// next 5 seconds fail with a ProvisionedThroughputExceededException.
	//
	// Limiting the number of records per call would work, but it would
	// increase the number of IO syscalls performed and, with it, the risk of
	// hitting the limits AWS imposes on API calls.
	//
	// The strategy used here is to leave MaxRecords unlimited and sleep for
	// 6s instead. Doing so, we're guaranteed to never exceed the per-shard
	// read throughput limit of 2 MiB/s, while staying close to it on data
	// peaks. This has the added advantage of reducing the number of IO
	// syscalls.
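	//
	// Worst-case arithmetic: 10 MiB returned per call, one call every 6s,
	// i.e. an average of ~1.7 MiB/s, safely below the 2 MiB/s per-shard cap.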
time.Sleep(6 * time.Second)
// Skip if no records
if len(input.Records) == 0 {
log.Debug("No records to process")
return
}
// Send the records to Baker pipeline
var nlines int64
for _, v := range input.Records {
nlines += int64(bytes.Count(v.Data, []byte{'\n'}))
p.inch <- &baker.Data{Bytes: v.Data}
}
// Increment the total number of lines processed by the KCL worker.
// note: p.nlines is shared among all record processors
atomic.AddInt64(p.nlines, nlines)
// Checkpoint it after processing this batch
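	// (This gives at-least-once delivery: after a failover, records received
	// since the last checkpoint are delivered again to the new lease owner.)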
lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
log.Debugf("Processed %d records: checkpoint=%s, msBehindLatest=%v", len(input.Records), aws.StringValue(lastRecordSequenceNumber), input.MillisBehindLatest)
if err := input.Checkpointer.Checkpoint(lastRecordSequenceNumber); err != nil {
log.Errorf("Error checkpointing at %s: %s", *lastRecordSequenceNumber, err)
}
}

// kclDatadogMetrics implements the KCL metrics.MonitoringService interface.
type kclDatadogMetrics struct {
nshards *int64 // keep track of the current number of shards (exposed via baker stats)
metricsClient baker.MetricsClient
}
func (m *kclDatadogMetrics) Init(appname, streamname, workerID string) error { return nil }
func (m *kclDatadogMetrics) Start() error                                    { return nil }
func (m *kclDatadogMetrics) Shutdown()                                       {}
func (m *kclDatadogMetrics) LeaseGained(shard string)                        { atomic.AddInt64(m.nshards, 1) }
func (m *kclDatadogMetrics) LeaseLost(shard string)                          { atomic.AddInt64(m.nshards, -1) }

func (m *kclDatadogMetrics) LeaseRenewed(shard string) {
	m.metricsClient.DeltaCountWithTags("leases_renewals", 1, []string{"shard:" + shard})
}

func (m *kclDatadogMetrics) IncrRecordsProcessed(shard string, count int) {
	m.metricsClient.DeltaCountWithTags("processed_records", int64(count), []string{"shard:" + shard})
}

func (m *kclDatadogMetrics) IncrBytesProcessed(shard string, count int64) {
	m.metricsClient.DeltaCountWithTags("processed_bytes", count, []string{"shard:" + shard})
}

func (m *kclDatadogMetrics) MillisBehindLatest(shard string, ms float64) {
	m.metricsClient.GaugeWithTags("ms_behind_latest_milliseconds", ms, []string{"shard:" + shard})
}

func (m *kclDatadogMetrics) RecordGetRecordsTime(shard string, ms float64) {
	t := time.Duration(ms) * time.Millisecond
	m.metricsClient.DurationWithTags("get_records_time_milliseconds", t, []string{"shard:" + shard})
}

func (m *kclDatadogMetrics) RecordProcessRecordsTime(shard string, ms float64) {
	t := time.Duration(ms) * time.Millisecond
	m.metricsClient.DurationWithTags("process_records_time_milliseconds", t, []string{"shard:" + shard})
}