Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deletion semantics #38

Merged
merged 1 commit into from
Sep 29, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type Context interface {
// SetValue updates the value of the key in the group table.
SetValue(value interface{})

// Delete deletes a value from the group table. IMPORTANT: this deletes the
// value associated with the key from both the local cache and the persisted
// table in Kafka.
Delete()

// Timestamp returns the timestamp of the input message. If the timestamp is
// invalid, a zero time will be returned.
Timestamp() time.Time
Expand Down Expand Up @@ -90,9 +95,14 @@ func (ctx *context) Emit(topic Stream, key string, value interface{}) {
if c == nil {
ctx.Fail(fmt.Errorf("no codec for topic %s", topic))
}
data, err := c.Encode(value)
if err != nil {
ctx.Fail(fmt.Errorf("error encoding message for topic %s: %v", topic, err))

var data []byte
if value != nil {
var err error
data, err = c.Encode(value)
if err != nil {
ctx.Fail(fmt.Errorf("error encoding message for topic %s: %v", topic, err))
}
}

ctx.emit(string(topic), key, data)
Expand Down Expand Up @@ -123,6 +133,12 @@ func (ctx *context) emit(topic string, key string, value []byte) {
})
}

func (ctx *context) Delete() {
if err := ctx.deleteKey(ctx.Key()); err != nil {
ctx.Fail(err)
}
}

// Value returns the value of the key in the group table.
func (ctx *context) Value() interface{} {
val, err := ctx.valueForKey(string(ctx.msg.Key))
Expand Down Expand Up @@ -204,6 +220,24 @@ func (ctx *context) valueForKey(key string) (interface{}, error) {
return ctx.storage.Get(key)
}

func (ctx *context) deleteKey(key string) error {
if ctx.graph.GroupTable() == nil {
return fmt.Errorf("Cannot access state in stateless processor")
}

ctx.counters.stores++
if err := ctx.storage.Delete(key); err != nil {
return fmt.Errorf("error deleting key (%s) from storage: %v", key, err)
}

ctx.counters.emits++
ctx.emitter(ctx.graph.GroupTable().Topic(), key, nil).Then(func(err error) {
ctx.emitDone(err)
})

return nil
}

// setValueForKey sets a value for a key in the processor state.
func (ctx *context) setValueForKey(key string, value interface{}) error {
if ctx.graph.GroupTable() == nil {
Expand Down
79 changes: 79 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goka
import (
"errors"
"fmt"
"regexp"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -171,6 +172,84 @@ func TestContext_GetSetStateless(t *testing.T) {
}()
}

func TestContext_Delete(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
storage := mock.NewMockStorage(ctrl)

offset := int64(123)
ack := 0
key := "key"

ctx := &context{
graph: DefineGroup(group, Persist(new(codec.String))),
storage: storage,
wg: new(sync.WaitGroup),
commit: func() { ack++ },
msg: &message{Offset: offset},
}

gomock.InOrder(
storage.EXPECT().Delete(key),
)
ctx.emitter = newEmitter(nil, nil)

ctx.start()
err := ctx.deleteKey(key)
ensure.Nil(t, err)
ctx.finish()

ctx.wg.Wait()

ensure.DeepEqual(t, ctx.counters, struct {
emits int
dones int
stores int
}{1, 1, 1})
}

func TestContext_DeleteStateless(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

offset := int64(123)
key := "key"

ctx := &context{
graph: DefineGroup(group),
wg: new(sync.WaitGroup),
msg: &message{Offset: offset},
}
ctx.emitter = newEmitter(nil, nil)

err := ctx.deleteKey(key)
ensure.Err(t, err, regexp.MustCompile("^Cannot access state in stateless processor$"))
}

func TestContext_DeleteStorageError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
storage := mock.NewMockStorage(ctrl)

offset := int64(123)
key := "key"

ctx := &context{
graph: DefineGroup(group, Persist(new(codec.String))),
storage: storage,
wg: new(sync.WaitGroup),
msg: &message{Offset: offset},
}

gomock.InOrder(
storage.EXPECT().Delete(key).Return(fmt.Errorf("storage error")),
)
ctx.emitter = newEmitter(nil, nil)

err := ctx.deleteKey(key)
ensure.Err(t, err, regexp.MustCompile("^error deleting key \\(key\\) from storage: storage error$"))
}

func TestContext_Set(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
11 changes: 10 additions & 1 deletion examples/testing/context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
package main

import (
time "time"

gomock "github.com/golang/mock/gomock"
goka "github.com/lovoo/goka"
time "time"
)

// Mock of Context interface
Expand All @@ -30,6 +31,14 @@ func (_m *MockContext) EXPECT() *_MockContextRecorder {
return _m.recorder
}

func (_m *MockContext) Delete() {
_m.ctrl.Call(_m, "Delete")
}

func (_mr *_MockContextRecorder) Delete() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Delete")
}

func (_m *MockContext) Emit(_param0 goka.Stream, _param1 string, _param2 interface{}) {
_m.ctrl.Call(_m, "Emit", _param0, _param1, _param2)
}
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func DefaultViewStoragePath() string {
// DefaultUpdate can be used in the function passed to WithUpdateCallback and
// WithViewCallback.
func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error {
if value == nil {
s.Delete(key)
}

return s.SetEncoded(key, value)
}

Expand Down
12 changes: 9 additions & 3 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessor(t, ctrl, consumer, 3, sb)
value = []byte("value")
)

// -- expectations --
Expand All @@ -586,7 +587,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
// 3. message
gomock.InOrder(
st.EXPECT().SetEncoded("key", nil).Return(nil),
st.EXPECT().SetEncoded("key", value).Return(nil),
st.EXPECT().SetOffset(int64(1)),
st.EXPECT().MarkRecovered(),
st.EXPECT().Sync(),
Expand Down Expand Up @@ -621,6 +622,7 @@ func TestProcessor_StartWithErrorAfterRebalance(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1) // with partition
ensure.Nil(t, err)
Expand Down Expand Up @@ -650,6 +652,7 @@ func TestProcessor_Start(t *testing.T) {
final = make(chan bool)
ch = make(chan kafka.Event)
p = createProcessor(t, ctrl, consumer, 3, sb)
value = []byte("value")
)

// -- expectations --
Expand All @@ -663,7 +666,7 @@ func TestProcessor_Start(t *testing.T) {
consumer.EXPECT().AddPartition(tableName(group), int32(1), int64(123))
consumer.EXPECT().AddPartition(tableName(group), int32(2), int64(123))
// 3. load message partition 1
st.EXPECT().SetEncoded("key", nil).Return(nil)
st.EXPECT().SetEncoded("key", value).Return(nil)
st.EXPECT().SetOffset(int64(1))
st.EXPECT().Sync()
st.EXPECT().MarkRecovered()
Expand Down Expand Up @@ -708,6 +711,7 @@ func TestProcessor_Start(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1) // with partition 1
ensure.Nil(t, err)
Expand Down Expand Up @@ -802,6 +806,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
ch = make(chan kafka.Event)
p = createProcessorWithTable(ctrl, consumer, 3, sb)
producer = p.producer.(*mock.MockProducer)
value = []byte("value")
)

// -- expectations --
Expand All @@ -818,7 +823,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
consumer.EXPECT().AddPartition(table, int32(1), int64(123))
consumer.EXPECT().AddPartition(table, int32(2), int64(123))
// 3. message to group table
st.EXPECT().SetEncoded("key", nil).Return(nil)
st.EXPECT().SetEncoded("key", value).Return(nil)
st.EXPECT().SetOffset(int64(1))
st.EXPECT().MarkRecovered()
st.EXPECT().Sync()
Expand Down Expand Up @@ -867,6 +872,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1)
ensure.Nil(t, err)
Expand Down
96 changes: 96 additions & 0 deletions storage/append.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package storage

import (
"fmt"
"io"
"os"
"path/filepath"
)

type file struct {
file io.WriteCloser
codec Codec
recovered bool

bytesWritten int64
}

func NewFile(path string, part int32, codec Codec) (Storage, error) {
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating storage directory: %v", err)
}

f, err := os.OpenFile(filepath.Join(path, fmt.Sprintf("part-%d", part)), os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModePerm)
if err != nil {
return nil, err
}

return &file{file: f}, nil
}

func (f *file) Recovered() bool {
return f.recovered
}

func (f *file) MarkRecovered() error {
f.recovered = true
return nil
}

func (f *file) Has(key string) (bool, error) {
return false, nil
}

func (f *file) Get(key string) (interface{}, error) {
return nil, nil
}

func (f *file) Set(key string, val interface{}) error {
data, err := f.codec.Encode(val)
if err != nil {
return err
}

return f.SetEncoded(key, data)
}

func (f *file) SetEncoded(key string, val []byte) error {
num, err := f.file.Write(val)
if err != nil {
return err
}

f.bytesWritten += int64(num)

if _, err := f.file.Write([]byte("\n")); err != nil {
return err
}

return nil
}

func (f *file) Delete(string) error {
return nil
}

func (f *file) GetOffset(def int64) (int64, error) {
return def, nil
}

func (f *file) SetOffset(val int64) error {
return nil
}

func (f *file) Iterator() (Iterator, error) {
return new(NullIter), nil
}

func (f *file) Open() error {
return nil
}

func (f *file) Close() error {
return f.file.Close()
}

func (f *file) Sync() {}