diff --git a/consumer/consumer.go b/consumer/consumer.go index 7bd2b027..813250fb 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -275,8 +275,6 @@ func (dc *defaultConsumer) start() error { retryTopic := internal.GetRetryTopic(dc.consumerGroup) sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll}) dc.subscriptionDataTable.Store(retryTopic, sub) - - dc.option.ChangeInstanceNameToPID() dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.client.GetNameSrv()) } else { dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID()) diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index ef165b06..c8861ae9 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -95,7 +95,6 @@ func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) { defaultOpts.Namesrv = srvs dc := &defaultConsumer{ - client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: utils.WrapNamespace(defaultOpts.Namespace, defaultOpts.GroupName), cType: _PullConsume, state: int32(internal.StateCreateJust), @@ -153,6 +152,11 @@ func (pc *defaultPullConsumer) Start() error { err = errors.Wrap(err, "the consumer group option validate fail") return } + if pc.model == Clustering { + pc.option.ChangeInstanceNameToPID() + } + + pc.client = internal.GetOrNewRocketMQClient(pc.option.ClientOptions, nil) err = pc.defaultConsumer.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { rlog.Error("defaultPullConsumer the consumer group has been created, specify another one", map[string]interface{}{ diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go index 85f9725b..70241a65 100644 --- a/consumer/push_consumer.go +++ b/consumer/push_consumer.go @@ -94,7 +94,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) { } dc := &defaultConsumer{ - client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil), consumerGroup: defaultOpts.GroupName, cType: _PushConsume, state: int32(internal.StateCreateJust), @@ -149,6 +148,11 @@ func (pc *pushConsumer) Start() error { return } + if pc.model == Clustering { + pc.option.ChangeInstanceNameToPID() + } + + pc.client = internal.GetOrNewRocketMQClient(pc.option.ClientOptions, nil) err = pc.client.RegisterConsumer(pc.consumerGroup, pc) if err != nil { rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{ @@ -157,7 +161,6 @@ func (pc *pushConsumer) Start() error { err = errors2.ErrCreated return } - err = pc.defaultConsumer.start() if err != nil { return