diff --git a/nsqd/diskqueue.go b/nsqd/diskqueue.go index c65a90d09..9bed27bb3 100644 --- a/nsqd/diskqueue.go +++ b/nsqd/diskqueue.go @@ -577,7 +577,6 @@ func (d *diskQueue) ioLoop() { for { // dont sync all the time :) if count == d.syncEvery { - count = 0 d.needSync = true } @@ -586,6 +585,7 @@ func (d *diskQueue) ioLoop() { if err != nil { d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) } + count = 0 } if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { @@ -607,6 +607,7 @@ func (d *diskQueue) ioLoop() { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read case r <- dataRead: + count++ // moveForward sets needSync flag if a file is removed d.moveForward() case <-d.emptyChan: @@ -616,10 +617,11 @@ func (d *diskQueue) ioLoop() { count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: - if count > 0 { - count = 0 - d.needSync = true + if count == 0 { + // avoid sync when there's no activity + continue } + d.needSync = true case <-d.exitChan: goto exit } diff --git a/nsqd/diskqueue_test.go b/nsqd/diskqueue_test.go index 6829757f7..78e5aa819 100644 --- a/nsqd/diskqueue_test.go +++ b/nsqd/diskqueue_test.go @@ -181,6 +181,67 @@ func TestDiskQueueCorruption(t *testing.T) { equal(t, <-dq.ReadChan(), msg) } +type md struct { + depth int64 + readFileNum int64 + writeFileNum int64 + readPos int64 + writePos int64 +} + +func readMetaDataFile(fileName string) md { + f, err := os.OpenFile(fileName, os.O_RDONLY, 0600) + if err != nil { + panic(err) + } + defer f.Close() + + var ret md + _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", + &ret.depth, + &ret.readFileNum, &ret.readPos, + &ret.writeFileNum, &ret.writePos) + if err != nil { + panic(err) + } + return ret +} + +func TestDiskQueueSyncAfterRead(t *testing.T) { + l := newTestLogger(t) + dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + dq := newDiskQueue(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + defer dq.Close() + + msg := make([]byte, 1000) + dq.Put(msg) + + time.Sleep(100 * time.Millisecond) + + d := readMetaDataFile(dq.(*diskQueue).metaDataFileName()) + equal(t, d.depth, int64(1)) + equal(t, d.readFileNum, int64(0)) + equal(t, d.writeFileNum, int64(0)) + equal(t, d.readPos, int64(0)) + equal(t, d.writePos, int64(1004)) + dq.Put(msg) + + <-dq.ReadChan() + time.Sleep(100 * time.Millisecond) + + d = readMetaDataFile(dq.(*diskQueue).metaDataFileName()) + equal(t, d.depth, int64(1)) + equal(t, d.readFileNum, int64(0)) + equal(t, d.writeFileNum, int64(0)) + equal(t, d.readPos, int64(1004)) + equal(t, d.writePos, int64(2008)) +} + func TestDiskQueueTorture(t *testing.T) { var wg sync.WaitGroup