-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
executable file
·82 lines (63 loc) · 1.79 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package main
import (
"fmt"
"os"
"time"
"flag"
kinesis "github.com/sendgridlabs/go-kinesis"
)
var streamName *string
var region *string
func init() {
streamName = flag.String("stream", "", "stream name")
region = flag.String("region", "", "AWS region eg. us-west-2")
flag.Parse()
}
func getShardRecords(client kinesis.KinesisClient, streamName string, shardId string, messages chan []kinesis.GetRecordsRecords) {
args := kinesis.NewArgs()
args.Add("StreamName", streamName)
args.Add("ShardId", shardId)
args.Add("ShardIteratorType", "LATEST") // AT_SEQUENCE_NUMBER | AFTER_SEQUENCE_NUMBER | TRIM_HORIZON | LATEST
resp, _ := client.GetShardIterator(args)
shardIterator := resp.ShardIterator
for {
args = kinesis.NewArgs()
args.Add("ShardIterator", shardIterator)
records, err := client.GetRecords(args)
if err != nil {
time.Sleep(3000 * time.Millisecond)
continue
}
messages <- records.Records
shardIterator = records.NextShardIterator
time.Sleep(2000 * time.Millisecond)
}
}
func main() {
auth, err := kinesis.NewAuthFromEnv()
if err != nil {
fmt.Printf("Unable to retrieve authentication credentials from the environment: %v", err)
os.Exit(1)
}
awsRegion := *region
stream := *streamName
fmt.Println("authenticating")
ksis := kinesis.New(auth, awsRegion)
fmt.Println("finding shards")
args := kinesis.NewArgs()
args.Add("StreamName", stream)
description, err := ksis.DescribeStream(args)
if err != nil {
fmt.Printf("Unable to retrieve description from kinesis API: %v", err)
os.Exit(1)
}
messages := make(chan []kinesis.GetRecordsRecords)
for _, shard := range description.StreamDescription.Shards {
go getShardRecords(ksis, stream, shard.ShardId, messages)
}
for {
for _, d := range <-messages {
fmt.Println(string((d.GetData())))
}
}
}