Skip to content

Commit

Permalink
Merge pull request #407 from jehiah/touch_logic_407
Browse files Browse the repository at this point in the history
nsqd: touch handling
  • Loading branch information
mreiferson committed Jul 18, 2014
2 parents 0c33ce3 + c08abe2 commit c8ddec6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
7 changes: 3 additions & 4 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,15 @@ func (c *Channel) PutMessage(msg *Message) error {
}

// TouchMessage resets the timeout for an in-flight message
func (c *Channel) TouchMessage(clientID int64, id MessageID) error {
func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout time.Duration) error {
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
c.removeFromInFlightPQ(msg)

currentTimeout := time.Unix(0, msg.pri)
newTimeout := currentTimeout.Add(c.context.nsqd.options.MsgTimeout)
if newTimeout.Add(c.context.nsqd.options.MsgTimeout).Sub(msg.deliveryTS) >=
newTimeout := time.Now().Add(clientMsgTimeout)
if newTimeout.Sub(msg.deliveryTS) >=
c.context.nsqd.options.MaxMsgTimeout {
// we would have gone over, set to the max
newTimeout = msg.deliveryTS.Add(c.context.nsqd.options.MaxMsgTimeout)
Expand Down
5 changes: 4 additions & 1 deletion nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,10 @@ func (p *protocolV2) TOUCH(client *clientV2, params [][]byte) ([]byte, error) {
return nil, util.NewFatalClientErr(nil, "E_INVALID", err.Error())
}

err = client.Channel.TouchMessage(client.ID, *id)
client.RLock()
msgTimeout := client.MsgTimeout
client.RUnlock()
err = client.Channel.TouchMessage(client.ID, *id, msgTimeout)
if err != nil {
return nil, util.NewClientErr(err, "E_TOUCH_FAILED",
fmt.Sprintf("TOUCH %s failed %s", *id, err.Error()))
Expand Down

0 comments on commit c8ddec6

Please sign in to comment.