Skip to content
This repository has been archived by the owner on Dec 27, 2020. It is now read-only.

Commit

Permalink
Consumer cached proxy implementation (#41)
Browse files Browse the repository at this point in the history
* Consumer cached proxy implementation

* Code review fixes

* Code review fixes:
* Unit-testing FIRST principal
* Fixed bug with loadKeys for maxCount > bufferSize
  • Loading branch information
alldroll authored and coderworld10 committed Dec 13, 2019
1 parent d9016af commit 24d5945
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 10 deletions.
2 changes: 2 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
GRpcAPIPort int
SendGridAPIKey string
TemplatePattern string
CacheSize int
}

// Start launches kgs service
Expand Down Expand Up @@ -40,6 +41,7 @@ func Start(
securityPolicy,
provider.SendGridAPIKey(config.SendGridAPIKey),
provider.TemplatePattern(config.TemplatePattern),
provider.CacheSize(config.CacheSize),
)
if err != nil {
panic(err)
Expand Down
81 changes: 81 additions & 0 deletions app/usecase/keys/consumer_cached.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package keys

import (
"errors"
)

var _ Consumer = (*ConsumerCached)(nil)

type bufferEntry struct {
key string
err error
}

type ConsumerCached struct {
delegate Consumer
bufferSize int
buffer chan bufferEntry
}

func (p ConsumerCached) ConsumeInBatch(maxCount uint) ([]string, error) {
keys := make([]string, 0, maxCount)

for ; maxCount > 0; maxCount-- {
// there is probability to get the list of keys with the size less than bufferSize
// in this case we listen to done channel to perform loop break
done := make(chan bool)

// we should load new keys as soon as our buffer is empty
if len(p.buffer) == 0 {
go func() {
p.loadKeys()
done <- true
}()
}

select {
case entry := <-p.buffer:
if entry.err != nil {
return keys, entry.err
}

keys = append(keys, entry.key)
case <-done:
break
}

}

return keys, nil
}

func (p ConsumerCached) loadKeys() {
keys, err := p.delegate.ConsumeInBatch(uint(p.bufferSize))

if err != nil {
p.buffer <- bufferEntry{
key: "",
err: err,
}
}

for _, key := range keys {
p.buffer <- bufferEntry{
key: key,
err: nil,
}
}
}

// NewCachedConsumer returns the cached proxy implementation of Consumer interface
func NewCachedConsumer(bufferSize int, delegate Consumer) (ConsumerCached, error) {
if bufferSize < 1 {
return ConsumerCached{}, errors.New("buffer size can't be less than 1")
}

return ConsumerCached{
bufferSize: bufferSize,
buffer: make(chan bufferEntry, bufferSize),
delegate: delegate,
}, nil
}
143 changes: 143 additions & 0 deletions app/usecase/keys/consumer_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,144 @@
package keys

import (
"errors"
"testing"

"github.com/byliuyang/kgs/app/entity"
"github.com/byliuyang/kgs/app/usecase/repo/repotest"

"github.com/byliuyang/app/mdtest"
)

func TestCachedConsumer(t *testing.T) {
keys := []entity.Key{
"aaaa", "aaab", "aaac", "aaad", "aaae", "aaaf", "aaag", "aaah",
"baaa", "baab", "baac", "baad", "baae", "baaf", "baag", "baah",
"caaa", "caab", "caac", "caad", "caae", "caaf", "caag", "caah",
}

testCases := []struct {
name string
expected fakeConsumerResult
skipBefore uint
count uint
}{
{
name: "should load keys and return three items from the buffer #1",
expected: fakeConsumerResult{
keys: []string{"aaaa", "aaab", "aaac"},
err: nil,
},
skipBefore: 0,
count: 3,
},
{
name: "should load keys twice and return the entire buffer #2",
expected: fakeConsumerResult{
keys: []string{"baaa", "baab", "baac", "baad", "baae", "baaf", "baag", "baah"},
err: nil,
},
skipBefore: 8,
count: 8,
},
{
name: "should return two items from the buffer #1",
expected: fakeConsumerResult{
keys: []string{"aaad", "aaae"},
err: nil,
},
skipBefore: 3,
count: 2,
},
{
name: "should return one item from the buffer #1",
expected: fakeConsumerResult{
keys: []string{"aaaf"},
err: nil,
},
skipBefore: 5,
count: 1,
},
{
name: "should return the first two items from the buffer #1, load new keys and return one key from the buffer #2",
expected: fakeConsumerResult{
keys: []string{"aaag", "aaah", "baaa"},
err: nil,
},
skipBefore: 6,
count: 3,
},
{
name: "should return the first seven items from the buffer #2, load new keys and return three items from the buffer #3",
expected: fakeConsumerResult{
keys: []string{"baab", "baac", "baad", "baae", "baaf", "baag", "baah", "caaa", "caab", "caac"},
err: nil,
},
skipBefore: 9,
count: 10,
},
{
name: "should return four items from the buffer #3",
expected: fakeConsumerResult{
keys: []string{"caad", "caae", "caaf", "caag"},
err: nil,
},
skipBefore: 19,
count: 4,
},
{
name: "should return the last item from the buffer #3 and return an error",
expected: fakeConsumerResult{
keys: []string{"caah"},
err: fakeConsumerError,
},
skipBefore: 23,
count: 10,
},
{
name: "should return empty list because there are no more keys",
expected: fakeConsumerResult{
keys: []string{},
err: nil,
},
skipBefore: 24,
count: 10,
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
availableKeysRepo := repotest.NewAvailableKeyFake()
allocatedKeysRepo := repotest.NewAllocatedKeyFake()

for _, key := range keys {
err := availableKeysRepo.Create(key)
mdtest.Equal(t, nil, err)
}

mockConsumer := NewConsumerPersist(
&availableKeysRepo,
&allocatedKeysRepo,
)

consumer, err := NewCachedConsumer(8, mockConsumer)
mdtest.Equal(t, err, nil)

_, err = consumer.ConsumeInBatch(testCase.skipBefore)
mdtest.Equal(t, err, nil)

allocatedKeysRepo.FakeError(testCase.expected.err)
actual, err := consumer.ConsumeInBatch(testCase.count)

mdtest.Equal(t, testCase.expected.keys, actual)
mdtest.Equal(t, testCase.expected.err, err)
})
}
}

