Skip to content

Commit

Permalink
nsqd: diskqueue syncs when only reads have occurred
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Apr 9, 2016
1 parent 9156b4b commit da8c8aa
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 8 deletions.
21 changes: 13 additions & 8 deletions nsqd/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,15 +569,15 @@ func (d *diskQueue) handleReadError() {
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var rcount int64
var wcount int64
var r chan []byte

syncTicker := time.NewTicker(d.syncTimeout)

for {
// dont sync all the time :)
if count == d.syncEvery {
count = 0
if wcount == d.syncEvery {
d.needSync = true
}

Expand All @@ -586,6 +586,8 @@ func (d *diskQueue) ioLoop() {
if err != nil {
d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
}
rcount = 0
wcount = 0
}

if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
Expand All @@ -607,19 +609,22 @@ func (d *diskQueue) ioLoop() {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case r <- dataRead:
rcount++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
rcount = 0
wcount = 0
case dataWrite := <-d.writeChan:
count++
wcount++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
if count > 0 {
count = 0
d.needSync = true
if wcount == 0 && rcount == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
Expand Down
61 changes: 61 additions & 0 deletions nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,67 @@ func TestDiskQueueCorruption(t *testing.T) {
equal(t, <-dq.ReadChan(), msg)
}

type md struct {
depth int64
readFileNum int64
writeFileNum int64
readPos int64
writePos int64
}

func readMetaDataFile(fileName string) md {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
panic(err)
}
defer f.Close()

var ret md
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&ret.depth,
&ret.readFileNum, &ret.readPos,
&ret.writeFileNum, &ret.writePos)
if err != nil {
panic(err)
}
return ret
}

func TestDiskQueueSyncAfterRead(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
defer dq.Close()

msg := make([]byte, 1000)
dq.Put(msg)

time.Sleep(100 * time.Millisecond)

d := readMetaDataFile(dq.(*diskQueue).metaDataFileName())
equal(t, d.depth, int64(1))
equal(t, d.readFileNum, int64(0))
equal(t, d.writeFileNum, int64(0))
equal(t, d.readPos, int64(0))
equal(t, d.writePos, int64(1004))
dq.Put(msg)

<-dq.ReadChan()
time.Sleep(100 * time.Millisecond)

d = readMetaDataFile(dq.(*diskQueue).metaDataFileName())
equal(t, d.depth, int64(1))
equal(t, d.readFileNum, int64(0))
equal(t, d.writeFileNum, int64(0))
equal(t, d.readPos, int64(1004))
equal(t, d.writePos, int64(2008))
}

func TestDiskQueueTorture(t *testing.T) {
var wg sync.WaitGroup

Expand Down

0 comments on commit da8c8aa

Please sign in to comment.