Merge pull request nsqio#108 from youzan/add-more-grpc
Topic data consumption may be wrong after a leader change when force-fix mode is enabled
absolute8511 authored Oct 7, 2019
2 parents e2dad96 + a19e298 commit a4123b9
Showing 12 changed files with 734 additions and 53 deletions.
70 changes: 69 additions & 1 deletion apps/nsq_data_tool/tool.go
@@ -8,6 +8,8 @@ import (
"time"

"github.com/youzan/nsq/consistence"
"github.com/youzan/nsq/internal/clusterinfo"
"github.com/youzan/nsq/internal/http_api"
"github.com/youzan/nsq/internal/levellogger"
"github.com/youzan/nsq/internal/version"
"github.com/youzan/nsq/nsqd"
@@ -19,8 +21,10 @@ var (
topic = flag.String("topic", "", "NSQ topic")
partition = flag.Int("partition", -1, "NSQ topic partition")
dataPath = flag.String("data_path", "", "the data path of nsqd")
srcNsqLookupd = flag.String("src_nsqlookupd", "", "")
dstNsqLookupd = flag.String("dest_nsqlookupd", "", "")
view = flag.String("view", "commitlog", "commitlog | topicdata | delayedqueue")
searchMode = flag.String("search_mode", "count", "the view start of mode. (count|id|timestamp|virtual_offset)")
searchMode = flag.String("search_mode", "count", "the view start of mode. (count|id|timestamp|virtual_offset|check_channels)")
viewStart = flag.Int64("view_start", 0, "the start count of message.")
viewStartID = flag.Int64("view_start_id", 0, "the start id of message.")
viewStartTimestamp = flag.Int64("view_start_timestamp", 0, "the start timestamp of message.")
@@ -37,6 +41,66 @@ func getBackendName(topicName string, part int) string {
return backendName
}

func checkChannelStats() {
lsrc := clusterinfo.LookupdAddressDC{
DC: "",
Addr: *srcNsqLookupd,
}

client := http_api.NewClient(nil)
ci := clusterinfo.New(nil, client)
srcnodes, err := ci.GetLookupdProducers([]clusterinfo.LookupdAddressDC{lsrc})
if err != nil {
log.Printf("error: %v", err.Error())
return
}
_, channels, err := ci.GetNSQDStats(srcnodes, "", "", true)
if err != nil {
log.Printf("error: %v", err.Error())
return
}
ldest := clusterinfo.LookupdAddressDC{
DC: "",
Addr: *dstNsqLookupd,
}

destnodes, err := ci.GetLookupdProducers([]clusterinfo.LookupdAddressDC{ldest})
if err != nil {
log.Printf("error: %v", err.Error())
return
}
_, channels2, err := ci.GetNSQDStats(destnodes, "", "", true)
if err != nil {
log.Printf("error: %v", err.Error())
return
}

for name, ch := range channels {
ch2, ok := channels2[name]
if !ok {
log.Printf("src channel not found in dst: %v", name)
continue
}
if ch.Skipped == ch2.Skipped && ch.Paused == ch2.Paused {
continue
}
log.Printf("ch %v mismatch, src channel (paused: %v, skipped: %v), dest channel: (%v, %v)\n", name, ch.Paused, ch.Skipped,
ch2.Paused, ch2.Skipped)
}
for name, ch := range channels2 {
ch2, ok := channels[name]
if !ok {
log.Printf("dest channel not found in src: %v", name)
continue
}
if ch.Skipped == ch2.Skipped && ch.Paused == ch2.Paused {
continue
}
log.Printf("ch %v mismatch, src channel (paused: %v, skipped: %v), dest channel: (%v, %v)\n", name, ch2.Paused, ch2.Skipped,
ch.Paused, ch.Skipped)
}
}

func main() {
flag.Parse()

@@ -45,6 +109,10 @@ func main() {
return
}

if *searchMode == "check_channels" {
checkChannelStats()
return
}
nsqd.SetLogger(levellogger.NewSimpleLog())
nsqd.NsqLogger().SetLevel(int32(*logLevel))
consistence.SetCoordLogger(levellogger.NewSimpleLog(), int32(*logLevel))
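Note on the tool change above: the new check_channels search mode (selected with -search_mode=check_channels together with the new -src_nsqlookupd and -dest_nsqlookupd flags) pulls channel stats from both clusters via clusterinfo and reports every channel whose paused/skipped state differs between source and destination. Below is a minimal illustrative sketch of that symmetric comparison; it uses a stand-in stats type rather than the real clusterinfo structs, so the field set is an assumption based only on what the diff touches.

package main

import "log"

// channelState is a stand-in for the clusterinfo channel stats used above;
// only the Paused/Skipped fields referenced in the diff are assumed here.
type channelState struct {
	Paused  bool
	Skipped bool
}

// reportMismatches condenses the two loops in checkChannelStats: it logs
// channels missing on either side and channels whose paused/skipped flags differ.
func reportMismatches(src, dst map[string]channelState) {
	for name, a := range src {
		b, ok := dst[name]
		if !ok {
			log.Printf("src channel not found in dst: %v", name)
			continue
		}
		if a.Paused != b.Paused || a.Skipped != b.Skipped {
			log.Printf("ch %v mismatch, src (paused: %v, skipped: %v), dst (paused: %v, skipped: %v)",
				name, a.Paused, a.Skipped, b.Paused, b.Skipped)
		}
	}
	for name := range dst {
		if _, ok := src[name]; !ok {
			log.Printf("dst channel not found in src: %v", name)
		}
	}
}

func main() {
	src := map[string]channelState{"orders": {Paused: false}}
	dst := map[string]channelState{"orders": {Paused: true}}
	reportMismatches(src, dst) // logs a paused-state mismatch for "orders"
}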
4 changes: 2 additions & 2 deletions consistence/data_placement_mgr.go
@@ -369,7 +369,7 @@ func (s LFListT) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s LFListT) Less(i, j int) bool {
if math.Abs(s[i].loadFactor-s[j].loadFactor) < 0.5 {
if math.Abs(s[i].loadFactor-s[j].loadFactor) < 1 {
return s[i].topic < s[j].topic
}
return s[i].loadFactor < s[j].loadFactor
@@ -1885,7 +1885,7 @@ func (dpm *DataPlacement) rebalanceTopNTopicsByLoad(monitorChan chan struct{},
return false, moved, sortedTopNTopics
default:
}
if !dpm.lookupCoord.IsClusterStable() || !dpm.lookupCoord.IsMineLeader() || movedCnt > 10 {
if !dpm.lookupCoord.IsClusterStable() || !dpm.lookupCoord.IsMineLeader() || movedCnt > 20 {
coordLog.Infof("no balance since cluster is not stable or too much moved %v while checking balance", movedCnt)
return false, moved, sortedTopNTopics
}
33 changes: 22 additions & 11 deletions consistence/nsqd_coordinator.go
@@ -30,9 +30,10 @@ const (
)

var (
MaxRetryWait = time.Second * 3
MaxRetryWait = time.Second
ForceFixLeaderData = false
MaxTopicRetentionSizePerDay = int64(1024 * 1024 * 1024 * 16)
flushTicker = time.Second * 2
)

var testCatchupPausedPullLogs int32
@@ -432,7 +433,7 @@ func (ncoord *NsqdCoordinator) periodFlushCommitLogs() {
tmpCoords := make(map[string]map[int]*TopicCoordinator)
syncCounter := 0
defer ncoord.wg.Done()
flushTicker := time.NewTicker(time.Second * 2)
flushTicker := time.NewTicker(flushTicker)
doFlush := func() {
syncCounter++
ncoord.getAllCoords(tmpCoords)
@@ -850,14 +851,19 @@ func checkAndFixLocalLogQueueEnd(tc *coordData,
coordLog.Infof("no commit last log data : %v", err)
return nil
}
coordLog.Infof("current topic %v log: %v:%v, %v",
tname, logIndex, logOffset, logData)

commitEndOffset := nsqd.BackendOffset(logData.MsgOffset + int64(logData.MsgSize))
commitEndCnt := logData.MsgCnt + int64(logData.MsgNum) - 1
if localLogQ.TotalDataSize() == int64(commitEndOffset) && localLogQ.TotalMessageCnt() == uint64(commitEndCnt) {
return nil
}
coordLog.Infof("current topic %v log: %v:%v, %v, diskqueue end: %v",
tname, logIndex, logOffset, logData, localLogQ.TotalDataSize())

if !forceFix && !tryFixEnd {
return nil
}
localErr := localLogQ.ResetBackendEndNoLock(nsqd.BackendOffset(logData.MsgOffset+int64(logData.MsgSize)),
logData.MsgCnt+int64(logData.MsgNum)-1)
localErr := localLogQ.ResetBackendEndNoLock(commitEndOffset, commitEndCnt)
if localErr == nil {
return nil
}
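The reshuffled check above makes the invariant explicit: the disk queue end (total data size and message count) must equal the end implied by the last commit log entry, and a fix is only attempted when they disagree and either tryFixEnd or forceFix is set. A small illustrative restatement of just that comparison, using a hypothetical stand-in struct with the field names from the diff rather than the real commit log type:

// lastCommit is a hypothetical stand-in carrying only the commit log fields used above.
type lastCommit struct {
	MsgOffset int64 // byte offset where the last committed batch starts
	MsgSize   int32 // size in bytes of that batch
	MsgCnt    int64 // cumulative message count recorded in the entry (as used in the diff)
	MsgNum    int32 // number of messages in the batch
}

// queueEndMatchesCommit mirrors the comparison introduced above: the disk
// queue's total size and message count must equal the end implied by the
// last commit log entry, otherwise checkAndFixLocalLogQueueEnd may reset it.
func queueEndMatchesCommit(l lastCommit, totalDataSize int64, totalMessageCnt uint64) bool {
	commitEndOffset := l.MsgOffset + int64(l.MsgSize)
	commitEndCnt := l.MsgCnt + int64(l.MsgNum) - 1
	return totalDataSize == commitEndOffset && totalMessageCnt == uint64(commitEndCnt)
}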
@@ -2260,8 +2266,8 @@ func (ncoord *NsqdCoordinator) TryFixLocalTopic(topic string, pid int) error {

// handle all the things while leader is changed or isr is changed.
func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator,
localTopic *nsqd.Topic, syncCommitDisk bool) *CoordErr {
// flush topic data and channel comsume data if any cluster topic info changed
localTopic *nsqd.Topic, lockedAndFixCommitDisk bool) *CoordErr {
// flush topic data and channel consume data if any cluster topic info changed
tcData := topicCoord.GetData()
master := tcData.GetLeader() == ncoord.myNode.GetID()
// leader changed (maybe down), we make sure our data is flushed to keep data safe
@@ -2271,7 +2277,11 @@ func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator
if master {
isWriteDisabled := topicCoord.IsWriteDisabled()
localTopic.Lock()
localErr := checkAndFixLocalLogQueueEnd(tcData, localTopic, tcData.logMgr, !isWriteDisabled && syncCommitDisk, ForceFixLeaderData)
// lockedAndFixCommitDisk means we need to make sure the commit log and the disk queue are consistent
// while enabling the new leader (or the old leader if the leader did not change), all data must be consistent
// we can not force fix data without lockedAndFixCommitDisk, because the write lock is not held in that case
localErr := checkAndFixLocalLogQueueEnd(tcData, localTopic, tcData.logMgr,
!isWriteDisabled && lockedAndFixCommitDisk, ForceFixLeaderData && lockedAndFixCommitDisk)
if localErr != nil {
atomic.StoreInt32(&topicCoord.disableWrite, 1)
isWriteDisabled = true
@@ -2280,7 +2290,7 @@ func (ncoord *NsqdCoordinator) switchStateForMaster(topicCoord *TopicCoordinator
}
if tcData.delayedLogMgr != nil && !tcData.topicInfo.OrderedMulti {
localErr = checkAndFixLocalLogQueueEnd(tcData, localTopic.GetDelayedQueue(), tcData.delayedLogMgr,
!isWriteDisabled && syncCommitDisk, ForceFixLeaderData)
!isWriteDisabled && lockedAndFixCommitDisk, ForceFixLeaderData && lockedAndFixCommitDisk)
if localErr != nil {
atomic.StoreInt32(&topicCoord.disableWrite, 1)
isWriteDisabled = true
@@ -2695,7 +2705,8 @@ func (ncoord *NsqdCoordinator) readTopicRawData(topic string, partition int, off
coordLog.Infof("read topic %v data at offset %v, size: %v, error: %v", t.GetFullName(), offset, size, err)
break
}
buf, err := snap.ReadRaw(size)
var buf []byte
buf, err = snap.ReadRaw(size)
if err != nil {
coordLog.Infof("read topic data at offset %v, size:%v(actual: %v), error: %v", offset, size, len(buf), err)
break
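Summarizing the switchStateForMaster change above: the renamed lockedAndFixCommitDisk parameter now gates both the normal end fix and the ForceFixLeaderData path, so the commit log and disk queue end is only (force-)fixed while the topic write lock is actually held, which is the situation named in the commit title. A minimal sketch of that gating, assuming plain booleans as in the diff:

// fixFlags restates how the two fix arguments are derived above: without
// lockedAndFixCommitDisk neither a normal end fix nor a forced fix is
// attempted, because the write lock is not held in that case.
func fixFlags(isWriteDisabled, lockedAndFixCommitDisk, forceFixLeaderData bool) (tryFixEnd, forceFix bool) {
	tryFixEnd = !isWriteDisabled && lockedAndFixCommitDisk
	forceFix = forceFixLeaderData && lockedAndFixCommitDisk
	return tryFixEnd, forceFix
}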
24 changes: 22 additions & 2 deletions consistence/nsqd_coordinator_cluster_write.go
@@ -10,6 +10,20 @@ import (
"github.com/youzan/nsq/nsqd"
)

var testSlaveTimeout int32

func setTestSlaveTimeout(enableTest bool) {
if enableTest {
atomic.StoreInt32(&testSlaveTimeout, 1)
} else {
atomic.StoreInt32(&testSlaveTimeout, 0)
}
}

func isTestSlaveTimeout() bool {
return atomic.LoadInt32(&testSlaveTimeout) == 1
}

type localWriteFunc func(*coordData) *CoordErr
type localExitFunc func(*CoordErr)
type localCommitFunc func() error
@@ -152,6 +166,9 @@ func (ncoord *NsqdCoordinator) internalPutMessageToCluster(topic *nsqd.Topic,
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
if isTestSlaveTimeout() {
return NewCoordErr("timeout test for slave sync", CoordNetErr)
}
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
if putDelayed {
putErr := c.PutDelayedMessage(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msg)
@@ -289,6 +306,9 @@ func (ncoord *NsqdCoordinator) PutMessagesToCluster(topic *nsqd.Topic,
return nil
}
doSlaveSync := func(c *NsqdRpcClient, nodeID string, tcData *coordData) *CoordErr {
if isTestSlaveTimeout() {
return NewCoordErr("timeout test for slave sync", CoordNetErr)
}
// should retry if failed, and the slave should keep the last success write to avoid the duplicated
putErr := c.PutMessages(&tcData.topicLeaderSession, &tcData.topicInfo, commitLog, msgs)
if putErr != nil {
@@ -393,7 +413,7 @@ retrysync:
clusterWriteErr = ErrTopicExiting
goto exitsync
}
time.Sleep(time.Second)
time.Sleep(time.Second / 2)
}
if needRefreshISR {
tcData = coord.GetData()
Expand Down Expand Up @@ -507,7 +527,7 @@ retrysync:
coordLog.Infof("request the failed node: %v to leave topic %v isr", nid, topicFullName)
}
}
time.Sleep(time.Second)
time.Sleep(time.Second / 2)
}
} else {
needLeaveISR = true
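The testSlaveTimeout switch added above is a fault-injection hook: when set, both slave-sync closures return a CoordNetErr before the RPC is issued, so the retry and ISR-removal paths (now retried every half second instead of every second) can be exercised without a real network failure. A hypothetical test using the hook, assumed to live alongside the other tests in the consistence package, might look like the sketch below; the test name and the elided setup are illustrative, not part of the commit.

func TestClusterWriteWithSlaveTimeoutInjected(t *testing.T) {
	// Enable the injected slave-sync failure added in this commit and make
	// sure it is switched off again when the test finishes.
	setTestSlaveTimeout(true)
	defer setTestSlaveTimeout(false)

	// ... set up a topic with a leader and at least one ISR node, publish a
	// message, then assert that the write is retried and that the failing
	// slave is eventually asked to leave the ISR (or that the write succeeds
	// once the flag is cleared).
}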
18 changes: 8 additions & 10 deletions consistence/nsqd_coordinator_test.go
@@ -246,9 +246,12 @@ func ensureCatchupForTopic(nsqdCoord *NsqdCoordinator, topicInfo RpcAdminTopicIn
func newNsqdNode(t *testing.T, id string) (*nsqdNs.NSQD, int, *NsqdNodeInfo, string) {
opts := nsqdNs.NewOptions()
opts.Logger = newTestLogger(t)
opts.MaxBytesPerFile = 1024 * 1024
if testing.Verbose() {
opts.Logger = &levellogger.SimpleLogger{}
opts.LogLevel = levellogger.LOG_INFO
opts.Logger = levellogger.NewSimpleLog()
nsqdNs.SetLogger(opts.Logger)
glog.SetFlags(0, "", "", true, true, 1)
glog.StartWorker(time.Second)
} else {
opts.LogLevel = levellogger.LOG_INFO
@@ -259,9 +262,10 @@ func newNsqdNode(t *testing.T, id string) (*nsqdNs.NSQD, int, *NsqdNodeInfo, str
nsqd := mustStartNSQD(opts)
randPort := rand.Int31n(10000) + 20000
nodeInfo := NsqdNodeInfo{
NodeIP: "127.0.0.1",
TcpPort: "0",
RpcPort: strconv.Itoa(int(randPort)),
NodeIP: "127.0.0.1",
TcpPort: "0",
RpcPort: strconv.Itoa(int(randPort)),
HttpPort: strconv.Itoa(int(randPort + 1)),
}
nodeInfo.ID = GenNsqdNodeID(&nodeInfo, id)
return nsqd, int(randPort), &nodeInfo, opts.DataPath
Expand Down Expand Up @@ -1822,12 +1826,6 @@ func TestNsqdCoordLeaderChangeWhileWrite(t *testing.T) {
// RESULT: all new isr nodes should be synced
}

func TestNsqdCoordISRChangedWhileWrite(t *testing.T) {
// TODO: leader write while network split, and part of isr agreed with write
// leader need rollback (or just move leader out from isr and sync with new leader )
// RESULT: all new isr nodes should be synced
}

func TestNsqdCoordSyncChannelList(t *testing.T) {
topic := "coordTestTopicSyncChannel"
partition := 1
2 changes: 1 addition & 1 deletion consistence/nsqlookup_coord_api.go
@@ -29,7 +29,7 @@ func (nlcoord *NsqLookupCoordinator) GetTopicMetaInfo(topicName string) (TopicMe
return TopicMetaInfo{}, err
}
if !cached {
coordLog.Infof("miss cache read for topic info: %v", topicName)
coordLog.Debugf("miss cache read for topic info: %v", topicName)
}
return meta, err
}