diff --git a/.golangci.yml b/.golangci.yml index 1f1b42c5b..58ecd7542 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -18,7 +18,7 @@ # Run `make lint` from the root path of this project to check code with golangci-lint. run: - deadline: 6m + timeout: 5m linters: # Uncomment this line to run only the explicitly enabled linters diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 568dcaa96..98848d71e 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -532,8 +532,8 @@ func (pc *partitionConsumer) internalAckWithTxn(req *ackWithTxnRequest) { req.err = newError(ConsumerClosed, "Failed to ack by closing or closed consumer") return } - if req.Transaction.state != TxnOpen { - pc.log.WithField("state", req.Transaction.state).Error("Failed to ack by a non-open transaction.") + if req.Transaction.state.Load() != int32(TxnOpen) { + pc.log.WithField("state", req.Transaction.state.Load()).Error("Failed to ack by a non-open transaction.") req.err = newError(InvalidStatus, "Failed to ack by a non-open transaction.") return } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 7694c9671..371ffb6a8 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1135,8 +1135,8 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error { } txn := (sr.msg.Transaction).(*transaction) - if txn.state != TxnOpen { - p.log.WithField("state", txn.state).Error("Failed to send message" + + if txn.state.Load() != int32(TxnOpen) { + p.log.WithField("state", txn.state.Load()).Error("Failed to send message" + " by a non-open transaction.") return joinErrors(ErrTransaction, fmt.Errorf("failed to send message by a non-open transaction")) diff --git a/pulsar/transaction_impl.go b/pulsar/transaction_impl.go index 82e6fcfa7..a48978160 100644 --- a/pulsar/transaction_impl.go +++ b/pulsar/transaction_impl.go @@ -19,6 +19,8 @@ package pulsar import ( "context" + "errors" + "fmt" "sync" "sync/atomic" "time" @@ -33,9 +35,9 @@ type subscription struct { } type transaction struct { - sync.Mutex + mu sync.Mutex txnID TxnID - state TxnState + state atomic.Int32 tcClient *transactionCoordinatorClient registerPartitions map[string]bool registerAckSubscriptions map[subscription]bool @@ -54,7 +56,7 @@ type transaction struct { // 1. When the transaction is committed or aborted, a bool will be read from opsFlow chan. // 2. When the opsCount increment from 0 to 1, a bool will be read from opsFlow chan. opsFlow chan bool - opsCount int32 + opsCount atomic.Int32 opTimeout time.Duration log log.Logger } @@ -62,47 +64,52 @@ type transaction struct { func newTransaction(id TxnID, tcClient *transactionCoordinatorClient, timeout time.Duration) *transaction { transaction := &transaction{ txnID: id, - state: TxnOpen, registerPartitions: make(map[string]bool), registerAckSubscriptions: make(map[subscription]bool), opsFlow: make(chan bool, 1), - opTimeout: 5 * time.Second, + opTimeout: tcClient.client.operationTimeout, tcClient: tcClient, } - //This means there are not pending requests with this transaction. The transaction can be committed or aborted. + transaction.state.Store(int32(TxnOpen)) + // This means there are not pending requests with this transaction. The transaction can be committed or aborted. transaction.opsFlow <- true go func() { - //Set the state of the transaction to timeout after timeout + // Set the state of the transaction to timeout after timeout <-time.After(timeout) - atomic.CompareAndSwapInt32((*int32)(&transaction.state), int32(TxnOpen), int32(TxnTimeout)) + transaction.state.CompareAndSwap(int32(TxnOpen), int32(TxnTimeout)) }() transaction.log = tcClient.log.SubLogger(log.Fields{}) return transaction } func (txn *transaction) GetState() TxnState { - return txn.state + return TxnState(txn.state.Load()) } -func (txn *transaction) Commit(_ context.Context) error { - if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnCommitting)) || - txn.state == TxnCommitting) { - return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string()) +func (txn *transaction) Commit(ctx context.Context) error { + if !(txn.state.CompareAndSwap(int32(TxnOpen), int32(TxnCommitting))) { + txnState := txn.state.Load() + return newError(InvalidStatus, txnStateErrorMessage(TxnOpen, TxnState(txnState))) } - //Wait for all operations to complete + // Wait for all operations to complete select { case <-txn.opsFlow: + case <-ctx.Done(): + txn.state.Store(int32(TxnOpen)) + return ctx.Err() case <-time.After(txn.opTimeout): + txn.state.Store(int32(TxnTimeout)) return newError(TimeoutError, "There are some operations that are not completed after the timeout.") } - //Send commit transaction command to transaction coordinator + // Send commit transaction command to transaction coordinator err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_COMMIT) if err == nil { - atomic.StoreInt32((*int32)(&txn.state), int32(TxnCommitted)) + txn.state.Store(int32(TxnCommitted)) } else { - if e, ok := err.(*Error); ok && (e.Result() == TransactionNoFoundError || e.Result() == InvalidStatus) { - atomic.StoreInt32((*int32)(&txn.state), int32(TxnError)) + var e *Error + if errors.As(err, &e) && (e.Result() == TransactionNoFoundError || e.Result() == InvalidStatus) { + txn.state.Store(int32(TxnError)) return err } txn.opsFlow <- true @@ -110,40 +117,45 @@ func (txn *transaction) Commit(_ context.Context) error { return err } -func (txn *transaction) Abort(_ context.Context) error { - if !(atomic.CompareAndSwapInt32((*int32)(&txn.state), int32(TxnOpen), int32(TxnAborting)) || - txn.state == TxnAborting) { - return newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string()) +func (txn *transaction) Abort(ctx context.Context) error { + if !(txn.state.CompareAndSwap(int32(TxnOpen), int32(TxnAborting))) { + txnState := txn.state.Load() + return newError(InvalidStatus, txnStateErrorMessage(TxnOpen, TxnState(txnState))) } - //Wait for all operations to complete + // Wait for all operations to complete select { case <-txn.opsFlow: + case <-ctx.Done(): + txn.state.Store(int32(TxnOpen)) + return ctx.Err() case <-time.After(txn.opTimeout): + txn.state.Store(int32(TxnTimeout)) return newError(TimeoutError, "There are some operations that are not completed after the timeout.") } - //Send abort transaction command to transaction coordinator + // Send abort transaction command to transaction coordinator err := txn.tcClient.endTxn(&txn.txnID, pb.TxnAction_ABORT) if err == nil { - atomic.StoreInt32((*int32)(&txn.state), int32(TxnAborted)) + txn.state.Store(int32(TxnAborted)) } else { - if e, ok := err.(*Error); ok && (e.Result() == TransactionNoFoundError || e.Result() == InvalidStatus) { - atomic.StoreInt32((*int32)(&txn.state), int32(TxnError)) - } else { - txn.opsFlow <- true + var e *Error + if errors.As(err, &e) && (e.Result() == TransactionNoFoundError || e.Result() == InvalidStatus) { + txn.state.Store(int32(TxnError)) + return err } + txn.opsFlow <- true } return err } func (txn *transaction) registerSendOrAckOp() error { - if atomic.AddInt32(&txn.opsCount, 1) == 1 { - //There are new operations that not completed + if txn.opsCount.Add(1) == 1 { + // There are new operations that were not completed select { case <-txn.opsFlow: return nil case <-time.After(txn.opTimeout): - if _, err := txn.checkIfOpen(); err != nil { + if err := txn.verifyOpen(); err != nil { return err } return newError(TimeoutError, "Failed to get the semaphore to register the send/ack operation") @@ -154,23 +166,22 @@ func (txn *transaction) registerSendOrAckOp() error { func (txn *transaction) endSendOrAckOp(err error) { if err != nil { - atomic.StoreInt32((*int32)(&txn.state), int32(TxnError)) + txn.state.Store(int32(TxnError)) } - if atomic.AddInt32(&txn.opsCount, -1) == 0 { - //This means there are not pending send/ack requests + if txn.opsCount.Add(-1) == 0 { + // This means there are no pending send/ack requests txn.opsFlow <- true } } func (txn *transaction) registerProducerTopic(topic string) error { - isOpen, err := txn.checkIfOpen() - if !isOpen { + if err := txn.verifyOpen(); err != nil { return err } _, ok := txn.registerPartitions[topic] if !ok { - txn.Lock() - defer txn.Unlock() + txn.mu.Lock() + defer txn.mu.Unlock() if _, ok = txn.registerPartitions[topic]; !ok { err := txn.tcClient.addPublishPartitionToTxn(&txn.txnID, []string{topic}) if err != nil { @@ -183,8 +194,7 @@ func (txn *transaction) registerProducerTopic(topic string) error { } func (txn *transaction) registerAckTopic(topic string, subName string) error { - isOpen, err := txn.checkIfOpen() - if !isOpen { + if err := txn.verifyOpen(); err != nil { return err } sub := subscription{ @@ -193,8 +203,8 @@ func (txn *transaction) registerAckTopic(topic string, subName string) error { } _, ok := txn.registerAckSubscriptions[sub] if !ok { - txn.Lock() - defer txn.Unlock() + txn.mu.Lock() + defer txn.mu.Unlock() if _, ok = txn.registerAckSubscriptions[sub]; !ok { err := txn.tcClient.addSubscriptionToTxn(&txn.txnID, topic, subName) if err != nil { @@ -210,14 +220,15 @@ func (txn *transaction) GetTxnID() TxnID { return txn.txnID } -func (txn *transaction) checkIfOpen() (bool, error) { - if txn.state == TxnOpen { - return true, nil +func (txn *transaction) verifyOpen() error { + txnState := txn.state.Load() + if txnState != int32(TxnOpen) { + return newError(InvalidStatus, txnStateErrorMessage(TxnOpen, TxnState(txnState))) } - return false, newError(InvalidStatus, "Expect transaction state is TxnOpen but "+txn.state.string()) + return nil } -func (state TxnState) string() string { +func (state TxnState) String() string { switch state { case TxnOpen: return "TxnOpen" @@ -237,3 +248,8 @@ func (state TxnState) string() string { return "Unknown" } } + +//nolint:unparam +func txnStateErrorMessage(expected, actual TxnState) string { + return fmt.Sprintf("Expected transaction state: %s, actual: %s", expected, actual) +} diff --git a/pulsar/transaction_test.go b/pulsar/transaction_test.go index eb88c7067..75b36ea88 100644 --- a/pulsar/transaction_test.go +++ b/pulsar/transaction_test.go @@ -29,7 +29,9 @@ import ( "github.com/stretchr/testify/require" ) -func TestTCClient(t *testing.T) { +const txnTimeout = 10 * time.Minute + +func TestTxn_TCClient(t *testing.T) { //1. Prepare: create PulsarClient and init transaction coordinator client. topic := newTopicName() sub := "my-sub" @@ -52,13 +54,13 @@ func TestTCClient(t *testing.T) { stats, err := transactionStats(id1) assert.NoError(t, err) assert.Equal(t, "OPEN", stats["status"]) - producedPartitions := stats["producedPartitions"].(map[string]interface{}) - ackedPartitions := stats["ackedPartitions"].(map[string]interface{}) + producedPartitions := stats["producedPartitions"].(map[string]any) + ackedPartitions := stats["ackedPartitions"].(map[string]any) _, ok := producedPartitions[topic] assert.True(t, ok) temp, ok := ackedPartitions[topic] assert.True(t, ok) - subscriptions := temp.(map[string]interface{}) + subscriptions := temp.(map[string]any) _, ok = subscriptions[sub] assert.True(t, ok) //5. Test End transaction @@ -78,12 +80,12 @@ func TestTCClient(t *testing.T) { } else { assert.Equal(t, err.Error(), "http error status code: 404") } - defer consumer.Close() - defer tc.close() - defer client.Close() + consumer.Close() + tc.close() + client.Close() } -//Test points: +// Test points: // 1. Abort and commit txn. // 1. Do nothing, just open a transaction and then commit it or abort it. // The operations of committing/aborting txn should success at the first time and fail at the second time. @@ -96,8 +98,8 @@ func TestTCClient(t *testing.T) { // 1. Register ack topic and send topic, and call http request to get the stats of the transaction // to do verification. -// TestTxnImplCommitOrAbort Test abort and commit txn -func TestTxnImplCommitOrAbort(t *testing.T) { +// Test abort and commit txn +func TestTxn_ImplCommitOrAbort(t *testing.T) { tc, _ := createTcClient(t) //1. Open a transaction and then commit it. //The operations of committing txn1 should success at the first time and fail at the second time. @@ -105,20 +107,20 @@ func TestTxnImplCommitOrAbort(t *testing.T) { err := txn1.Commit(context.Background()) require.Nil(t, err, fmt.Sprintf("Failed to commit the transaction %d:%d\n", txn1.txnID.MostSigBits, txn1.txnID.LeastSigBits)) - txn1.state = TxnOpen + txn1.state.Store(int32(TxnOpen)) txn1.opsFlow <- true err = txn1.Commit(context.Background()) assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError) assert.Equal(t, txn1.GetState(), TxnError) //2. Open a transaction and then abort it. //The operations of aborting txn2 should success at the first time and fail at the second time. - id2, err := tc.newTransaction(time.Hour) - require.Nil(t, err, "Failed to new a transaction") - txn2 := newTransaction(*id2, tc, time.Hour) + id2, err := tc.newTransaction(txnTimeout) + require.Nil(t, err, "Failed to create a transaction") + txn2 := newTransaction(*id2, tc, txnTimeout) err = txn2.Abort(context.Background()) require.Nil(t, err, fmt.Sprintf("Failed to abort the transaction %d:%d\n", id2.MostSigBits, id2.LeastSigBits)) - txn2.state = TxnOpen + txn2.state.Store(int32(TxnOpen)) txn2.opsFlow <- true err = txn2.Abort(context.Background()) assert.Equal(t, err.(*Error).Result(), TransactionNoFoundError) @@ -129,8 +131,8 @@ func TestTxnImplCommitOrAbort(t *testing.T) { assert.Equal(t, err.(*Error).Result(), InvalidStatus) } -// TestRegisterOpAndEndOp Test the internal API including the registerSendOrAckOp and endSendOrAckOp. -func TestRegisterOpAndEndOp(t *testing.T) { +// Test the internal API including the registerSendOrAckOp and endSendOrAckOp. +func TestTxn_RegisterOpAndEndOp(t *testing.T) { tc, _ := createTcClient(t) //1. Register 4 operation but only end 3 operations, the transaction can not be committed or aborted. res := registerOpAndEndOp(t, tc, 4, 3, nil, true) @@ -151,8 +153,8 @@ func TestRegisterOpAndEndOp(t *testing.T) { assert.Equal(t, res.(*Error).Result(), InvalidStatus) } -// TestRegisterTopic Test the internal API, registerAckTopic and registerProducerTopic -func TestRegisterTopic(t *testing.T) { +// Test the internal API, registerAckTopic and registerProducerTopic +func TestTxn_RegisterTopic(t *testing.T) { //1. Prepare: create PulsarClient and init transaction coordinator client. topic := newTopicName() sub := "my-sub" @@ -172,11 +174,11 @@ func TestRegisterTopic(t *testing.T) { //4. Call http request to get the stats of the transaction to do verification. stats2, err := transactionStats(&txn.txnID) assert.NoError(t, err) - topics := stats2["producedPartitions"].(map[string]interface{}) - subTopics := stats2["ackedPartitions"].(map[string]interface{}) + topics := stats2["producedPartitions"].(map[string]any) + subTopics := stats2["ackedPartitions"].(map[string]any) assert.NotNil(t, topics[topic]) assert.NotNil(t, subTopics[topic]) - subs := subTopics[topic].(map[string]interface{}) + subs := subTopics[topic].(map[string]any) assert.NotNil(t, subs[sub]) } @@ -198,9 +200,9 @@ func registerOpAndEndOp(t *testing.T, tc *transactionCoordinatorClient, rp int, } func createTxn(tc *transactionCoordinatorClient, t *testing.T) *transaction { - id, err := tc.newTransaction(time.Hour) - require.Nil(t, err, "Failed to new a transaction.") - return newTransaction(*id, tc, time.Hour) + id, err := tc.newTransaction(txnTimeout) + require.Nil(t, err, "Failed to create a new transaction.") + return newTransaction(*id, tc, txnTimeout) } // createTcClient Create a transaction coordinator client to send request @@ -216,7 +218,7 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) { return c.(*client).tcClient, c.(*client) } -// TestConsumeAndProduceWithTxn is a test function that validates the behavior of producing and consuming +// Validate the behavior of producing and consuming // messages with and without transactions. It consists of the following steps: // // 1. Prepare: Create a PulsarClient and initialize the transaction coordinator client. @@ -230,7 +232,7 @@ func createTcClient(t *testing.T) (*transactionCoordinatorClient, *client) { // // The test ensures that the consumer can only receive messages sent with a transaction after it is committed, // and that it can always receive messages sent without a transaction. -func TestConsumeAndProduceWithTxn(t *testing.T) { +func TestTxn_ConsumeAndProduce(t *testing.T) { // Step 1: Prepare - Create PulsarClient and initialize the transaction coordinator client. topic := newTopicName() sub := "my-sub" @@ -249,7 +251,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) { // Step 3: Open a transaction, send 10 messages with the transaction and 10 messages without the transaction. // Expectation: We can receive the 10 messages sent without a transaction and // cannot receive the 10 messages sent with the transaction. - txn, err := client.NewTransaction(time.Hour) + txn, err := client.NewTransaction(txnTimeout) require.Nil(t, err) for i := 0; i < 10; i++ { _, err = producer.Send(context.Background(), &ProducerMessage{ @@ -289,7 +291,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) { // Acknowledge the rest of the 10 messages with the transaction. // Expectation: After committing the transaction, all messages of the subscription will be acknowledged. _ = txn.Commit(context.Background()) - txn, err = client.NewTransaction(time.Hour) + txn, err = client.NewTransaction(txnTimeout) require.Nil(t, err) for i := 0; i < 9; i++ { msg, _ := consumer.Receive(context.Background()) @@ -307,7 +309,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) { // Create a goroutine to attempt receiving a message and send it to the 'done1' channel. done2 := make(chan Message) go func() { - consumer.Receive(context.Background()) + _, _ = consumer.Receive(context.Background()) close(done2) }() @@ -323,7 +325,7 @@ func TestConsumeAndProduceWithTxn(t *testing.T) { producer.Close() } -func TestAckAndSendWithTxn(t *testing.T) { +func TestTxn_AckAndSend(t *testing.T) { // Prepare: Create PulsarClient and initialize the transaction coordinator client. sourceTopic := newTopicName() sinkTopic := newTopicName() @@ -357,7 +359,7 @@ func TestAckAndSendWithTxn(t *testing.T) { } // Open a transaction and consume messages from the source topic while sending messages to the sink topic. - txn, err := client.NewTransaction(time.Hour) + txn, err := client.NewTransaction(txnTimeout) require.Nil(t, err) for i := 0; i < 10; i++ { @@ -393,7 +395,7 @@ func TestAckAndSendWithTxn(t *testing.T) { sinkProducer.Close() } -func TestTransactionAbort(t *testing.T) { +func TestTxn_TransactionAbort(t *testing.T) { // Prepare: Create PulsarClient and initialize the transaction coordinator client. topic := newTopicName() sub := "my-sub" @@ -410,7 +412,7 @@ func TestTransactionAbort(t *testing.T) { }) // Open a transaction and send 10 messages with the transaction. - txn, err := client.NewTransaction(time.Hour) + txn, err := client.NewTransaction(txnTimeout) require.Nil(t, err) for i := 0; i < 10; i++ { @@ -449,7 +451,7 @@ func consumerShouldNotReceiveMessage(t *testing.T, consumer Consumer) { } } -func TestTransactionAckChunkMessage(t *testing.T) { +func TestTxn_AckChunkMessage(t *testing.T) { topic := newTopicName() sub := "my-sub" @@ -457,7 +459,7 @@ func TestTransactionAckChunkMessage(t *testing.T) { _, client := createTcClient(t) // Create transaction and register the send operation. - txn, err := client.NewTransaction(time.Hour) + txn, err := client.NewTransaction(txnTimeout) require.Nil(t, err) // Create a producer with chunking enabled to send a large message that will be split into chunks. @@ -487,13 +489,13 @@ func TestTransactionAckChunkMessage(t *testing.T) { }) require.NoError(t, err) _, ok := msgID.(*chunkMessageID) - require.True(t, ok) + require.True(t, ok, fmt.Sprintf("Expected message ID of type *chunkMessageID, got type %T", msgID)) err = txn.Commit(context.Background()) require.Nil(t, err) // Receive the message using a new transaction and ack it. - txn2, err := client.NewTransaction(time.Hour) + txn2, err := client.NewTransaction(txnTimeout) require.Nil(t, err) message, err := consumer.Receive(context.Background()) require.Nil(t, err) @@ -501,7 +503,7 @@ func TestTransactionAckChunkMessage(t *testing.T) { err = consumer.AckWithTxn(message, txn2) require.Nil(t, err) - txn2.Abort(context.Background()) + _ = txn2.Abort(context.Background()) // Close the consumer to simulate reconnection and receive the same message again. consumer.Close() @@ -518,7 +520,7 @@ func TestTransactionAckChunkMessage(t *testing.T) { require.NotNil(t, message) // Create a new transaction and ack the message again. - txn3, err := client.NewTransaction(time.Hour) + txn3, err := client.NewTransaction(txnTimeout) require.Nil(t, err) err = consumer.AckWithTxn(message, txn3) @@ -541,7 +543,7 @@ func TestTransactionAckChunkMessage(t *testing.T) { consumerShouldNotReceiveMessage(t, consumer) } -func TestTxnConnReconnect(t *testing.T) { +func TestTxn_ConnReconnect(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -593,3 +595,8 @@ func TestTxnConnReconnect(t *testing.T) { err = txn.Commit(context.Background()) assert.NoError(t, err) } + +func TestTxn_txnStateErrorMessage(t *testing.T) { + expected := "Expected transaction state: TxnOpen, actual: TxnTimeout" + assert.Equal(t, expected, txnStateErrorMessage(TxnOpen, TxnTimeout)) +}