Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of disabling notifications on server side #529

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 24 additions & 22 deletions common/error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,31 @@ import (
)

const (
CodeNotInitialized codes.Code = 100
CodeInvalidTerm codes.Code = 101
CodeInvalidStatus codes.Code = 102
CodeCancelled codes.Code = 103
CodeAlreadyClosed codes.Code = 104
CodeLeaderAlreadyConnected codes.Code = 105
CodeNodeIsNotLeader codes.Code = 106
CodeNodeIsNotFollower codes.Code = 107
CodeInvalidSession codes.Code = 108
CodeInvalidSessionTimeout codes.Code = 109
CodeNamespaceNotFound codes.Code = 110
CodeNotInitialized codes.Code = 100
CodeInvalidTerm codes.Code = 101
CodeInvalidStatus codes.Code = 102
CodeCancelled codes.Code = 103
CodeAlreadyClosed codes.Code = 104
CodeLeaderAlreadyConnected codes.Code = 105
CodeNodeIsNotLeader codes.Code = 106
CodeNodeIsNotFollower codes.Code = 107
CodeInvalidSession codes.Code = 108
CodeInvalidSessionTimeout codes.Code = 109
CodeNamespaceNotFound codes.Code = 110
CodeNotificationsNotEnabled codes.Code = 111
)

