Go Kinesis client library
go-kcl is a hackday project at Zemanta, so the API is not yet stable and the library does contain bugs.
Currently it covers only the streaming part of Kinesis.
We do believe in tests, but we'd like to stabilize the APIs first. Anyway they do
This library is a wrapper around the Kinesis part of the AWS SDK. It facilitates reading records from streams and putting records into them.
It depends on the AWS SDK and the Aerospike client library, so first of all you'll need:
go get github.com/aws/aws-sdk-go
go get github.com/aerospike/aerospike-client-go
Aerospike is currently used to store locks and state but we plan to add support for etcd in the future.
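Because locks and state live in an external store, the storage backend could sit behind small interfaces, so that Aerospike, etcd, or an in-memory store are interchangeable. The sketch below is an assumption about how that might look, not go-kcl's actual API: the `Locker` and `Checkpointer` interfaces, their method names, and the `memStore` type are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Locker and Checkpointer are hypothetical interfaces for illustration;
// they are not taken from go-kcl.
type Locker interface {
	Lock(shardID, owner string) error
	Unlock(shardID, owner string) error
}

type Checkpointer interface {
	SetCheckpoint(shardID, sequenceNumber string) error
	GetCheckpoint(shardID string) (string, bool)
}

var (
	ErrLocked   = errors.New("shard is locked by another client")
	ErrNotOwner = errors.New("shard is not locked by this client")
)

// memStore keeps locks and checkpoints in process memory. A real backend
// (Aerospike, etcd) would keep them remotely so separate clients can coordinate.
type memStore struct {
	mu          sync.Mutex
	owners      map[string]string // shard ID -> owning client
	checkpoints map[string]string // shard ID -> last processed sequence number
}

func newMemStore() *memStore {
	return &memStore{owners: map[string]string{}, checkpoints: map[string]string{}}
}

// Lock succeeds if the shard is free or already held by the same owner.
func (s *memStore) Lock(shardID, owner string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.owners[shardID]; ok && cur != owner {
		return ErrLocked
	}
	s.owners[shardID] = owner
	return nil
}

// Unlock releases the shard only if the caller currently owns it.
func (s *memStore) Unlock(shardID, owner string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if cur, ok := s.owners[shardID]; !ok || cur != owner {
		return ErrNotOwner
	}
	delete(s.owners, shardID)
	return nil
}

func (s *memStore) SetCheckpoint(shardID, sequenceNumber string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.checkpoints[shardID] = sequenceNumber
	return nil
}

func (s *memStore) GetCheckpoint(shardID string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	seq, ok := s.checkpoints[shardID]
	return seq, ok
}

func main() {
	store := newMemStore()
	var _ Locker = store       // compile-time check that memStore is a Locker
	var _ Checkpointer = store // and a Checkpointer

	fmt.Println(store.Lock("shard-0", "client-a")) // first client gets the lock
	fmt.Println(store.Lock("shard-0", "client-b")) // second client is rejected
}
```

An in-memory store like this is only useful for tests or a single process; coordinating several clients needs a shared backend.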
Client:
client := kcl.New(awsConfig, locker, checkpointer, snitcher)
Stream creation example:
err := client.CreateStream(streamName, shardCount)
if err != nil {
    // handle err
}
// do something with the stream
Stream update example (change the number of shards):
err := client.UpdateStream(streamName, shardsCount)
if err != nil {
    // handle err
}
// do something with the new shards
Delete stream example:
err := client.DeleteStream(streamName)
if err != nil {
    // handle err
}
List streams:
streamNames, err := client.ListStreams()
if err != nil {
    // handle err
}
// do something with streams
It supports reading from a single shard and locking it so two clients don't consume the same shard. Example:
client := kcl.New(awsConfig, locker, checkpointer, snitcher)
reader, err := client.NewLockedReader(streamName, shardId, clientName)
if err != nil {
    return err
}
wg := &sync.WaitGroup{}
wg.Add(1) // register the consumer goroutine before starting it
go func(wg *sync.WaitGroup) {
    defer wg.Done()
    for record := range reader.Records() {
        // handle record
    }
}(wg)
// wait until ready to close
err = reader.CloseUpdateCheckpointAndRelease(wg)
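The consume-then-close pattern above can be tried without AWS access. The sketch below uses a hypothetical `fakeReader` in place of the go-kcl reader; it assumes that closing the reader ends the record channel and then waits on the `WaitGroup`, which is a guess at the behaviour rather than something confirmed here.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeReader is a hypothetical stand-in for a go-kcl reader. It only
// exposes a record channel and a Close method that ends the stream.
type fakeReader struct {
	records chan string
}

func newFakeReader(items []string) *fakeReader {
	r := &fakeReader{records: make(chan string, len(items))}
	for _, it := range items {
		r.records <- it
	}
	return r
}

func (r *fakeReader) Records() <-chan string { return r.records }

// Close closes the channel so the consumer's range loop terminates,
// then waits for the consumer to finish handling buffered records.
func (r *fakeReader) Close(wg *sync.WaitGroup) {
	close(r.records)
	wg.Wait()
}

// consume drains the reader in a goroutine and returns what it handled.
func consume(r *fakeReader) []string {
	var handled []string
	wg := &sync.WaitGroup{}
	wg.Add(1) // must happen before the goroutine can call Done
	go func() {
		defer wg.Done()
		for rec := range r.Records() {
			handled = append(handled, rec)
		}
	}()
	r.Close(wg) // returns only after the goroutine has finished
	return handled
}

func main() {
	fmt.Println(consume(newFakeReader([]string{"a", "b", "c"})))
}
```

Calling `wg.Add(1)` before starting the goroutine matters: `Done` on a zero counter panics, and `Wait` could otherwise return before the consumer has started.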
It also supports a shared reader that tries to read from as many shards as are available. Example:
client := kcl.New(awsConfig, locker, checkpointer, snitcher)
reader, err := client.NewSharedReader(streamName, clientName)
if err != nil {
    return err
}
go func() {
    for record := range reader.Records() {
        // handle record
    }
}()
// wait until ready to close
err = reader.CloseUpdateCheckpointAndRelease()
if err != nil {
    // handle err
}
err = reader.UpdateCheckpoint()
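A shared reader has to deliver records from several shards through one `Records()` stream. One common way to do that in Go is a fan-in over per-shard channels. The sketch below illustrates that general technique only; whether go-kcl is implemented this way is not stated here, and the `merge` and `shard` helpers are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// merge fans in several per-shard channels into one output channel and
// closes the output once every input channel has been closed.
func merge(shards ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(shards))
	for _, ch := range shards {
		go func(ch <-chan string) {
			defer wg.Done()
			for rec := range ch {
				out <- rec
			}
		}(ch)
	}
	// Close the output only after all forwarding goroutines are done.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// shard returns a pre-filled, closed channel standing in for one shard.
func shard(records ...string) <-chan string {
	ch := make(chan string, len(records))
	for _, r := range records {
		ch <- r
	}
	close(ch)
	return ch
}

func main() {
	var got []string
	for rec := range merge(shard("a1", "a2"), shard("b1")) {
		got = append(got, rec)
	}
	sort.Strings(got) // order across shards is not deterministic
	fmt.Println(got)
}
```

Records from a single shard keep their relative order in this scheme, but there is no ordering guarantee between shards.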
Example of putting a record into a stream:
client := kcl.New(awsConfig, locker, checkpointer, snitcher)
err := client.PutRecord(streamName, partitionKey, record)
Example of putting multiple records into a stream:
client := kcl.New(awsConfig, locker, checkpointer, snitcher)
err := client.PutRecords(streamName, records)
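The Kinesis PutRecords API accepts at most 500 records per request. Whether go-kcl splits larger batches itself is not stated here, so the sketch below assumes the caller does it before calling `PutRecords`. The `chunk` helper, the `maxBatch` constant, and the use of `[]byte` as the record type are hypothetical.

```go
package main

import "fmt"

// maxBatch reflects the Kinesis PutRecords limit of 500 records per request.
const maxBatch = 500

// chunk splits records into consecutive batches of at most size elements.
// It returns nil when there are no records or size is not positive.
func chunk(records [][]byte, size int) [][][]byte {
	if size <= 0 {
		return nil
	}
	var batches [][][]byte
	for len(records) > size {
		// The full slice expression caps capacity so that appending to
		// one batch cannot overwrite the start of the next.
		batches = append(batches, records[:size:size])
		records = records[size:]
	}
	if len(records) > 0 {
		batches = append(batches, records)
	}
	return batches
}

func main() {
	records := make([][]byte, 1200)
	for i, batch := range chunk(records, maxBatch) {
		// Each batch could be passed to a separate PutRecords call.
		fmt.Printf("batch %d: %d records\n", i, len(batch))
	}
}
```

With 1200 records this yields batches of 500, 500, and 200. Kinesis also limits the total request size, which this sketch does not check.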