Skip to content

Commit

Permalink
treat etag errors across all state stores
Browse files Browse the repository at this point in the history
  • Loading branch information
yaron2 committed Jan 9, 2021
1 parent d7eeb24 commit e8cb38e
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 18 deletions.
9 changes: 6 additions & 3 deletions state/aerospike/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
var gen uint32
gen, err = convertETag(req.ETag)
if err != nil {
return errInvalidETag
return err
}
// pass etag and fail writes is etag in DB is not same as passed by dapr (EXPECT_GEN_EQUAL)
writePolicy.Generation = gen
Expand All @@ -134,6 +134,9 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
}
err = aspike.client.Put(writePolicy, asKey, as.BinMap(data))
if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}
return fmt.Errorf("aerospike: failed to save value for key %s - %v", req.Key, err)
}

Expand Down Expand Up @@ -184,7 +187,7 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
var gen uint32
gen, err = convertETag(req.ETag)
if err != nil {
return errInvalidETag
return err
}
// pass etag and fail writes is etag in DB is not same as passed by dapr (EXPECT_GEN_EQUAL)
writePolicy.Generation = gen
Expand Down Expand Up @@ -231,7 +234,7 @@ func parseHosts(hostsMeta string) ([]*as.Host, error) {
func convertETag(eTag string) (uint32, error) {
i, err := strconv.ParseUint(eTag, 10, 32)
if err != nil {
return 0, err
return 0, state.NewETagError(state.ETagInvalid, err)
}

return uint32(i), nil
Expand Down
8 changes: 8 additions & 0 deletions state/azure/blobstorage/blobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err)

if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

Expand All @@ -216,6 +220,10 @@ func (r *StateStore) deleteFile(req *state.DeleteRequest) error {
if err != nil {
r.logger.Debugf("delete file %s, err %s", req.Key, err)

if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

Expand Down
8 changes: 8 additions & 0 deletions state/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (c *StateStore) Set(req *state.SetRequest) error {
}

if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

Expand Down Expand Up @@ -270,6 +274,10 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
c.logger.Debugf("Error from cosmos.DeleteDocument e=%e, e.Error=%s", err, err.Error())
}

if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

Expand Down
14 changes: 12 additions & 2 deletions state/azure/tablestorage/tablestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ func (r *StateStore) Init(metadata state.Metadata) error {
func (r *StateStore) Delete(req *state.DeleteRequest) error {
r.logger.Debugf("delete %s", req.Key)

return r.deleteRow(req)
err := r.deleteRow(req)
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
Expand All @@ -122,7 +127,12 @@ func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (r *StateStore) Set(req *state.SetRequest) error {
r.logger.Debugf("saving %s", req.Key)

return r.writeRow(req)
err := r.writeRow(req)
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

func NewAzureTablesStateStore(logger logger.Logger) *StateStore {
Expand Down
14 changes: 11 additions & 3 deletions state/couchbase/couchbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (cbs *Couchbase) Set(req *state.SetRequest) error {
// compare-and-swap (CAS) for managing concurrent modifications - https://docs.couchbase.com/go-sdk/current/concurrent-mutations-cluster.html
cas, cerr := eTagToCas(req.ETag)
if cerr != nil {
return fmt.Errorf("couchbase error: failed to set value for key %s - %v", req.Key, err)
return err
}
if req.Options.Consistency == state.Strong {
_, err = cbs.bucket.ReplaceDura(req.Key, value, cas, 0, cbs.numReplicasDurableReplication, cbs.numReplicasDurablePersistence)
Expand All @@ -160,6 +160,10 @@ func (cbs *Couchbase) Set(req *state.SetRequest) error {
}

if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return fmt.Errorf("couchbase error: failed to set value for key %s - %v", req.Key, err)
}

Expand Down Expand Up @@ -196,7 +200,7 @@ func (cbs *Couchbase) Delete(req *state.DeleteRequest) error {
if req.ETag != "" {
cas, err = eTagToCas(req.ETag)
if err != nil {
return fmt.Errorf("couchbase error: failed to delete key %s - %v", req.Key, err)
return err
}
}
if req.Options.Consistency == state.Strong {
Expand All @@ -205,6 +209,10 @@ func (cbs *Couchbase) Delete(req *state.DeleteRequest) error {
_, err = cbs.bucket.Remove(req.Key, cas)
}
if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return fmt.Errorf("couchbase error: failed to delete key %s - %v", req.Key, err)
}

Expand All @@ -217,7 +225,7 @@ func eTagToCas(eTag string) (gocb.Cas, error) {
// CAS is a 64-bit integer - https://docs.couchbase.com/go-sdk/current/concurrent-mutations-cluster.html#cas-value-format
temp, err := strconv.ParseUint(eTag, 10, 64)
if err != nil {
return cas, err
return cas, state.NewETagError(state.ETagInvalid, err)
}
cas = gocb.Cas(temp)

Expand Down
52 changes: 52 additions & 0 deletions state/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

package state

import (
"errors"
"fmt"
)

type ETagErrorKind string

const (
mismatchPrefix = "possible etag mismatch. error from state store"
invalidPrefix = "invalid etag value"

ETagInvalid ETagErrorKind = "invalid"
ETagMismatch ETagErrorKind = "mismatch"
)

// ETagError is a custom error type for etag exceptions
type ETagError struct {
err error
kind ETagErrorKind
}

func (e *ETagError) Error() string {
var prefix string

switch e.kind {
case ETagInvalid:
prefix = invalidPrefix
case ETagMismatch:
prefix = mismatchPrefix
}

if e.err != nil {
return fmt.Sprintf("%s: %s", prefix, e.err)
}

return errors.New(prefix).Error()
}

// NewETagError returns an ETagError wrapping an existing context error
func NewETagError(kind ETagErrorKind, err error) *ETagError {
return &ETagError{
err: err,
kind: kind,
}
}
41 changes: 41 additions & 0 deletions state/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------

package state

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestETagError(t *testing.T) {
t.Run("invalid with context error", func(t *testing.T) {
cerr := errors.New("error1")
err := NewETagError(ETagInvalid, cerr)

assert.Equal(t, invalidPrefix+": error1", err.Error())
})

t.Run("invalid without context error", func(t *testing.T) {
err := NewETagError(ETagInvalid, nil)

assert.Equal(t, invalidPrefix, err.Error())
})

t.Run("mismatch with context error", func(t *testing.T) {
cerr := errors.New("error1")
err := NewETagError(ETagMismatch, cerr)

assert.Equal(t, mismatchPrefix+": error1", err.Error())
})

t.Run("mismatch without context error", func(t *testing.T) {
err := NewETagError(ETagMismatch, nil)

assert.Equal(t, mismatchPrefix, err.Error())
})
}
6 changes: 3 additions & 3 deletions state/postgresql/postgresdbaccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *postgresDBAccess) setValue(req *state.SetRequest) error {
var etag int
etag, err = strconv.Atoi(req.ETag)
if err != nil {
return err
return state.NewETagError(state.ETagInvalid, err)
}

// When an etag is provided do an update - no insert
Expand Down Expand Up @@ -176,7 +176,7 @@ func (p *postgresDBAccess) deleteValue(req *state.DeleteRequest) error {
// Convert req.ETag to integer for postgres compatibility
etag, conversionError := strconv.Atoi(req.ETag)
if conversionError != nil {
return conversionError
return state.NewETagError(state.ETagInvalid, err)
}

result, err = p.db.Exec("DELETE FROM state WHERE key = $1 and xmin = $2", req.Key, etag)
Expand Down Expand Up @@ -238,7 +238,7 @@ func (p *postgresDBAccess) returnSingleDBResult(result sql.Result, err error) er
}

if rowsAffected == 0 {
noRowsErr := errors.New("database operation failed: no rows match given key and etag")
noRowsErr := state.NewETagError(state.ETagMismatch, err)
p.logger.Error(noRowsErr)

return noRowsErr
Expand Down
7 changes: 5 additions & 2 deletions state/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
}
_, err := r.client.DoContext(context.Background(), "EVAL", delQuery, 1, req.Key, req.ETag).Result()
if err != nil {
return fmt.Errorf("failed to delete key '%s' due to ETag mismatch", req.Key)
return state.NewETagError(state.ETagMismatch, err)
}

return nil
Expand Down Expand Up @@ -292,6 +292,9 @@ func (r *StateStore) setValue(req *state.SetRequest) error {

_, err = r.client.DoContext(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result()
if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}
return fmt.Errorf("failed to set key %s: %s", req.Key, err)
}

Expand Down Expand Up @@ -363,7 +366,7 @@ func (r *StateStore) parseETag(req *state.SetRequest) (int, error) {
}
ver, err := strconv.Atoi(req.ETag)
if err != nil {
return -1, err
return -1, state.NewETagError(state.ETagInvalid, err)
}

return ver, nil
Expand Down
14 changes: 11 additions & 3 deletions state/sqlserver/sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
var b []byte
b, err = hex.DecodeString(req.ETag)
if err != nil {
return err
return state.NewETagError(state.ETagInvalid, err)
}

res, err = s.db.Exec(s.deleteWithETagCommand, sql.Named(keyColumnName, req.Key), sql.Named(rowVersionColumnName, b))
Expand All @@ -336,6 +336,10 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
}

if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

Expand Down Expand Up @@ -384,7 +388,7 @@ func (s *SQLServer) executeBulkDelete(db dbExecutor, req []state.DeleteRequest)
if d.ETag != "" {
etag, err = hex.DecodeString(d.ETag)
if err != nil {
return err
return state.NewETagError(state.ETagInvalid, err)
}
}
values[i] = TvpDeleteTableStringKey{ID: d.Key, RowVersion: etag}
Expand Down Expand Up @@ -473,12 +477,16 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
var b []byte
b, err = hex.DecodeString(req.ETag)
if err != nil {
return err
return state.NewETagError(state.ETagInvalid, err)
}
etag.Value = b
}
res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag)
if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

Expand Down
20 changes: 18 additions & 2 deletions state/zookeeper/zk.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,15 @@ func (s *StateStore) Delete(req *state.DeleteRequest) error {
return nil
}

return err
if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

return nil
}, req)
}

Expand Down Expand Up @@ -222,7 +230,15 @@ func (s *StateStore) Set(req *state.SetRequest) error {
_, err = s.conn.Create(r.Path, r.Data, 0, nil)
}

return err
if err != nil {
if req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}

return err
}

return nil
}, req)
}

Expand Down

0 comments on commit e8cb38e

Please sign in to comment.