Skip to content

Commit

Permalink
change table suffix to -table
Browse files Browse the repository at this point in the history
  • Loading branch information
Diogo Behrens committed Dec 4, 2017
1 parent a2986e8 commit 3247cf0
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 8 deletions.
9 changes: 7 additions & 2 deletions graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import (
"strings"
)

const (
tableSuffix = "-table"
loopSuffix = "-loop"
)

type Stream string
type Streams []Stream
type Table string
Expand Down Expand Up @@ -288,10 +293,10 @@ func GroupTable(group Group) Table {
}

func tableName(group Group) string {
return string(group) + "-state"
return string(group) + tableSuffix
}

// loopName returns the name of the loop topic of group.
func loopName(group Group) string {
return string(group) + "-loop"
return string(group) + loopSuffix
}
12 changes: 6 additions & 6 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestProcessor_process(t *testing.T) {
promise = new(kafka.Promise)
gomock.InOrder(
st.EXPECT().Set("key", []byte("message")),
producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise),
producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise),
st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil),
st.EXPECT().SetOffset(int64(322)),
consumer.EXPECT().Commit("sometopic", int32(1), int64(123)),
Expand All @@ -225,9 +225,9 @@ func TestProcessor_process(t *testing.T) {
promise2 := new(kafka.Promise)
gomock.InOrder(
st.EXPECT().Set("key", []byte("message")),
producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise),
producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise),
st.EXPECT().Set("key", []byte("message2")),
producer.EXPECT().Emit("group-state", "key", []byte("message2")).Return(promise2),
producer.EXPECT().Emit(tableName(group), "key", []byte("message2")).Return(promise2),
st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil),
st.EXPECT().SetOffset(int64(323)),
consumer.EXPECT().Commit("sometopic", int32(1), int64(123)),
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestProcessor_processFail(t *testing.T) {
promise := new(kafka.Promise)
gomock.InOrder(
st.EXPECT().Set("key", []byte("message")),
producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise),
producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise),
st.EXPECT().GetOffset(int64(0)).Return(int64(321), errors.New("getOffset failed")),
consumer.EXPECT().Close().Do(func() { close(p.done) }),
producer.EXPECT().Close(),
Expand All @@ -305,7 +305,7 @@ func TestProcessor_processFail(t *testing.T) {
p = newProcessor()
gomock.InOrder(
st.EXPECT().Set("key", []byte("message")),
producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise),
producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise),
st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil),
st.EXPECT().SetOffset(int64(322)).Return(errors.New("setOffset failed")),
consumer.EXPECT().Close().Do(func() { close(p.done) }),
Expand All @@ -329,7 +329,7 @@ func TestProcessor_processFail(t *testing.T) {
p = newProcessor()
gomock.InOrder(
st.EXPECT().Set("key", []byte("message")),
producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise),
producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise),
st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil),
st.EXPECT().SetOffset(int64(322)),
consumer.EXPECT().Commit("sometopic", int32(1), int64(123)).Return(errors.New("commit error")),
Expand Down

0 comments on commit 3247cf0

Please sign in to comment.