From 3247cf08bcd786943338f9c8ba6e3ade173b675c Mon Sep 17 00:00:00 2001 From: Diogo Behrens Date: Fri, 24 Nov 2017 11:07:50 +0100 Subject: [PATCH] change table suffix to -table --- graph.go | 9 +++++++-- processor_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/graph.go b/graph.go index 64c0b30e..694252a6 100644 --- a/graph.go +++ b/graph.go @@ -6,6 +6,11 @@ import ( "strings" ) +const ( + tableSuffix = "-table" + loopSuffix = "-loop" +) + type Stream string type Streams []Stream type Table string @@ -288,10 +293,10 @@ func GroupTable(group Group) Table { } func tableName(group Group) string { - return string(group) + "-state" + return string(group) + tableSuffix } // loopName returns the name of the loop topic of group. func loopName(group Group) string { - return string(group) + "-loop" + return string(group) + loopSuffix } diff --git a/processor_test.go b/processor_test.go index 02896eab..acd56b57 100644 --- a/processor_test.go +++ b/processor_test.go @@ -206,7 +206,7 @@ func TestProcessor_process(t *testing.T) { promise = new(kafka.Promise) gomock.InOrder( st.EXPECT().Set("key", []byte("message")), - producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise), + producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise), st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil), st.EXPECT().SetOffset(int64(322)), consumer.EXPECT().Commit("sometopic", int32(1), int64(123)), @@ -225,9 +225,9 @@ func TestProcessor_process(t *testing.T) { promise2 := new(kafka.Promise) gomock.InOrder( st.EXPECT().Set("key", []byte("message")), - producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise), + producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise), st.EXPECT().Set("key", []byte("message2")), - producer.EXPECT().Emit("group-state", "key", []byte("message2")).Return(promise2), + producer.EXPECT().Emit(tableName(group), "key", []byte("message2")).Return(promise2), st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil), st.EXPECT().SetOffset(int64(323)), consumer.EXPECT().Commit("sometopic", int32(1), int64(123)), @@ -282,7 +282,7 @@ func TestProcessor_processFail(t *testing.T) { promise := new(kafka.Promise) gomock.InOrder( st.EXPECT().Set("key", []byte("message")), - producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise), + producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise), st.EXPECT().GetOffset(int64(0)).Return(int64(321), errors.New("getOffset failed")), consumer.EXPECT().Close().Do(func() { close(p.done) }), producer.EXPECT().Close(), @@ -305,7 +305,7 @@ func TestProcessor_processFail(t *testing.T) { p = newProcessor() gomock.InOrder( st.EXPECT().Set("key", []byte("message")), - producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise), + producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise), st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil), st.EXPECT().SetOffset(int64(322)).Return(errors.New("setOffset failed")), consumer.EXPECT().Close().Do(func() { close(p.done) }), @@ -329,7 +329,7 @@ func TestProcessor_processFail(t *testing.T) { p = newProcessor() gomock.InOrder( st.EXPECT().Set("key", []byte("message")), - producer.EXPECT().Emit("group-state", "key", []byte("message")).Return(promise), + producer.EXPECT().Emit(tableName(group), "key", []byte("message")).Return(promise), st.EXPECT().GetOffset(int64(0)).Return(int64(321), nil), st.EXPECT().SetOffset(int64(322)), consumer.EXPECT().Commit("sometopic", int32(1), int64(123)).Return(errors.New("commit error")),