Access to partitionConsumer state not thread-safe. #448
Preliminary testing with the following diff:

```diff
diff --git a/go.mod b/go.mod
index bf0b627..2bb071f 100644
--- a/go.mod
+++ b/go.mod
@@ -23,6 +23,7 @@ require (
github.com/spf13/pflag v1.0.3 // indirect
github.com/stretchr/testify v1.4.0
github.com/yahoo/athenz v1.8.55
+ go.uber.org/atomic v1.7.0
)
replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index eaf7163..637ce54 100644
--- a/go.sum
+++ b/go.sum
@@ -154,6 +154,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 285cf29..54d2c5a 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/gogo/protobuf/proto"
+ "go.uber.org/atomic"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
@@ -31,9 +32,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/log"
)
-var (
- lastestMessageID = LatestMessageID()
-)
+var lastestMessageID = LatestMessageID()
type consumerState int
@@ -86,7 +85,7 @@ type partitionConsumer struct {
// this is needed for sending ConsumerMessage on the messageCh
parentConsumer Consumer
- state consumerState
+ state atomic.Int32
options *partitionConsumerOpts
conn internal.Connection
@@ -127,7 +126,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.TopicMetrics) (*partitionConsumer, error) {
pc := &partitionConsumer{
- state: consumerInit,
parentConsumer: parent,
client: client,
options: options,
@@ -148,6 +146,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
dlq: dlq,
metrics: metrics,
}
+ pc.setState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
"topic": options.topic,
@@ -159,14 +158,16 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
err := pc.grabConn()
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
+ pc.nackTracker.Close()
return nil, err
}
pc.log.Info("Created consumer")
- pc.state = consumerReady
+ pc.setState(consumerReady)
if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
+ pc.nackTracker.Close()
return nil, err
}
if msgID.entryID != noMessageEntry {
@@ -174,6 +175,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
err = pc.requestSeek(msgID.messageID)
if err != nil {
+ pc.nackTracker.Close()
return nil, err
}
}
@@ -198,12 +200,13 @@ func (pc *partitionConsumer) Unsubscribe() error {
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
- if pc.state == consumerClosed || pc.state == consumerClosing {
+ pstate := pc.getState()
+ if pstate == consumerClosed || pstate == consumerClosing {
pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
return
}
- pc.state = consumerClosing
+ pc.setState(consumerClosing)
requestID := pc.client.rpcClient.NewRequestID()
cmdUnsubscribe := &pb.CommandUnsubscribe{
RequestId: proto.Uint64(requestID),
@@ -214,7 +217,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
unsub.err = err
// Set the state to ready for closing the consumer
- pc.state = consumerReady
+ pc.setState(consumerReady)
// Should'nt remove the consumer handler
return
}
@@ -224,7 +227,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.nackTracker.Close()
}
pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
- pc.state = consumerClosed
+ pc.setState(consumerClosed)
}
func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
@@ -305,7 +308,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
}
func (pc *partitionConsumer) Close() {
- if pc.state != consumerReady {
+ if pc.getState() != consumerReady {
return
}
@@ -334,7 +337,8 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
}
func (pc *partitionConsumer) requestSeek(msgID messageID) error {
- if pc.state == consumerClosing || pc.state == consumerClosed {
+ pstate := pc.getState()
+ if pstate == consumerClosing || pstate == consumerClosed {
pc.log.Error("Consumer was already closed")
return nil
}
@@ -376,7 +380,8 @@ func (pc *partitionConsumer) SeekByTime(time time.Time) error {
func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) {
defer close(seek.doneCh)
- if pc.state == consumerClosing || pc.state == consumerClosed {
+ pstate := pc.getState()
+ if pstate == consumerClosing || pstate == consumerClosed {
pc.log.Error("Consumer was already closed")
return
}
@@ -738,11 +743,16 @@ func (pc *partitionConsumer) runEventsLoop() {
func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
- if pc.state != consumerReady {
+ pstate := pc.getState()
+ if pstate != consumerReady {
+ // this might be redundant but to ensure nack tracker is closed
+ if pc.nackTracker != nil {
+ pc.nackTracker.Close()
+ }
return
}
- if pc.state == consumerClosed || pc.state == consumerClosing {
+ if pstate == consumerClosed || pstate == consumerClosing {
pc.log.Error("The consumer is closing or has been closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
@@ -750,7 +760,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
return
}
- pc.state = consumerClosing
+ pc.setState(consumerClosing)
pc.log.Infof("Closing consumer=%d", pc.consumerID)
requestID := pc.client.rpcClient.NewRequestID()
@@ -769,7 +779,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
provider.Close()
}
- pc.state = consumerClosed
+ pc.setState(consumerClosed)
pc.conn.DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
@@ -790,7 +800,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
}
for maxRetry != 0 {
- if pc.state != consumerReady {
+ if pc.getState() != consumerReady {
// Consumer is already closing
return
}
@@ -876,7 +886,6 @@ func (pc *partitionConsumer) grabConn() error {
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
-
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
return err
@@ -908,7 +917,7 @@ func (pc *partitionConsumer) grabConn() error {
}
func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
- if pc.state != consumerReady {
+ if pc.getState() != consumerReady {
return trackingMessageID{}
}
wg := &sync.WaitGroup{}
@@ -1028,6 +1037,14 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}
+func (pc *partitionConsumer) getState() consumerState {
+ return consumerState(pc.state.Load())
+}
+
+func (pc *partitionConsumer) setState(state consumerState) {
+ pc.state.Store(int32(state))
+}
+
func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
if msgID.Undefined() {
return nil
```

Note: the diff is merely illustrative of the approach of using this particular package for the fix. Other changes, like the nackTracker change, have entered the main branch since the changes I made for #432 and are not germane to this issue. I haven't determined whether this problem is specific to the regex consumer or is a deeper issue with the module, though the former seems more likely: all tests appear to run with the race detector on, and I'd think (hope? :D) it would have been seen by now otherwise. If this is the only member accessed like this, it may be an acceptable fix to wrap access with something like the getState/setState pair shown at the end of the diff above.
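To make the approach concrete outside the diff, here is a minimal, self-contained sketch of the same wrapper pattern. The `consumerState` constants are stand-ins mirroring consumer_partition.go; everything else is illustrative, not code from the client:

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// consumerState mirrors the enum in consumer_partition.go (values assumed).
type consumerState int32

const (
	consumerInit consumerState = iota
	consumerReady
	consumerClosing
	consumerClosed
)

type partitionConsumer struct {
	// state is stored as an atomic.Int32 so the event loop and the public
	// API goroutines can read and write it without a data race.
	state atomic.Int32
}

func (pc *partitionConsumer) getState() consumerState {
	return consumerState(pc.state.Load())
}

func (pc *partitionConsumer) setState(state consumerState) {
	pc.state.Store(int32(state))
}

func main() {
	pc := &partitionConsumer{}
	pc.setState(consumerReady)
	fmt.Println(pc.getState() == consumerReady) // true
}
```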
It looks like this is a miss. The uber-go/atomic package (https://github.com/uber-go/atomic/blob/master/uint32.go) uses sync/atomic underneath. Can we converge on the uber library for simplicity? @merlimat @flowchartsman
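For comparison, a rough side-by-side of the two styles (an illustrative sketch; neither snippet is code from the client). The wrapper buys type safety rather than new behavior:

```go
package main

import (
	stdatomic "sync/atomic"

	"go.uber.org/atomic"
)

func main() {
	// Standard library: a raw int32 plus free functions. Nothing stops a
	// careless plain assignment like `raw = 2` from bypassing the atomics.
	var raw int32
	stdatomic.StoreInt32(&raw, 1)
	_ = stdatomic.LoadInt32(&raw)

	// uber-go/atomic: the value is unexported inside the wrapper type, so
	// every access has to go through Load/Store.
	var wrapped atomic.Int32
	wrapped.Store(1)
	_ = wrapped.Load()
}
```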
(#451)
* make state thread safe in consumer_partition and connection
* use uber atomic for all state management
* fix go lint error
* Apply suggestions from code review

Co-authored-by: Matteo Merli <[email protected]>
Did #451 address this? If so, this should be closed as fixed, no?
As part of my testing of the fix for #432, I have discovered a race condition in consumer_partition.go, where the struct's state member is read and modified concurrently without synchronization.
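For anyone who wants to see this class of bug in isolation, here is a minimal hypothetical reproduction — none of these names come from pulsar-client-go — that the Go race detector flags:

```go
package main

import "sync"

type consumer struct {
	state int // plain field, no synchronization
}

func main() {
	c := &consumer{}
	var wg sync.WaitGroup
	wg.Add(2)

	// Writer: analogous to internalClose() setting consumerClosing.
	go func() {
		defer wg.Done()
		c.state = 2
	}()

	// Reader: analogous to Close() checking for consumerReady.
	go func() {
		defer wg.Done()
		_ = c.state
	}()

	wg.Wait()
	// Running this with `go run -race .` reports a data race on c.state.
}
```

Replacing the plain field with an atomic, as in the diff above, makes the same program race-clean.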