From 056fc31c9200f0ba5f157ef2621d25767bb4f51b Mon Sep 17 00:00:00 2001 From: congbo Date: Wed, 13 Apr 2022 16:43:02 +0800 Subject: [PATCH 1/7] [future][txn] Add max active txn config in transaction coordinator --- conf/broker.conf | 3 + .../pulsar/broker/ServiceConfiguration.java | 6 ++ .../TransactionMetadataStoreService.java | 3 +- .../pulsar/broker/service/ServerCnx.java | 17 +++- .../broker/transaction/TransactionTest.java | 6 +- .../TransactionCoordinatorConfigTest.java | 80 +++++++++++++++++ .../TransactionMetadataStoreProvider.java | 2 +- .../exceptions/CoordinatorException.java | 12 +++ ...InMemTransactionMetadataStoreProvider.java | 3 +- .../impl/MLTransactionMetadataStore.java | 86 +++++++++++-------- .../MLTransactionMetadataStoreProvider.java | 5 +- .../MLTransactionMetadataStoreTest.java | 18 ++-- .../TransactionMetadataStoreProviderTest.java | 2 +- 13 files changed, 183 insertions(+), 60 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java diff --git a/conf/broker.conf b/conf/broker.conf index e128632fcf714..3bc7118f8eb6b 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1386,6 +1386,9 @@ transactionBufferSnapshotMinTimeInMillis=5000 # The max concurrent requests for transaction buffer client, default is 1000 transactionBufferClientMaxConcurrentRequests=1000 +# The max active transactions per transaction coordinator +maxActiveTransactionsPerCoordinator=0 + ### --- Packages management service configuration variables (begin) --- ### # Enable the packages management service or not diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f28e086a9e9a1..823d5cdade6ae 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2525,6 +2525,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private long transactionBufferClientOperationTimeoutInMills = 3000L; + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "The max active transactions per transaction coordinator." + ) + private long maxActiveTransactionsPerCoordinator = 0L; + /**** --- KeyStore TLS config variables. --- ****/ @FieldContext( category = CATEGORY_KEYSTORE_TLS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 826b90a6b03cb..0f17c7b4dc7ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -254,7 +254,8 @@ public CompletableFuture openTransactionMetadataStore( timeoutTracker, tcId.getId()); return transactionMetadataStoreProvider .openStore(tcId, pulsarService.getManagedLedgerFactory(), v, - timeoutTracker, recoverTracker); + timeoutTracker, recoverTracker, + pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator()); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 09dbaa9e8d24b..1863d9ba10f1b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2211,11 +2211,20 @@ protected void handleNewTxn(CommandNewTxn command) { } commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId()); } else { - ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId); + if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) { + // if new txn throw ReachMaxActiveTxnException, don't return any response to client, + // otherwise client will retry, it will wast o lot of resources + // link https://github.com/apache/pulsar/issues/15133 + log.warn("New txn op reach max active transactions! tcId : {}, requestId : {}", + tcId.getId(), requestId, ex); + // do-nothing + } else { + ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId); - commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(), - BrokerServiceException.getClientErrorCode(ex), ex.getMessage()); - transactionMetadataStoreService.handleOpFail(ex, tcId); + commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(), + BrokerServiceException.getClientErrorCode(ex), ex.getMessage()); + transactionMetadataStoreService.handleOpFail(ex, tcId); + } } })); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 61d69661d6278..de4b155dfdc65 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -693,7 +693,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ MLTransactionMetadataStore metadataStore1 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1), mlTransactionLog, timeoutTracker, transactionRecoverTracker, - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().untilAsserted(() -> assertEquals(metadataStore1.getCoordinatorStats().state, "Ready")); @@ -707,7 +707,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ MLTransactionMetadataStore metadataStore2 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1), mlTransactionLog, timeoutTracker, transactionRecoverTracker, - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().untilAsserted(() -> assertEquals(metadataStore2.getCoordinatorStats().state, "Ready")); @@ -720,7 +720,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ MLTransactionMetadataStore metadataStore3 = new MLTransactionMetadataStore(new TransactionCoordinatorID(1), mlTransactionLog, timeoutTracker, transactionRecoverTracker, - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().untilAsserted(() -> assertEquals(metadataStore3.getCoordinatorStats().state, "Ready")); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java new file mode 100644 index 0000000000000..a691d6f2f3a78 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.coordinator; + + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.TransactionMetadataStoreService; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; + +@Test(groups = "broker") +public class TransactionCoordinatorConfigTest extends BrokerTestBase { + + @BeforeMethod + @Override + protected void setup() throws Exception { + ServiceConfiguration configuration = getDefaultConf(); + configuration.setTransactionCoordinatorEnabled(true); + configuration.setMaxActiveTransactionsPerCoordinator(2); + super.baseSetup(configuration); + admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testMaxActiveTxn() throws Exception { + pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()) + .enableTransaction(true).operationTimeout(3, TimeUnit.SECONDS).build(); + + // new two transaction will not reach max active txns + pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + try { + // new the third txn will timeout, broker will return any response + pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + fail(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException); + } + } +} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java index b2fd10ea9ba54..edcc42ded844f 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java @@ -68,5 +68,5 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th CompletableFuture openStore( TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, - TransactionRecoverTracker recoverTracker); + TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java index c8e7aea651651..954d891a188ac 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java @@ -119,4 +119,16 @@ public TransactionMetadataStoreStateException(TransactionCoordinatorID tcID, } } + + /** + * Exception is thrown when a operation of new transaction reach the number of max active transactions. + */ + public static class ReachMaxActiveTxnException extends CoordinatorException { + + private static final long serialVersionUID = 0L; + + public ReachMaxActiveTxnException(String message) { + super(message); + } + } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java index 4c4c04d1f94bd..8247aef4a88b8 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java @@ -37,7 +37,8 @@ public CompletableFuture openStore(TransactionCoordina ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, - TransactionRecoverTracker recoverTracker) { + TransactionRecoverTracker recoverTracker, + long maxActiveTransactionsPerCoordinator) { return CompletableFuture.completedFuture( new InMemTransactionMetadataStore(transactionCoordinatorId)); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index f93de8b017552..372818697c49c 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -75,12 +76,14 @@ public class MLTransactionMetadataStore private final LongAdder appendLogCount; private final MLTransactionSequenceIdGenerator sequenceIdGenerator; private final ExecutorService internalPinnedExecutor; + private final long maxActiveTransactionsPerCoordinator; public MLTransactionMetadataStore(TransactionCoordinatorID tcID, MLTransactionLogImpl mlTransactionLog, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker, - MLTransactionSequenceIdGenerator sequenceIdGenerator) { + MLTransactionSequenceIdGenerator sequenceIdGenerator, + long maxActiveTransactionsPerCoordinator) { super(State.None); this.sequenceIdGenerator = sequenceIdGenerator; this.tcID = tcID; @@ -88,6 +91,7 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID, this.timeoutTracker = timeoutTracker; this.transactionMetadataStoreStats = new TransactionMetadataStoreStats(); + this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator; this.createdTransactionCount = new LongAdder(); this.committedTransactionCount = new LongAdder(); this.abortedTransactionCount = new LongAdder(); @@ -207,44 +211,50 @@ public CompletableFuture getTxnMeta(TxnID txnID) { @Override public CompletableFuture newTransaction(long timeOut) { - CompletableFuture completableFuture = new CompletableFuture<>(); - internalPinnedExecutor.execute(() -> { - if (!checkIfReady()) { - completableFuture.completeExceptionally(new CoordinatorException - .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction")); - return; - } + if (this.maxActiveTransactionsPerCoordinator == 0 + || this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(() -> { + if (!checkIfReady()) { + completableFuture.completeExceptionally(new CoordinatorException + .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction")); + return; + } - long mostSigBits = tcID.getId(); - long leastSigBits = sequenceIdGenerator.generateSequenceId(); - TxnID txnID = new TxnID(mostSigBits, leastSigBits); - long currentTimeMillis = System.currentTimeMillis(); - TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() - .setTxnidMostBits(mostSigBits) - .setTxnidLeastBits(leastSigBits) - .setStartTime(currentTimeMillis) - .setTimeoutMs(timeOut) - .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) - .setLastModificationTime(currentTimeMillis) - .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); - transactionLog.append(transactionMetadataEntry) - .whenComplete((position, throwable) -> { - if (throwable != null) { - completableFuture.completeExceptionally(throwable); - } else { - appendLogCount.increment(); - TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); - List positions = new ArrayList<>(); - positions.add(position); - Pair> pair = MutablePair.of(txn, positions); - txnMetaMap.put(leastSigBits, pair); - this.timeoutTracker.addTransaction(leastSigBits, timeOut); - createdTransactionCount.increment(); - completableFuture.complete(txnID); - } - }); - }); - return completableFuture; + long mostSigBits = tcID.getId(); + long leastSigBits = sequenceIdGenerator.generateSequenceId(); + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + long currentTimeMillis = System.currentTimeMillis(); + TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() + .setTxnidMostBits(mostSigBits) + .setTxnidLeastBits(leastSigBits) + .setStartTime(currentTimeMillis) + .setTimeoutMs(timeOut) + .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW) + .setLastModificationTime(currentTimeMillis) + .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId()); + transactionLog.append(transactionMetadataEntry) + .whenComplete((position, throwable) -> { + if (throwable != null) { + completableFuture.completeExceptionally(throwable); + } else { + appendLogCount.increment(); + TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut); + List positions = new ArrayList<>(); + positions.add(position); + Pair> pair = MutablePair.of(txn, positions); + txnMetaMap.put(leastSigBits, pair); + this.timeoutTracker.addTransaction(leastSigBits, timeOut); + createdTransactionCount.increment(); + completableFuture.complete(txnID); + } + }); + }); + return completableFuture; + } else { + return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op " + + "reach max active txn! tcId : " + getTransactionCoordinatorID().getId())); + } } @Override diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index 0711f00ac709c..da5f2aa01d215 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -41,7 +41,8 @@ public CompletableFuture openStore(TransactionCoordina ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, - TransactionRecoverTracker recoverTracker) { + TransactionRecoverTracker recoverTracker, + long maxActiveTransactionsPerCoordinator) { MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator(); managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator); MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId, @@ -50,6 +51,6 @@ public CompletableFuture openStore(TransactionCoordina // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties. return txnLog.initialize().thenApply(__ -> new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, - recoverTracker, mlTransactionSequenceIdGenerator)); + recoverTracker, mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator)); } } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 2aa678059d44f..47db35f0f8c5b 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -76,7 +76,7 @@ public void testTransactionOperation() throws Exception { MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); int checkReplayRetryCount = 0; while (true) { checkReplayRetryCount++; @@ -151,7 +151,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().until(transactionMetadataStore::checkIfReady); TxnID txnID = transactionMetadataStore.newTransaction(20000).get(); @@ -181,7 +181,7 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().until(transactionMetadataStore::checkIfReady); txnID = transactionMetadataStore.newTransaction(100000).get(); @@ -206,7 +206,7 @@ public void testInitTransactionReader() throws Exception { MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); int checkReplayRetryCount = 0; while (true) { if (checkReplayRetryCount > 3) { @@ -249,7 +249,7 @@ public void testInitTransactionReader() throws Exception { MLTransactionMetadataStore transactionMetadataStoreTest = new MLTransactionMetadataStore(transactionCoordinatorID, txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); while (true) { if (checkReplayRetryCount > 6) { @@ -318,7 +318,7 @@ public void testDeleteLog() throws Exception { MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); int checkReplayRetryCount = 0; while (true) { if (checkReplayRetryCount > 3) { @@ -385,7 +385,7 @@ public void testRecoverWhenDeleteFromCursor() throws Exception { MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().until(transactionMetadataStore::checkIfReady); @@ -404,7 +404,7 @@ public void testRecoverWhenDeleteFromCursor() throws Exception { transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().until(transactionMetadataStore::checkIfReady); } @@ -426,7 +426,7 @@ public void testManageLedgerWriteFailState() throws Exception { MLTransactionMetadataStore transactionMetadataStore = new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(), - mlTransactionSequenceIdGenerator); + mlTransactionSequenceIdGenerator, 0L); Awaitility.await().until(transactionMetadataStore::checkIfReady); transactionMetadataStore.newTransaction(5000).get(); diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java index a54caaf16c17b..3f80d850b1af0 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java @@ -63,7 +63,7 @@ public TransactionMetadataStoreProviderTest(String providerClassName) throws Exc public void setup() throws Exception { this.tcId = new TransactionCoordinatorID(1L); this.store = this.provider.openStore(tcId, null, null, - null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl()).get(); + null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), 0L).get(); } @Test From 3e5f983acbc2eb832cb31aebc9fcff4e6fab2445 Mon Sep 17 00:00:00 2001 From: congbo Date: Wed, 13 Apr 2022 16:53:46 +0800 Subject: [PATCH 2/7] Add doc --- site2/docs/reference-configuration.md | 1 + 1 file changed, 1 insertion(+) diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index af0f3656568d9..3ce1d8f74700d 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -331,6 +331,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater |replicatorPrefix| Replicator prefix used for replicator producer name and cursor name pulsar.repl|| |transactionCoordinatorEnabled|Whether to enable transaction coordinator in broker.|true| |transactionMetadataStoreProviderClassName| |org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider| +|maxActiveTransactionsPerCoordinator| Max number of active transactions per transaction coordinator.|0| |defaultRetentionTimeInMinutes| Default message retention time |0| |defaultRetentionSizeInMB| Default retention size |0| |keepAliveIntervalSeconds| How often to check whether the connections are still alive |30| From 2e53f9693b47fc3b382b87cbdb2c6e04f7fe215e Mon Sep 17 00:00:00 2001 From: congbo Date: Wed, 13 Apr 2022 17:07:23 +0800 Subject: [PATCH 3/7] Fix some code style --- .../TransactionCoordinatorConfigTest.java | 12 +++--------- .../coordinator/impl/MLTransactionMetadataStore.java | 1 - 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java index a691d6f2f3a78..7f3ab8e0ebb75 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java @@ -19,27 +19,21 @@ package org.apache.pulsar.broker.transaction.coordinator; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; import com.google.common.collect.Sets; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; -import org.awaitility.Awaitility; -import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.concurrent.TimeUnit; - -import static org.testng.AssertJUnit.assertTrue; -import static org.testng.AssertJUnit.fail; - @Test(groups = "broker") public class TransactionCoordinatorConfigTest extends BrokerTestBase { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index 372818697c49c..0b0d789946c64 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; From 7ad48f51c7bbb210cac85784675d5be9e7ebf6b5 Mon Sep 17 00:00:00 2001 From: congbo Date: Wed, 13 Apr 2022 17:11:18 +0800 Subject: [PATCH 4/7] Add some test --- .../TransactionCoordinatorConfigTest.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java index 7f3ab8e0ebb75..53148f311dd3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java @@ -27,6 +27,7 @@ import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TenantInfoImpl; @@ -60,9 +61,28 @@ public void testMaxActiveTxn() throws Exception { pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()) .enableTransaction(true).operationTimeout(3, TimeUnit.SECONDS).build(); - // new two transaction will not reach max active txns + // new two txn will not reach max active txns + Transaction commitTxn = + pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + Transaction abortTxn = + pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + try { + // new the third txn will timeout, broker will return any response + pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + fail(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException); + } + + // release active txn + commitTxn.commit().get(); + abortTxn.abort().get(); + + // two txn end, can continue new txn pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + + // reach max active txns again try { // new the third txn will timeout, broker will return any response pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); From a622555b48233ef6d1e387825b33c48cba670690 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Mon, 30 May 2022 19:19:14 +0800 Subject: [PATCH 5/7] fix conflic --- .../coordinator/TransactionCoordinatorConfigTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java index 53148f311dd3a..6a98184779a3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.transaction.coordinator; - import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; import com.google.common.collect.Sets; @@ -29,7 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -47,7 +46,7 @@ protected void setup() throws Exception { super.baseSetup(configuration); admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + admin.topics().createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); } @AfterMethod(alwaysRun = true) From 57d0c7b042bc44aa0d3573c709e201af12d19c59 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Fri, 24 Jun 2022 01:00:17 +0800 Subject: [PATCH 6/7] fix some tests --- .../coordinator/TransactionCoordinatorConfigTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java index 6a98184779a3f..b72b28e52aed5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java @@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -46,7 +47,11 @@ protected void setup() throws Exception { super.baseSetup(configuration); admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); - admin.topics().createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); + pulsar.getPulsarResources() + .getNamespaceResources() + .getPartitionedTopicResources() + .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, + new PartitionedTopicMetadata(1)); } @AfterMethod(alwaysRun = true) From aaa3df6349aa3ed9afa0cd067e30903f902c8a96 Mon Sep 17 00:00:00 2001 From: congbobo184 Date: Fri, 24 Jun 2022 13:05:40 +0800 Subject: [PATCH 7/7] fix some comment --- conf/broker.conf | 2 +- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 36f25e1676be1..91e2005929675 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1432,7 +1432,7 @@ transactionBufferSnapshotMinTimeInMillis=5000 # The max concurrent requests for transaction buffer client, default is 1000 transactionBufferClientMaxConcurrentRequests=1000 -# The max active transactions per transaction coordinator +# The max active transactions per transaction coordinator, default value 0 indicates no limit. maxActiveTransactionsPerCoordinator=0 # MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 502fc40144d91..57183ad357277 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2594,7 +2594,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_TRANSACTION, - doc = "The max active transactions per transaction coordinator." + doc = "The max active transactions per transaction coordinator, default value 0 indicates no limit." ) private long maxActiveTransactionsPerCoordinator = 0L;