Skip to content

Commit

Permalink
[Improve] Add InitialSubscriptionName for DLQPolicy (#1264)
Browse files Browse the repository at this point in the history
Fixes #1239 

### Modifications
Add `InitialSubscriptionName` for DLQPolicy.
  • Loading branch information
crossoverJie authored Aug 1, 2024
1 parent 8dd4ed1 commit dad98f1
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ type DLQPolicy struct {

// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
RetryLetterTopic string

// InitialSubscriptionName Name of the initial subscription name of the dead letter topic.
// If this field is not set, the initial subscription for the dead letter topic will not be created.
// If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
// will fail to be created.
InitialSubscriptionName string
}

// AckGroupingOptions controls how to group ACK requests
Expand Down
85 changes: 85 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,91 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
assert.Error(t, err)
assert.Nil(t, msg)
}
func TestDeadLetterTopicWithInitialSubscription(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := "persistent://public/default/" + newTopicName()
dlqSub, sub, consumerName := "init-sub", "my-sub", "my-consumer"
dlqTopic := fmt.Sprintf("%s-%s-DLQ", topic, sub)
ctx := context.Background()

// create consumer
maxRedeliveryCount, sendMessages := 1, 100

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &DLQPolicy{
MaxDeliveries: uint32(maxRedeliveryCount),
DeadLetterTopic: dlqTopic,
InitialSubscriptionName: dlqSub,
},
Name: consumerName,
ReceiverQueueSize: sendMessages,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})
assert.Nil(t, err)
defer producer.Close()

// send messages
for i := 0; i < sendMessages; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}

// nack all messages
for i := 0; i < sendMessages*(maxRedeliveryCount+1); i++ {
ctx, canc := context.WithTimeout(context.Background(), 3*time.Second)
defer canc()
msg, _ := consumer.Receive(ctx)
if msg == nil {
break
}
consumer.Nack(msg)
}

// create dlq consumer
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: dlqTopic,
SubscriptionName: dlqSub,
})
assert.Nil(t, err)
defer dlqConsumer.Close()

for i := 0; i < sendMessages; i++ {
ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
err = dlqConsumer.Ack(msg)
assert.Nil(t, err)
}

// No more messages on the DLQ
ctx, canc := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer canc()
msg, err := dlqConsumer.Receive(ctx)
assert.Error(t, err)
assert.Nil(t, msg)

}

func TestDLQMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
Expand Down
1 change: 1 addition & 0 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
if opt.Name == "" {
opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)
}
opt.initialSubscriptionName = r.policy.InitialSubscriptionName

// the origin code sets to LZ4 compression with no options
// so the new design allows compression type to be overwritten but still set lz4 by default
Expand Down
6 changes: 6 additions & 0 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ type ProducerOptions struct {
// - ProducerAccessModeShared
// - ProducerAccessModeExclusive
ProducerAccessMode

// initialSubscriptionName Name of the initial subscription name of the dead letter topic.
// If this field is not set, the initial subscription for the dead letter topic will not be created.
// If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
// will fail to be created.
initialSubscriptionName string
}

// Producer is used to publish messages on a topic
Expand Down
1 change: 1 addition & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
Epoch: proto.Uint64(atomic.LoadUint64(&p.epoch)),
UserProvidedProducerName: proto.Bool(p.userProvidedProducerName),
ProducerAccessMode: toProtoProducerAccessMode(p.options.ProducerAccessMode).Enum(),
InitialSubscriptionName: proto.String(p.options.initialSubscriptionName),
}

if p.topicEpoch != nil {
Expand Down

0 comments on commit dad98f1

Please sign in to comment.