Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework concurrency semantics of valueLog.maxFid (#1184) #1187

Merged
merged 1 commit into from
Jan 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions value.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ func (vlog *valueLog) iterate(lf *logFile, offset uint32, fn logEntry) (uint32,
}

func (vlog *valueLog) rewrite(f *logFile, tr trace.Trace) error {
maxFid := atomic.LoadUint32(&vlog.maxFid)
vlog.filesLock.RLock()
maxFid := vlog.maxFid
vlog.filesLock.RUnlock()
y.AssertTruef(uint32(f.fid) < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
tr.LazyPrintf("Rewriting fid: %d", f.fid)

Expand Down Expand Up @@ -808,10 +810,9 @@ func (vlog *valueLog) dropAll() (int, error) {
}

vlog.db.opt.Infof("Value logs deleted. Creating value log file: 0")
if _, err := vlog.createVlogFile(0); err != nil {
if _, err := vlog.createVlogFile(0); err != nil { // Called while writes are stopped.
return count, err
}
atomic.StoreUint32(&vlog.maxFid, 0)
return count, nil
}

Expand All @@ -832,12 +833,12 @@ type valueLog struct {
// guards our view of which files exist, which to be deleted, how many active iterators
filesLock sync.RWMutex
filesMap map[uint32]*logFile
maxFid uint32
filesToBeDeleted []uint32
// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
numActiveIterators int32

db *DB
maxFid uint32 // accessed via atomics.
writableLogOffset uint32 // read by read, written by write. Must access via atomics.
numEntriesWritten uint32
opt Options
Expand Down Expand Up @@ -997,14 +998,15 @@ func (vlog *valueLog) createVlogFile(fid uint32) (*logFile, error) {
if err = lf.mmap(2 * vlog.opt.ValueLogFileSize); err != nil {
return nil, errFile(err, lf.path, "Mmap value log file")
}

vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
vlog.maxFid = fid
// writableLogOffset is only written by write func, by read by Read func.
// To avoid a race condition, all reads and updates to this variable must be
// done via atomics.
atomic.StoreUint32(&vlog.writableLogOffset, vlogHeaderSize)
vlog.numEntriesWritten = 0

vlog.filesLock.Lock()
vlog.filesMap[fid] = lf
vlog.filesLock.Unlock()

return lf, nil
Expand Down Expand Up @@ -1155,12 +1157,12 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
// plain text mode or vice versa. A single vlog file can't have both
// encrypted entries and plain text entries.
if last.encryptionEnabled() != vlog.db.shouldEncrypt() {
newid := atomic.AddUint32(&vlog.maxFid, 1)
newid := vlog.maxFid + 1
_, err := vlog.createVlogFile(newid)
if err != nil {
return y.Wrapf(err, "Error while creating log file %d in valueLog.open", newid)
}
last, ok = vlog.filesMap[vlog.maxFid]
last, ok = vlog.filesMap[newid]
y.AssertTrue(ok)
}
lastOffset, err := last.fd.Seek(0, io.SeekEnd)
Expand Down Expand Up @@ -1222,7 +1224,7 @@ func (vlog *valueLog) Close() error {
err = munmapErr
}

maxFid := atomic.LoadUint32(&vlog.maxFid)
maxFid := vlog.maxFid
if !vlog.opt.ReadOnly && id == maxFid {
// truncate writable log file to correct offset.
if truncErr := f.fd.Truncate(
Expand Down Expand Up @@ -1320,7 +1322,7 @@ func (vlog *valueLog) sync(fid uint32) error {
}

vlog.filesLock.RLock()
maxFid := atomic.LoadUint32(&vlog.maxFid)
maxFid := vlog.maxFid
// During replay it is possible to get sync call with fid less than maxFid.
// Because older file has already been synced, we can return from here.
if fid < maxFid || len(vlog.filesMap) == 0 {
Expand Down Expand Up @@ -1353,7 +1355,7 @@ func (vlog *valueLog) write(reqs []*request) error {
return nil
}
vlog.filesLock.RLock()
maxFid := atomic.LoadUint32(&vlog.maxFid)
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
vlog.filesLock.RUnlock()

Expand Down Expand Up @@ -1385,7 +1387,7 @@ func (vlog *valueLog) write(reqs []*request) error {
return err
}

newid := atomic.AddUint32(&vlog.maxFid, 1)
newid := vlog.maxFid + 1
y.AssertTruef(newid > 0, "newid has overflown uint32: %v", newid)
newlf, err := vlog.createVlogFile(newid)
if err != nil {
Expand Down Expand Up @@ -1446,28 +1448,33 @@ func (vlog *valueLog) write(reqs []*request) error {

// Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
// (if non-nil)
func (vlog *valueLog) getFileRLocked(fid uint32) (*logFile, error) {
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
vlog.filesLock.RLock()
defer vlog.filesLock.RUnlock()
ret, ok := vlog.filesMap[fid]
ret, ok := vlog.filesMap[vp.Fid]
if !ok {
// log file has gone away, will need to retry the operation.
return nil, ErrRetry
}

// Check for valid offset if we are reading from writable log.
maxFid := vlog.maxFid
if vp.Fid == maxFid {
currentOffset := vlog.woffset()
if vp.Offset >= currentOffset {
return nil, errors.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, currentOffset)
}
}

ret.lock.RLock()
return ret, nil
}

// Read reads the value log at a given location.
// TODO: Make this read private.
func (vlog *valueLog) Read(vp valuePointer, s *y.Slice) ([]byte, func(), error) {
// Check for valid offset if we are reading from writable log.
maxFid := atomic.LoadUint32(&vlog.maxFid)
if vp.Fid == maxFid && vp.Offset >= vlog.woffset() {
return nil, nil, errors.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, vlog.woffset())
}
buf, lf, err := vlog.readValueBytes(vp, s)
// log file is locked so, decide whether to lock immediately or let the caller to
// unlock it, after caller uses it.
Expand Down Expand Up @@ -1517,10 +1524,11 @@ func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
// readValueBytes return vlog entry slice and read locked log file. Caller should take care of
// logFile unlocking.
func (vlog *valueLog) readValueBytes(vp valuePointer, s *y.Slice) ([]byte, *logFile, error) {
lf, err := vlog.getFileRLocked(vp.Fid)
lf, err := vlog.getFileRLocked(vp)
if err != nil {
return nil, nil, err
}

buf, err := lf.read(vp, s)
return buf, lf, err
}
Expand Down