Skip to content

Commit

Permalink
Merge pull request cockroachdb#2695 from cockroachdb/pmattis/decoding…
Browse files Browse the repository at this point in the history
…-errors

Remove usage of encoding.MustDecode* routines.
  • Loading branch information
petermattis committed Sep 28, 2015
2 parents ee3ad9b + 1cd00d0 commit ef44d75
Show file tree
Hide file tree
Showing 15 changed files with 178 additions and 130 deletions.
9 changes: 2 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,8 @@ func ObjectIDForKey(key proto.Key) (uint32, bool) {
}

// Consume first encoded int.
defer func() {
// Nothing to do, default return values mean "could not decode", which is
// definitely the case if DecodeUvarint panics.
_ = recover()
}()
_, id64 := encoding.MustDecodeUvarint(remaining)
return uint32(id64), true
_, id64, err := encoding.DecodeUvarint(remaining)
return uint32(id64), err == nil
}

// GetValue searches the kv list for 'key' and returns its
Expand Down
34 changes: 16 additions & 18 deletions keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/log"
)
Expand Down Expand Up @@ -88,17 +89,6 @@ func RaftHardStateKey(rangeID proto.RangeID) proto.Key {
return MakeRangeIDKey(rangeID, LocalRaftHardStateSuffix, proto.Key{})
}

// DecodeRaftStateKey extracts the Range ID from a RaftStateKey.
func DecodeRaftStateKey(key proto.Key) proto.RangeID {
if !bytes.HasPrefix(key, LocalRangeIDPrefix) {
panic(fmt.Sprintf("key %q does not have %q prefix", key, LocalRangeIDPrefix))
}
// Cut the prefix and the Range ID.
b := key[len(LocalRangeIDPrefix):]
_, rangeID := encoding.MustDecodeUvarint(b)
return proto.RangeID(rangeID)
}

