Skip to content

Commit

Permalink
Merge pull request #730 from mreiferson/diskqueue-read-sync-730
Browse files Browse the repository at this point in the history
nsqd: persisted data repeatedly consumed
  • Loading branch information
jehiah committed Apr 9, 2016
2 parents 9156b4b + 497111e commit 6ec6bee
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
10 changes: 6 additions & 4 deletions nsqd/diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,6 @@ func (d *diskQueue) ioLoop() {
for {
// dont sync all the time :)
if count == d.syncEvery {
count = 0
d.needSync = true
}

Expand All @@ -586,6 +585,7 @@ func (d *diskQueue) ioLoop() {
if err != nil {
d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err)
}
count = 0
}

if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
Expand All @@ -607,6 +607,7 @@ func (d *diskQueue) ioLoop() {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
d.moveForward()
case <-d.emptyChan:
Expand All @@ -616,10 +617,11 @@ func (d *diskQueue) ioLoop() {
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
if count > 0 {
count = 0
d.needSync = true
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
Expand Down
61 changes: 61 additions & 0 deletions nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,67 @@ func TestDiskQueueCorruption(t *testing.T) {
equal(t, <-dq.ReadChan(), msg)
}

type md struct {
depth int64
readFileNum int64
writeFileNum int64
readPos int64
writePos int64
}

func readMetaDataFile(fileName string) md {
f, err := os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
panic(err)
}
defer f.Close()

var ret md
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&ret.depth,
&ret.readFileNum, &ret.readPos,
&ret.writeFileNum, &ret.writePos)
if err != nil {
panic(err)
}
return ret
}

func TestDiskQueueSyncAfterRead(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_read_after_sync" + strconv.Itoa(int(time.Now().Unix()))
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
defer dq.Close()

msg := make([]byte, 1000)
dq.Put(msg)

time.Sleep(100 * time.Millisecond)

d := readMetaDataFile(dq.(*diskQueue).metaDataFileName())
equal(t, d.depth, int64(1))
equal(t, d.readFileNum, int64(0))
equal(t, d.writeFileNum, int64(0))
equal(t, d.readPos, int64(0))
equal(t, d.writePos, int64(1004))
dq.Put(msg)

<-dq.ReadChan()
time.Sleep(100 * time.Millisecond)

d = readMetaDataFile(dq.(*diskQueue).metaDataFileName())
equal(t, d.depth, int64(1))
equal(t, d.readFileNum, int64(0))
equal(t, d.writeFileNum, int64(0))
equal(t, d.readPos, int64(1004))
equal(t, d.writePos, int64(2008))
}

func TestDiskQueueTorture(t *testing.T) {
var wg sync.WaitGroup

Expand Down

0 comments on commit 6ec6bee

Please sign in to comment.