var fakeConsumerError = errors.New("some trouble with delegate")

type fakeConsumerResult struct {
keys []string
err error
}
45 changes: 45 additions & 0 deletions app/usecase/repo/repotest/allocatedkey.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package repotest

import (
"fmt"

"github.com/byliuyang/kgs/app/entity"
"github.com/byliuyang/kgs/app/usecase/repo"
)

var _ repo.AllocatedKey = (*AllocatedKeyFake)(nil)

type AllocatedKeyFake struct {
keys map[entity.Key]struct{}
err error
}

func (a AllocatedKeyFake) CreateInBatch(keys []entity.Key) error {
for _, key := range keys {
if err := a.create(key); err != nil {
return err
}
}

return a.err
}

func (a *AllocatedKeyFake) FakeError(err error) {
a.err = err
}

func (a AllocatedKeyFake) create(key entity.Key) error {
if _, ok := a.keys[key]; ok {
return fmt.Errorf("key exists: %s", string(key))
}

a.keys[key] = struct{}{}

return nil
}

func NewAllocatedKeyFake() AllocatedKeyFake {
return AllocatedKeyFake{
keys: make(map[entity.Key]struct{}),
}
}
26 changes: 19 additions & 7 deletions app/usecase/repo/repotest/availablekey.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package repotest

import (
"errors"
"fmt"
"sort"

"github.com/byliuyang/kgs/app/entity"
"github.com/byliuyang/kgs/app/usecase/repo"
Expand All @@ -11,35 +11,47 @@ import (
var _ repo.AvailableKey = (*AvailableKeyFake)(nil)

type AvailableKeyFake struct {
keys map[entity.Key]bool
keys map[entity.Key]struct{}
}

func (a AvailableKeyFake) Create(key entity.Key) error {
if _, ok := a.keys[key]; ok {
return errors.New(fmt.Sprintf("key exists: %s", string(key)))
return fmt.Errorf("key exists: %s", string(key))
}
a.keys[key] = true
a.keys[key] = struct{}{}
return nil
}

func (a AvailableKeyFake) RetrieveInBatch(maxCount uint) ([]entity.Key, error) {
panic("implement me")
keys := a.GetKeys()
if len(keys) <= int(maxCount) {
return keys, nil
}
return keys[:maxCount], nil
}

func (a AvailableKeyFake) DeleteInBatch(keys []entity.Key) error {
panic("implement me")
for _, key := range keys {
delete(a.keys, key)
}
return nil
}

func (k AvailableKeyFake) GetKeys() []entity.Key {
var keys []entity.Key
for key := range k.keys {
keys = append(keys, key)
}

sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})

return keys
}

func NewAvailableKeyFake() AvailableKeyFake {
return AvailableKeyFake{
keys: make(map[entity.Key]bool),
keys: make(map[entity.Key]struct{}),
}
}
13 changes: 13 additions & 0 deletions dep/provider/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package provider

import (
"github.com/byliuyang/kgs/app/usecase/keys"
)

// CacheSize specifies the size of the local cache for fetched keys
type CacheSize int

// NewConsumer creates a buffered cached keys Consumer
func NewConsumer(bufferSize CacheSize, delegate keys.ConsumerPersist) (keys.ConsumerCached, error) {
return keys.NewCachedConsumer(int(bufferSize), delegate)
}
4 changes: 3 additions & 1 deletion dep/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func InitGRpcService(
securityPolicy fw.SecurityPolicy,
sendGridAPIKey provider.SendGridAPIKey,
templatePattern provider.TemplatePattern,
cacheSize provider.CacheSize,
) (mdservice.Service, error) {
wire.Build(
wire.Bind(new(fw.Server), new(mdgrpc.GRpc)),
Expand All @@ -79,7 +80,7 @@ func InitGRpcService(
wire.Bind(new(proto.KeyGenServer), new(rpc.KeyGenServer)),
wire.Bind(new(notification.Notifier), new(notification.EmailNotifier)),
wire.Bind(new(keys.Producer), new(keys.ProducerPersist)),
wire.Bind(new(keys.Consumer), new(keys.ConsumerPersist)),
wire.Bind(new(keys.Consumer), new(keys.ConsumerCached)),
wire.Bind(new(gen.Generator), new(gen.Alphabet)),
wire.Bind(new(repo.AvailableKey), new(db.AvailableKeySQL)),
wire.Bind(new(repo.AllocatedKey), new(db.AllocatedKeySQL)),
Expand All @@ -95,6 +96,7 @@ func InitGRpcService(
provider.NewEmailNotifier,
provider.NewTemplate,
keys.NewProducerPersist,
provider.NewConsumer,
keys.NewConsumerPersist,
db.NewAvailableKeySQL,
db.NewAllocatedKeySQL,
Expand Down
Loading

0 comments on commit 24d5945

Please sign in to comment.