var (
ErrorNotInitialized = status.Error(CodeNotInitialized, "oxia: server not initialized yet")
ErrorCancelled = status.Error(CodeCancelled, "oxia: operation was cancelled")
ErrorInvalidTerm = status.Error(CodeInvalidTerm, "oxia: invalid term")
ErrorInvalidStatus = status.Error(CodeInvalidStatus, "oxia: invalid status")
ErrorLeaderAlreadyConnected = status.Error(CodeLeaderAlreadyConnected, "oxia: leader is already connected")
ErrorAlreadyClosed = status.Error(CodeAlreadyClosed, "oxia: node is shutting down")
ErrorNodeIsNotLeader = status.Error(CodeNodeIsNotLeader, "oxia: node is not leader for shard")
ErrorNodeIsNotFollower = status.Error(CodeNodeIsNotFollower, "oxia: node is not follower for shard")
ErrorInvalidSession = status.Error(CodeInvalidSession, "oxia: session not found")
ErrorInvalidSessionTimeout = status.Error(CodeInvalidSessionTimeout, "oxia: invalid session timeout")
ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found")
ErrorNotInitialized = status.Error(CodeNotInitialized, "oxia: server not initialized yet")
ErrorCancelled = status.Error(CodeCancelled, "oxia: operation was cancelled")
ErrorInvalidTerm = status.Error(CodeInvalidTerm, "oxia: invalid term")
ErrorInvalidStatus = status.Error(CodeInvalidStatus, "oxia: invalid status")
ErrorLeaderAlreadyConnected = status.Error(CodeLeaderAlreadyConnected, "oxia: leader is already connected")
ErrorAlreadyClosed = status.Error(CodeAlreadyClosed, "oxia: node is shutting down")
ErrorNodeIsNotLeader = status.Error(CodeNodeIsNotLeader, "oxia: node is not leader for shard")
ErrorNodeIsNotFollower = status.Error(CodeNodeIsNotFollower, "oxia: node is not follower for shard")
ErrorInvalidSession = status.Error(CodeInvalidSession, "oxia: session not found")
ErrorInvalidSessionTimeout = status.Error(CodeInvalidSessionTimeout, "oxia: invalid session timeout")
ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found")
ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace")
)
20 changes: 13 additions & 7 deletions server/follower_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ type followerController struct {
// Offset of the last entry appended and not fully synced yet on the wal
lastAppendedOffset int64

status proto.ServingStatus
wal wal.Wal
kvFactory kv.Factory
db kv.DB
status proto.ServingStatus
wal wal.Wal
kvFactory kv.Factory
db kv.DB
termOptions kv.TermOptions

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -134,14 +135,16 @@ func NewFollowerController(config Config, namespace string, shardId int64, wf wa
return nil, err
}

if fc.term, err = fc.db.ReadTerm(); err != nil {
if fc.term, fc.termOptions, err = fc.db.ReadTerm(); err != nil {
return nil, err
}

if fc.term != wal.InvalidTerm {
fc.status = proto.ServingStatus_FENCED
}

fc.db.EnableNotifications(fc.termOptions.NotificationsEnabled)

commitOffset, err := fc.db.ReadCommitOffset()
if err != nil {
return nil, err
Expand Down Expand Up @@ -281,10 +284,13 @@ func (fc *followerController) NewTerm(req *proto.NewTermRequest) (*proto.NewTerm
}
}

if err := fc.db.UpdateTerm(req.Term); err != nil {
fc.termOptions = kv.ToDbOption(req.Options)
if err := fc.db.UpdateTerm(req.Term, fc.termOptions); err != nil {
return nil, err
}

fc.db.EnableNotifications(fc.termOptions.NotificationsEnabled)

fc.term = req.Term
fc.setLogger()
fc.status = proto.ServingStatus_FENCED
Expand Down Expand Up @@ -693,7 +699,7 @@ func (fc *followerController) handleSnapshot(stream proto.OxiaLogReplication_Sen
}

// The new term must be persisted, to avoid rolling it back
if err = newDb.UpdateTerm(fc.term); err != nil {
if err = newDb.UpdateTerm(fc.term, fc.termOptions); err != nil {
fc.closeStreamNoMutex(errors.Wrap(err, "Failed to update term in db"))
}

Expand Down
4 changes: 2 additions & 2 deletions server/follower_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestFollower_RestoreCommitOffset(t *testing.T) {
}}}, 9, 0, kv.NoOpCallback)
assert.NoError(t, err)

assert.NoError(t, db.UpdateTerm(6))
assert.NoError(t, db.UpdateTerm(6, kv.TermOptions{}))
assert.NoError(t, db.Close())

fc, err := NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory)
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestFollowerController_RejectEntriesWithDifferentTerm(t *testing.T) {
db, err := kv.NewDB(common.DefaultNamespace, shardId, kvFactory, 1*time.Hour, common.SystemClock)
assert.NoError(t, err)
// Force a new term in the DB before opening
assert.NoError(t, db.UpdateTerm(5))
assert.NoError(t, db.UpdateTerm(5, kv.TermOptions{}))
assert.NoError(t, db.Close())

walFactory := wal.NewWalFactory(&wal.FactoryOptions{BaseWalDir: t.TempDir()})
Expand Down
88 changes: 73 additions & 15 deletions server/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
Expand All @@ -38,12 +39,14 @@ var (
ErrMissingPartitionKey = errors.New("oxia: sequential key operation requires partition key")
ErrMissingSequenceDeltas = errors.New("oxia: sequential key operation missing some sequence deltas")
ErrSequenceDeltaIsZero = errors.New("oxia: sequential key operation requires first delta do be > 0")
ErrNotificationsDisabled = errors.New("oxia: notifications disabled")
)

const (
commitOffsetKey = common.InternalKeyPrefix + "commit-offset"
commitLastVersionIdKey = common.InternalKeyPrefix + "last-version-id"
termKey = common.InternalKeyPrefix + "term"
termOptionsKey = termKey + "-options"
)

type UpdateOperationCallback interface {
Expand All @@ -59,9 +62,15 @@ type RangeScanIterator interface {
Next() bool
}

type TermOptions struct {
NotificationsEnabled bool
}

type DB interface {
io.Closer

EnableNotifications(enable bool)

ProcessWrite(b *proto.WriteRequest, commitOffset int64, timestamp uint64, updateOperationCallback UpdateOperationCallback) (*proto.WriteResponse, error)
Get(request *proto.GetRequest) (*proto.GetResponse, error)
List(request *proto.ListRequest) (KeyIterator, error)
Expand All @@ -70,8 +79,8 @@ type DB interface {

ReadNextNotifications(ctx context.Context, startOffset int64) ([]*proto.NotificationBatch, error)

UpdateTerm(newTerm int64) error
ReadTerm() (term int64, err error)
UpdateTerm(newTerm int64, options TermOptions) error
ReadTerm() (term int64, options TermOptions, err error)

Snapshot() (Snapshot, error)

Expand All @@ -87,8 +96,9 @@ func NewDB(namespace string, shardId int64, factory Factory, notificationRetenti

labels := metrics.LabelsForShard(namespace, shardId)
db := &db{
kv: kv,
shardId: shardId,
kv: kv,
shardId: shardId,
notificationsEnabled: true,
log: slog.With(
slog.String("component", "db"),
slog.String("namespace", namespace),
Expand Down Expand Up @@ -136,6 +146,7 @@ type db struct {
versionIdTracker atomic.Int64
notificationsTracker *notificationsTracker
log *slog.Logger
notificationsEnabled bool

putCounter metrics.Counter
deleteCounter metrics.Counter
Expand All @@ -153,6 +164,10 @@ func (d *db) Snapshot() (Snapshot, error) {
return d.kv.Snapshot()
}

func (d *db) EnableNotifications(enabled bool) {
d.notificationsEnabled = enabled
}

func (d *db) Close() error {
return multierr.Combine(
d.notificationsTracker.Close(),
Expand All @@ -173,7 +188,10 @@ func now() uint64 {

func (d *db) applyWriteRequest(b *proto.WriteRequest, batch WriteBatch, commitOffset int64, timestamp uint64, updateOperationCallback UpdateOperationCallback) (*notifications, *proto.WriteResponse, error) {
res := &proto.WriteResponse{}
notifications := newNotifications(d.shardId, commitOffset, timestamp)
var notifications *notifications
if d.notificationsEnabled {
notifications = newNotifications(d.shardId, commitOffset, timestamp)
}

d.putCounter.Add(len(b.Puts))
for _, putReq := range b.Puts {
Expand Down Expand Up @@ -225,16 +243,20 @@ func (d *db) ProcessWrite(b *proto.WriteRequest, commitOffset int64, timestamp u
return nil, err
}

// Add the notifications to the batch as well
if err := d.addNotifications(batch, notifications); err != nil {
return nil, err
if notifications != nil {
// Add the notifications to the batch as well
if err := d.addNotifications(batch, notifications); err != nil {
return nil, err
}
}

if err := batch.Commit(); err != nil {
return nil, err
}

d.notificationsTracker.UpdatedCommitOffset(commitOffset)
if notifications != nil {
d.notificationsTracker.UpdatedCommitOffset(commitOffset)
}

if err := batch.Close(); err != nil {
return nil, err
Expand Down Expand Up @@ -376,7 +398,7 @@ func (d *db) readASCIILong(key string) (int64, error) {
return res, nil
}

func (d *db) UpdateTerm(newTerm int64) error {
func (d *db) UpdateTerm(newTerm int64, options TermOptions) error {
batch := d.kv.NewWriteBatch()

if _, err := d.applyPut(batch, nil, &proto.PutRequest{
Expand All @@ -386,6 +408,17 @@ func (d *db) UpdateTerm(newTerm int64) error {
return err
}

serOptions, err := json.Marshal(options)
if err != nil {
return err
}
if _, err := d.applyPut(batch, nil, &proto.PutRequest{
Key: termOptionsKey,
Value: serOptions,
}, now(), NoOpCallback, true); err != nil {
return err
}

if err := batch.Commit(); err != nil {
return err
}
Expand All @@ -399,23 +432,36 @@ func (d *db) UpdateTerm(newTerm int64) error {
return d.kv.Flush()
}

func (d *db) ReadTerm() (term int64, err error) {
func (d *db) ReadTerm() (term int64, options TermOptions, err error) {
getReq := &proto.GetRequest{
Key: termKey,
IncludeValue: true,
}
gr, err := applyGet(d.kv, getReq)
if err != nil {
return wal.InvalidTerm, err
return wal.InvalidTerm, TermOptions{}, err
}
if gr.Status == proto.Status_KEY_NOT_FOUND {
return wal.InvalidTerm, nil
return wal.InvalidTerm, TermOptions{}, nil
}

if _, err = fmt.Sscanf(string(gr.Value), "%d", &term); err != nil {
return wal.InvalidTerm, err
return wal.InvalidTerm, TermOptions{}, err
}

if gr, err = applyGet(d.kv, &proto.GetRequest{Key: termOptionsKey, IncludeValue: true}); err != nil {
return wal.InvalidTerm, TermOptions{}, err
}

if gr.Status == proto.Status_KEY_NOT_FOUND {
options = TermOptions{}
} else {
if err := json.Unmarshal(gr.Value, &options); err != nil {
return wal.InvalidTerm, TermOptions{}, err
}
}
return term, nil

return term, options, nil
}

func (d *db) applyPut(batch WriteBatch, notifications *notifications, putReq *proto.PutRequest, timestamp uint64, updateOperationCallback UpdateOperationCallback, internal bool) (*proto.PutResponse, error) { //nolint:revive
Expand Down Expand Up @@ -690,6 +736,9 @@ func deserialize(value []byte, se *proto.StorageEntry) error {
}

func (d *db) ReadNextNotifications(ctx context.Context, startOffset int64) ([]*proto.NotificationBatch, error) {
if !d.notificationsEnabled {
return nil, ErrNotificationsDisabled
}
return d.notificationsTracker.ReadNextNotifications(ctx, startOffset)
}

Expand All @@ -704,3 +753,12 @@ func (*noopCallback) OnDelete(WriteBatch, string) error {
}

var NoOpCallback UpdateOperationCallback = &noopCallback{}

func ToDbOption(opt *proto.NewTermOptions) TermOptions {
to := TermOptions{NotificationsEnabled: true}
if opt != nil {
to.NotificationsEnabled = opt.EnableNotifications
}

return to
}
23 changes: 23 additions & 0 deletions server/kv/db_notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,26 @@ func TestDB_NotificationsCancelWait(t *testing.T) {
assert.NoError(t, db.Close())
assert.NoError(t, factory.Close())
}

func TestDB_NotificationsDisabled(t *testing.T) {
factory, err := NewPebbleKVFactory(testKVOptions)
assert.NoError(t, err)
db, err := NewDB(common.DefaultNamespace, 1, factory, 1*time.Hour, common.SystemClock)
assert.NoError(t, err)

db.EnableNotifications(false)
t0 := now()
_, _ = db.ProcessWrite(&proto.WriteRequest{
Puts: []*proto.PutRequest{{
Key: "a",
Value: []byte("0"),
}},
}, 0, t0, NoOpCallback)

notifications, err := db.ReadNextNotifications(context.Background(), 0)
assert.Error(t, ErrNotificationsDisabled, err)
assert.Nil(t, notifications)

assert.NoError(t, db.Close())
assert.NoError(t, factory.Close())
}
10 changes: 6 additions & 4 deletions server/kv/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,24 +450,26 @@ func TestDb_UpdateTerm(t *testing.T) {
db, err := NewDB(common.DefaultNamespace, 1, factory, 0, common.SystemClock)
assert.NoError(t, err)

term, err := db.ReadTerm()
term, options, err := db.ReadTerm()
assert.NoError(t, err)
assert.Equal(t, wal.InvalidOffset, term)
assert.Equal(t, TermOptions{}, options)

err = db.UpdateTerm(1)
err = db.UpdateTerm(1, TermOptions{NotificationsEnabled: true})
assert.NoError(t, err)

term, err = db.ReadTerm()
term, options, err = db.ReadTerm()
assert.NoError(t, err)
assert.EqualValues(t, 1, term)
assert.Equal(t, TermOptions{NotificationsEnabled: true}, options)

assert.NoError(t, db.Close())

// Reopen and verify the term is maintained
db, err = NewDB(common.DefaultNamespace, 1, factory, 0, common.SystemClock)
assert.NoError(t, err)

term, err = db.ReadTerm()
term, _, err = db.ReadTerm()
assert.NoError(t, err)
assert.Equal(t, wal.InvalidOffset, term)

Expand Down
Loading
Loading