// RaftTruncatedStateKey returns a system-local key for a RaftTruncatedState.
func RaftTruncatedStateKey(rangeID proto.RangeID) proto.Key {
return MakeRangeIDKey(rangeID, LocalRaftTruncatedStateSuffix, proto.Key{})
Expand Down Expand Up @@ -151,17 +141,20 @@ func MakeRangeKey(key, suffix, detail proto.Key) proto.Key {

// DecodeRangeKey decodes the range key into range start key,
// suffix and optional detail (may be nil).
func DecodeRangeKey(key proto.Key) (startKey, suffix, detail proto.Key) {
func DecodeRangeKey(key proto.Key) (startKey, suffix, detail proto.Key, err error) {
if !bytes.HasPrefix(key, LocalRangePrefix) {
panic(fmt.Sprintf("key %q does not have %q prefix",
key, LocalRangePrefix))
return nil, nil, nil, util.Errorf("key %q does not have %q prefix",
key, LocalRangePrefix)
}
// Cut the prefix and the Range ID.
b := key[len(LocalRangePrefix):]
b, startKey = encoding.MustDecodeBytes(b, nil)
b, startKey, err = encoding.DecodeBytes(b, nil)
if err != nil {
return nil, nil, nil, err
}
if len(b) < LocalSuffixLength {
panic(fmt.Sprintf("key %q does not have suffix of length %d",
key, LocalSuffixLength))
return nil, nil, nil, util.Errorf("key %q does not have suffix of length %d",
key, LocalSuffixLength)
}
// Cut the response cache suffix.
suffix = b[:LocalSuffixLength]
Expand Down Expand Up @@ -214,6 +207,8 @@ func TransactionKey(key proto.Key, id []byte) proto.Key {
// key) are addressable (e.g. range metadata and txn records). Range
// local keys incorporating the Range ID are not (e.g. response cache
// entries, and range stats).
//
// TODO(pmattis): Should KeyAddress return an error when the key is malformed?
func KeyAddress(k proto.Key) proto.Key {
if k == nil {
return nil
Expand All @@ -224,7 +219,10 @@ func KeyAddress(k proto.Key) proto.Key {
}
if bytes.HasPrefix(k, LocalRangePrefix) {
k = k[len(LocalRangePrefix):]
_, k = encoding.MustDecodeBytes(k, nil)
_, k, err := encoding.DecodeBytes(k, nil)
if err != nil {
panic(err)
}
return k
}
log.Fatalf("local key %q malformed; should contain prefix %q",
Expand Down
14 changes: 12 additions & 2 deletions storage/engine/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func (gc *GarbageCollector) Filter(keys []proto.EncodedKey, values [][]byte) pro
delTS := proto.ZeroTimestamp
survivors := false
for i, key := range keys {
_, ts, isValue := MVCCDecodeKey(key)
_, ts, isValue, err := MVCCDecodeKey(key)
if err != nil {
log.Errorf("unable to decode MVCC key: %q: %v", key, err)
return proto.ZeroTimestamp
}
if !isValue {
log.Errorf("unexpected MVCC metadata encountered: %q", key)
return proto.ZeroTimestamp
Expand Down Expand Up @@ -88,7 +92,13 @@ func (gc *GarbageCollector) Filter(keys []proto.EncodedKey, values [][]byte) pro
// If there are no non-deleted survivors, return timestamp of first key
// to delete all entries.
if !survivors {
_, ts, _ := MVCCDecodeKey(keys[0])
_, ts, _, err := MVCCDecodeKey(keys[0])
if err != nil {
// TODO(tschottdorf): Perhaps we should be propagating an error
// (e.g. ReplicaCorruptionError) up to the caller.
log.Errorf("unable to decode MVCC key: %q: %v", keys[0], err)
return proto.ZeroTimestamp
}
return ts
}
return delTS
Expand Down
79 changes: 61 additions & 18 deletions storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,10 @@ func mvccGetInternal(engine Engine, key proto.Key, metaKey proto.EncodedKey,
return nil, nil, err
}
if valueKey != nil {
_, ts, _ := MVCCDecodeKey(valueKey)
_, ts, _, err := MVCCDecodeKey(valueKey)
if err != nil {
return nil, nil, err
}
if timestamp.Less(ts) {
// Third case: Our read timestamp is sufficiently behind the newest
// value, but there is another previous write with the same issues
Expand Down Expand Up @@ -622,7 +625,10 @@ func mvccGetInternal(engine Engine, key proto.Key, metaKey proto.EncodedKey,
return nil, ignoredIntents, nil
}

_, ts, isValue := MVCCDecodeKey(valueKey)
_, ts, isValue, err := MVCCDecodeKey(valueKey)
if err != nil {
return nil, nil, err
}
if !isValue {
return nil, nil, util.Errorf("expected scan to versioned value reading key %q; got %q", key, valueKey)
}
Expand Down Expand Up @@ -972,7 +978,10 @@ func getScanMetaKey(iter Iterator, encEndKey proto.EncodedKey) (proto.Key, proto
if bytes.Compare(metaKey, encEndKey) >= 0 {
return nil, nil, iter.Error()
}
key, _, isValue := MVCCDecodeKey(metaKey)
key, _, isValue, err := MVCCDecodeKey(metaKey)
if err != nil {
return nil, nil, err
}
if isValue {
return nil, nil, util.Errorf("expected an MVCC metadata key: %q", metaKey)
}
Expand All @@ -988,7 +997,10 @@ func getReverseScanMetaKey(iter Iterator, encEndKey proto.EncodedKey) (proto.Key

// The row with oldest version will be got by seeking reversely. We use the
// key of this row to get the MVCC metadata key.
key, _, isValue := MVCCDecodeKey(metaKey)
key, _, isValue, err := MVCCDecodeKey(metaKey)
if err != nil {
return nil, nil, err
}
// If this isn't the meta key yet, scan again to get the meta key.
// TODO(tschottdorf): can we save any work here or leverage
// getScanMetaKey() above after doing the Seek() below?
Expand All @@ -999,7 +1011,10 @@ func getReverseScanMetaKey(iter Iterator, encEndKey proto.EncodedKey) (proto.Key
}

metaKey = iter.Key()
_, _, isValue = MVCCDecodeKey(metaKey)
_, _, isValue, err = MVCCDecodeKey(metaKey)
if err != nil {
return nil, nil, err
}
if isValue {
return nil, nil, util.Errorf("expected an MVCC metadata key: %q", metaKey)
}
Expand Down Expand Up @@ -1323,7 +1338,10 @@ func MVCCResolveWriteIntent(engine Engine, ms *MVCCStats, key proto.Key, timesta
// Clear stat counters attributable to the intent we're aborting.
updateStatsOnAbort(ms, key, origMetaKeySize, origMetaValSize, 0, 0, meta, nil, origAgeSeconds, 0)
} else {
_, ts, isValue := MVCCDecodeKey(kvs[0].Key)
_, ts, isValue, err := MVCCDecodeKey(kvs[0].Key)
if err != nil {
return err
}
if !isValue {
return util.Errorf("expected an MVCC value key: %s", kvs[0].Key)
}
Expand Down Expand Up @@ -1378,7 +1396,10 @@ func MVCCResolveWriteIntentRange(engine Engine, ms *MVCCStats, key, endKey proto
break
}

currentKey, _, isValue := MVCCDecodeKey(kvs[0].Key)
currentKey, _, isValue, err := MVCCDecodeKey(kvs[0].Key)
if err != nil {
return 0, err
}
if isValue {
return 0, util.Errorf("expected an MVCC metadata key: %s", kvs[0].Key)
}
Expand Down Expand Up @@ -1437,7 +1458,10 @@ func MVCCGarbageCollect(engine Engine, ms *MVCCStats, keys []proto.GCRequest_GCK
// Note that we start the for loop by iterating once to move past
// the metadata key.
for iter.Next(); iter.Valid(); iter.Next() {
_, ts, isValue := MVCCDecodeKey(iter.Key())
_, ts, isValue, err := MVCCDecodeKey(iter.Key())
if err != nil {
return err
}
if !isValue {
break
}
Expand Down Expand Up @@ -1524,7 +1548,10 @@ func MVCCFindSplitKey(engine Engine, rangeID proto.RangeID, key, endKey proto.Ke
done := !bestSplitKey.Equal(encStartKey) && diff > bestSplitDiff

// Add this key/value to the size scanned so far.
_, _, isValue := MVCCDecodeKey(kv.Key)
_, _, isValue, err := MVCCDecodeKey(kv.Key)
if err != nil {
return false, err
}
if isValue {
sizeSoFar += mvccVersionTimestampSize + int64(len(kv.Value))
} else {
Expand All @@ -1542,7 +1569,10 @@ func MVCCFindSplitKey(engine Engine, rangeID proto.RangeID, key, endKey proto.Ke

// The key is an MVCC key, so to avoid corrupting MVCC we get the
// associated mvcc metadata key, which is fine to split in front of.
humanKey, _, _ := MVCCDecodeKey(bestSplitKey)
humanKey, _, _, err := MVCCDecodeKey(bestSplitKey)
if err != nil {
return nil, err
}
return humanKey, nil
}

Expand All @@ -1560,7 +1590,10 @@ func MVCCComputeStats(iter Iterator, nowNanos int64) (MVCCStats, error) {
meta := &MVCCMetadata{}

for ; iter.Valid(); iter.Next() {
key, ts, isValue := MVCCDecodeKey(iter.Key())
key, ts, isValue, err := MVCCDecodeKey(iter.Key())
if err != nil {
return ms, err
}
_, sys := updateStatsForKey(&ms, key)
if !isValue {
totalBytes := int64(len(iter.Value())) + int64(len(iter.Key()))
Expand Down Expand Up @@ -1662,18 +1695,28 @@ func mvccEncodeTimestamp(key proto.EncodedKey, timestamp proto.Timestamp) proto.
// exactly 12 trailing bytes and they're decoded into a timestamp.
// The decoded key, timestamp and true are returned to indicate the
// key is for an MVCC versioned value.
func MVCCDecodeKey(encodedKey proto.EncodedKey) (proto.Key, proto.Timestamp, bool) {
tsBytes, key := encoding.MustDecodeBytes(encodedKey, nil)
func MVCCDecodeKey(encodedKey proto.EncodedKey) (proto.Key, proto.Timestamp, bool, error) {
tsBytes, key, err := encoding.DecodeBytes(encodedKey, nil)
if err != nil {
return nil, proto.Timestamp{}, false, err
}
if len(tsBytes) == 0 {
return key, proto.Timestamp{}, false
return key, proto.Timestamp{}, false, nil
} else if len(tsBytes) != 12 {
panic(fmt.Sprintf("there should be 12 bytes for encoded timestamp: %q", tsBytes))
return nil, proto.Timestamp{}, false,
util.Errorf("there should be 12 bytes for encoded timestamp: %q", tsBytes)
}
var walltime uint64
var logical uint32
tsBytes, walltime = encoding.MustDecodeUint64Decreasing(tsBytes)
tsBytes, logical = encoding.MustDecodeUint32Decreasing(tsBytes)
return key, proto.Timestamp{WallTime: int64(walltime), Logical: int32(logical)}, true
tsBytes, walltime, err = encoding.DecodeUint64Decreasing(tsBytes)
if err != nil {
return nil, proto.Timestamp{}, false, err
}
tsBytes, logical, err = encoding.DecodeUint32Decreasing(tsBytes)
if err != nil {
return nil, proto.Timestamp{}, false, err
}
return key, proto.Timestamp{WallTime: int64(walltime), Logical: int32(logical)}, true, nil
}

// willOverflow returns true iff adding both inputs would under- or overflow
Expand Down
10 changes: 8 additions & 2 deletions storage/engine/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2411,7 +2411,10 @@ func TestMVCCGarbageCollect(t *testing.T) {
t.Fatal(err)
}
for i, kv := range kvsn {
key, ts, _ := MVCCDecodeKey(kv.Key)
key, ts, _, err := MVCCDecodeKey(kv.Key)
if err != nil {
t.Fatal(err)
}
if log.V(1) {
log.Infof("%d: %q, ts=%s", i, key, ts)
}
Expand Down Expand Up @@ -2444,7 +2447,10 @@ func TestMVCCGarbageCollect(t *testing.T) {
t.Fatalf("number of kvs %d != expected %d", len(kvs), len(expEncKeys))
}
for i, kv := range kvs {
key, ts, _ := MVCCDecodeKey(kv.Key)
key, ts, _, err := MVCCDecodeKey(kv.Key)
if err != nil {
t.Fatal(err)
}
log.Infof("%d: %q, ts=%s", i, key, ts)
if !kv.Key.Equal(expEncKeys[i]) {
t.Errorf("%d: expected key %q; got %q", i, expEncKeys[i], kv.Key)
Expand Down
6 changes: 5 additions & 1 deletion storage/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ func (gcq *gcQueue) process(now proto.Timestamp, repl *Replica,

// Iterate through the keys and values of this replica's range.
for ; iter.Valid(); iter.Next() {
baseKey, ts, isValue := engine.MVCCDecodeKey(iter.Key())
baseKey, ts, isValue, err := engine.MVCCDecodeKey(iter.Key())
if err != nil {
log.Errorf("unable to decode MVCC key: %q: %v", iter.Key(), err)
continue
}
if !isValue {
// Moving to the next key (& values).
processKeysAndValues()
Expand Down
19 changes: 14 additions & 5 deletions storage/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/keys"
"github.com/cockroachdb/cockroach/proto"
"github.com/cockroachdb/cockroach/storage/engine"
"github.com/cockroachdb/cockroach/util"
"github.com/cockroachdb/cockroach/util/leaktest"
"github.com/cockroachdb/cockroach/util/log"
gogoproto "github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -246,7 +247,10 @@ func TestGCQueueProcess(t *testing.T) {
t.Fatal(err)
}
for i, kv := range kvs {
if key, ts, isValue := engine.MVCCDecodeKey(kv.Key); isValue {
if key, ts, isValue, err := engine.MVCCDecodeKey(kv.Key); isValue {
if err != nil {
t.Fatal(err)
}
if log.V(1) {
log.Infof("%d: %q, ts=%s", i, key, ts)
}
Expand All @@ -260,7 +264,10 @@ func TestGCQueueProcess(t *testing.T) {
t.Fatalf("expected length %d; got %d", len(expKVs), len(kvs))
}
for i, kv := range kvs {
key, ts, isValue := engine.MVCCDecodeKey(kv.Key)
key, ts, isValue, err := engine.MVCCDecodeKey(kv.Key)
if err != nil {
t.Fatal(err)
}
if !key.Equal(expKVs[i].key) {
t.Errorf("%d: expected key %q; got %q", i, expKVs[i].key, key)
}
Expand Down Expand Up @@ -349,12 +356,14 @@ func TestGCQueueIntentResolution(t *testing.T) {
// Iterate through all values to ensure intents have been fully resolved.
meta := &engine.MVCCMetadata{}
err := tc.store.Engine().Iterate(engine.MVCCEncodeKey(proto.KeyMin), engine.MVCCEncodeKey(proto.KeyMax), func(kv proto.RawKeyValue) (bool, error) {
if key, _, isValue := engine.MVCCDecodeKey(kv.Key); !isValue {
if key, _, isValue, err := engine.MVCCDecodeKey(kv.Key); err != nil {
return false, err
} else if !isValue {
if err := gogoproto.Unmarshal(kv.Value, meta); err != nil {
t.Fatalf("unable to unmarshal mvcc metadata for key %s", key)
return false, err
}
if meta.Txn != nil {
t.Fatalf("non-nil Txn after GC for key %s", key)
return false, util.Errorf("non-nil Txn after GC for key %s", key)
}
}
return false, nil
Expand Down
Loading

0 comments on commit ef44d75

Please sign in to comment.