Skip to content

Commit

Permalink
feat: add message timestamp to view update context
Browse files Browse the repository at this point in the history
  • Loading branch information
smirzaei committed May 6, 2024
1 parent 85b1720 commit 3df46fb
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
9 changes: 9 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ type UpdateContext interface {
// It is recommended to lazily evaluate the headers to reduce overhead per message
// when headers are not used.
Headers() Headers

// Timestamp returns the timestamp of the input message.
Timestamp() time.Time
}

// UpdateCallback is invoked upon arrival of a message for a table partition.
Expand Down Expand Up @@ -95,6 +98,7 @@ type DefaultUpdateContext struct {
partition int32
offset int64
headers []*sarama.RecordHeader
timestamp time.Time
}

// Topic returns the topic of input message.
Expand All @@ -117,6 +121,11 @@ func (ctx DefaultUpdateContext) Headers() Headers {
return HeadersFromSarama(ctx.headers)
}

// Timestamp returns the timestamp of the input message.
func (ctx DefaultUpdateContext) Timestamp() time.Time {
return ctx.timestamp
}

///////////////////////////////////////////////////////////////////////////////
// processor options
///////////////////////////////////////////////////////////////////////////////
Expand Down
5 changes: 3 additions & 2 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.Partition
}

lastMessage = time.Now()
if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers); err != nil {
if err := p.storeEvent(string(msg.Key), msg.Value, msg.Offset, msg.Headers, msg.Timestamp); err != nil {
return fmt.Errorf("load: error updating storage: %v", err)
}

Expand Down Expand Up @@ -534,12 +534,13 @@ func (p *PartitionTable) TrackMessageWrite(ctx context.Context, length int) {
})
}

func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader) error {
func (p *PartitionTable) storeEvent(key string, value []byte, offset int64, headers []*sarama.RecordHeader, timestamp time.Time) error {
err := p.st.Update(&DefaultUpdateContext{
topic: p.st.topic,
partition: p.st.partition,
offset: offset,
headers: headers,
timestamp: timestamp,
}, key, value)
if err != nil {
return fmt.Errorf("Error from the update callback while recovering from the log: %v", err)
Expand Down
25 changes: 15 additions & 10 deletions partition_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,16 +708,19 @@ func TestPT_loadMessages(t *testing.T) {
func TestPT_storeEvent(t *testing.T) {
t.Run("succeed", func(t *testing.T) {
var (
localOffset int64
partition int32
topic = "some-topic"
key = "some-key"
value = []byte("some-vale")
actualKey string
actualValue []byte
updateCB UpdateCallback = func(ctx UpdateContext, s storage.Storage, k string, v []byte) error {
localOffset int64
partition int32
topic = "some-topic"
key = "some-key"
value = []byte("some-vale")
timestamp = time.Now()
actualKey string
actualValue []byte
actualTimestamp time.Time
updateCB UpdateCallback = func(ctx UpdateContext, s storage.Storage, k string, v []byte) error {
actualKey = k
actualValue = v
actualTimestamp = ctx.Timestamp()
return nil
}
)
Expand All @@ -735,9 +738,10 @@ func TestPT_storeEvent(t *testing.T) {
defer cancel()
err := pt.setup(ctx)
require.NoError(t, err)
err = pt.storeEvent(key, value, localOffset, nil)
err = pt.storeEvent(key, value, localOffset, nil, timestamp)
require.Equal(t, actualKey, key)
require.Equal(t, actualValue, value)
require.Equal(t, actualTimestamp, timestamp)
require.NoError(t, err)
})
t.Run("fail", func(t *testing.T) {
Expand All @@ -747,6 +751,7 @@ func TestPT_storeEvent(t *testing.T) {
topic = "some-topic"
key = "some-key"
value = []byte("some-vale")
timestamp = time.Now()
updateCB UpdateCallback = updateCallbackNoop
retErr error = fmt.Errorf("storage err")
)
Expand All @@ -764,7 +769,7 @@ func TestPT_storeEvent(t *testing.T) {
defer cancel()
err := pt.setup(ctx)
require.NoError(t, err)
err = pt.storeEvent(key, value, localOffset, nil)
err = pt.storeEvent(key, value, localOffset, nil, timestamp)
require.Error(t, err)
})
}
Expand Down

0 comments on commit 3df46fb

Please sign in to comment.