Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Fix the transaction acknowledgement and send logic for chunk message #1069

Merged
merged 7 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,10 @@ func (pc *partitionConsumer) ackIDCommon(msgID MessageID, withResponse bool, txn
}

if cmid, ok := msgID.(*chunkMessageID); ok {
return pc.unAckChunksTracker.ack(cmid)
if txn == nil {
return pc.unAckChunksTracker.ack(cmid)
}
return pc.unAckChunksTracker.ackWithTxn(cmid, txn)
}

trackingID := toTrackingMessageID(msgID)
Expand Down Expand Up @@ -2202,9 +2205,19 @@ func (u *unAckChunksTracker) remove(cmid *chunkMessageID) {
}

func (u *unAckChunksTracker) ack(cmid *chunkMessageID) error {
return u.ackWithTxn(cmid, nil)
}

func (u *unAckChunksTracker) ackWithTxn(cmid *chunkMessageID, txn Transaction) error {
ids := u.get(cmid)
for _, id := range ids {
if err := u.pc.AckID(id); err != nil {
var err error
if txn == nil {
err = u.pc.AckID(id)
} else {
err = u.pc.AckIDWithTxn(id, txn)
}
if err != nil {
return err
}
}
Expand Down
8 changes: 1 addition & 7 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,14 +979,11 @@ func (p *partitionProducer) failTimeoutMessages() {
WithField("properties", sr.msg.Properties)
}

if sr.callback != nil {
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
sr.callbackOnce.Do(func() {
sr.callback(nil, sr.msg, errSendTimeout)
liangyepianzhou marked this conversation as resolved.
Show resolved Hide resolved
})
}
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(nil)
}
}

// flag the send has completed with error, flush make no effect
Expand Down Expand Up @@ -1255,9 +1252,6 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
}
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(nil)
}
}

// Mark this pending item as done
Expand Down
109 changes: 107 additions & 2 deletions pulsar/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ func TestTransactionAbort(t *testing.T) {
// Abort the transaction.
_ = txn.Abort(context.Background())

consumerShouldNotReceiveMessage(t, consumer)

// Clean up: Close the consumer and producer instances.
consumer.Close()
producer.Close()
}

func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) {
// Expectation: The consumer should not receive any messages.
done := make(chan struct{})
go func() {
Expand All @@ -438,8 +446,105 @@ func TestTransactionAbort(t *testing.T) {
require.Fail(t, "The consumer should not receive any messages")
case <-time.After(time.Second):
}
}

// Clean up: Close the consumer and producer instances.
func TestSendAndAckChunkMessage(t *testing.T) {
topic := newTopicName()
sub := "my-sub"

// Prepare: Create PulsarClient and initialize the transaction coordinator client.
_, client := createTcClient(t)

// Create transaction and register the send operation.
txn, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
txn.(*transaction).registerSendOrAckOp()

// Create a producer with chunking enabled to send a large message that will be split into chunks.
producer, err := client.CreateProducer(ProducerOptions{
Name: "test",
Topic: topic,
EnableChunking: true,
DisableBatching: true,
})
require.NoError(t, err)
require.NotNil(t, producer)
defer producer.Close()

// Subscribe to the consumer.
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: sub,
})
require.NoError(t, err)
defer consumer.Close()

// Send a large message that will be split into chunks.
msgID, err := producer.Send(context.Background(), &ProducerMessage{
Transaction: txn,
Payload: createTestMessagePayload(_brokerMaxMessageSize),
})
require.NoError(t, err)
_, ok := msgID.(*chunkMessageID)
require.True(t, ok)

// Attempt to commit the transaction, it should not succeed at this point.
err = txn.Commit(context.Background())
require.NotNil(t, err)

// End the previously registered send operation, allowing the transaction to commit successfully.
txn.(*transaction).endSendOrAckOp(nil)

// Commit the transaction successfully now.
err = txn.Commit(context.Background())
require.Nil(t, err)

// Receive the message using a new transaction and ack it.
txn2, err := client.NewTransaction(time.Hour)
require.Nil(t, err)
message, err := consumer.Receive(context.Background())
require.Nil(t, err)

err = consumer.AckWithTxn(message, txn2)
require.Nil(t, err)

txn2.Abort(context.Background())

// Close the consumer to simulate reconnection and receive the same message again.
consumer.Close()
producer.Close()

// Subscribe to the consumer again.
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: sub,
})
require.Nil(t, err)
message, err = consumer.Receive(context.Background())
require.Nil(t, err)
require.NotNil(t, message)

// Create a new transaction and ack the message again.
txn3, err := client.NewTransaction(time.Hour)
require.Nil(t, err)

err = consumer.AckWithTxn(message, txn3)
require.Nil(t, err)

// Commit the third transaction.
err = txn3.Commit(context.Background())
require.Nil(t, err)

// Close the consumer again.
consumer.Close()

// Subscribe to the consumer again and verify that no message is received.
consumer, err = client.Subscribe(ConsumerOptions{
Topic: topic,
Type: Exclusive,
SubscriptionName: sub,
})
require.Nil(t, err)
consumerShouldNotReceiveMessage(t, consumer)
}