Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature][transaction] Add a configuration to control max active transaction of coordinator #15157

3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,9 @@ transactionBufferSnapshotMinTimeInMillis=5000
# The max concurrent requests for transaction buffer client, default is 1000
transactionBufferClientMaxConcurrentRequests=1000

# The max active transactions per transaction coordinator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# The max active transactions per transaction coordinator
# The max active transactions per transaction coordinator, default value 0 indicates no limit.

maxActiveTransactionsPerCoordinator=0

# MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex,
# It stores the position in pendingAckStore as its value and saves a position used to determine
# whether the previous data can be cleaned up as a key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2592,6 +2592,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long transactionBufferClientOperationTimeoutInMills = 3000L;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "The max active transactions per transaction coordinator."
Copy link
Contributor

@gaoran10 gaoran10 Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
doc = "The max active transactions per transaction coordinator."
doc = "The max active transactions per transaction coordinator, default value 0 indicates no limit."

)
private long maxActiveTransactionsPerCoordinator = 0L;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "MLPendingAckStore maintain a ConcurrentSkipListMap pendingAckLogIndex`,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(
timeoutTracker, tcId.getId());
return transactionMetadataStoreProvider
.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
timeoutTracker, recoverTracker);
timeoutTracker, recoverTracker,
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2221,11 +2221,20 @@ protected void handleNewTxn(CommandNewTxn command) {
}
commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId());
} else {
ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);
if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) {
// if new txn throw ReachMaxActiveTxnException, don't return any response to client,
// otherwise client will retry, it will waste a lot of resources
// link https://github.com/apache/pulsar/issues/15133
log.warn("New txn op reach max active transactions! tcId : {}, requestId : {}",
tcId.getId(), requestId, ex);
// do-nothing
} else {
ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);

commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
transactionMetadataStoreService.handleOpFail(ex, tcId);
commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
doNothing().when(timeoutTracker).start();
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
metadataStore1.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
Expand All @@ -708,7 +708,8 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{

MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);

mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
metadataStore2.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
Expand All @@ -721,7 +722,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{

MLTransactionMetadataStore metadataStore3 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
metadataStore3.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.coordinator;

import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
import com.google.common.collect.Sets;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Broker test for the {@code maxActiveTransactionsPerCoordinator} setting.
 *
 * <p>The broker is started with transactions enabled and a limit of two active transactions per
 * coordinator. The test then checks that a third concurrent transaction cannot be opened, and that
 * ending active transactions makes room for new ones.
 */
@Test(groups = "broker")
public class TransactionCoordinatorConfigTest extends BrokerTestBase {

    /**
     * Starts the broker with the transaction coordinator enabled and limited to two active
     * transactions, then creates the tenant, system namespace and the single-partition
     * coordinator-assign topic used by the transaction client.
     */
    @BeforeMethod
    @Override
    protected void setup() throws Exception {
        ServiceConfiguration configuration = getDefaultConf();
        configuration.setTransactionCoordinatorEnabled(true);
        configuration.setMaxActiveTransactionsPerCoordinator(2);
        super.baseSetup(configuration);
        admin.tenants().createTenant("pulsar",
                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
        pulsar.getPulsarResources()
                .getNamespaceResources()
                .getPartitionedTopicResources()
                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                        new PartitionedTopicMetadata(1));
    }

    @AfterMethod(alwaysRun = true)
    @Override
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Opens transactions up to the configured limit, verifies the next one times out, ends the
     * active transactions, and verifies the limit is enforced again after refilling.
     */
    @Test
    public void testMaxActiveTxn() throws Exception {
        // NOTE(review): the inherited setup presumably already assigned a client to this field;
        // close it before overwriting so it is not leaked. Harmless if the field is still null.
        if (pulsarClient != null) {
            pulsarClient.close();
        }
        // A short operation timeout keeps the expected client-side timeouts fast.
        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
                .enableTransaction(true).operationTimeout(3, TimeUnit.SECONDS).build();

        // Two transactions fit within the limit of two active transactions.
        Transaction commitTxn = newTxn();
        Transaction abortTxn = newTxn();

        // The third one exceeds the limit.
        assertNewTxnTimesOut();

        // Ending both transactions releases their slots.
        commitTxn.commit().get();
        abortTxn.abort().get();

        // With both slots free, two more transactions can be opened...
        newTxn();
        newTxn();

        // ...and the limit is reached again.
        assertNewTxnTimesOut();
    }

    /** Opens a transaction with a one-minute transaction timeout and waits for its creation. */
    private Transaction newTxn() throws Exception {
        return pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
    }

    /**
     * Asserts that opening another transaction fails with a client-side timeout. When the
     * coordinator is at its limit the broker does not send any response for the new-transaction
     * request, so the client operation is expected to time out rather than receive an error.
     */
    private void assertNewTxnTimesOut() {
        try {
            newTxn();
            // fail() raises an Error, so the catch block below does not swallow it.
            fail();
        } catch (Exception e) {
            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th
CompletableFuture<TransactionMetadataStore> openStore(
TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker);
TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator);
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,16 @@ public TransactionMetadataStoreStateException(TransactionCoordinatorID tcID,

}
}

/**
* Exception thrown when a new-transaction operation is rejected because the number of
* active transactions has reached the configured maximum.
*/
public static class ReachMaxActiveTxnException extends CoordinatorException {

// Serialization version identifier for this exception type.
private static final long serialVersionUID = 0L;

/**
* Creates the exception with the given detail message.
*
* @param message the detail message, forwarded to the CoordinatorException constructor
*/
public ReachMaxActiveTxnException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
TransactionRecoverTracker recoverTracker,
long maxActiveTransactionsPerCoordinator) {
return CompletableFuture.completedFuture(
new InMemTransactionMetadataStore(transactionCoordinatorId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,21 @@ public class MLTransactionMetadataStore
private final LongAdder appendLogCount;
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
private final ExecutorService internalPinnedExecutor;
private final long maxActiveTransactionsPerCoordinator;

public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
TransactionTimeoutTracker timeoutTracker,
MLTransactionSequenceIdGenerator sequenceIdGenerator) {
MLTransactionSequenceIdGenerator sequenceIdGenerator,
long maxActiveTransactionsPerCoordinator) {
super(State.None);
this.sequenceIdGenerator = sequenceIdGenerator;
this.tcID = tcID;
this.transactionLog = mlTransactionLog;
this.timeoutTracker = timeoutTracker;
this.transactionMetadataStoreStats = new TransactionMetadataStoreStats();

this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator;
this.createdTransactionCount = new LongAdder();
this.committedTransactionCount = new LongAdder();
this.abortedTransactionCount = new LongAdder();
Expand Down Expand Up @@ -219,44 +222,50 @@ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {

@Override
public CompletableFuture<TxnID> newTransaction(long timeOut) {
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
completableFuture.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
return;
}
if (this.maxActiveTransactionsPerCoordinator == 0
|| this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
completableFuture.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
return;
}

long mostSigBits = tcID.getId();
long leastSigBits = sequenceIdGenerator.generateSequenceId();
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(mostSigBits)
.setTxnidLeastBits(leastSigBits)
.setStartTime(currentTimeMillis)
.setTimeoutMs(timeOut)
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
transactionLog.append(transactionMetadataEntry)
.whenComplete((position, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
appendLogCount.increment();
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
txnMetaMap.put(leastSigBits, pair);
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
createdTransactionCount.increment();
completableFuture.complete(txnID);
}
});
});
return completableFuture;
long mostSigBits = tcID.getId();
long leastSigBits = sequenceIdGenerator.generateSequenceId();
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(mostSigBits)
.setTxnidLeastBits(leastSigBits)
.setStartTime(currentTimeMillis)
.setTimeoutMs(timeOut)
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
transactionLog.append(transactionMetadataEntry)
.whenComplete((position, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
} else {
appendLogCount.increment();
TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
txnMetaMap.put(leastSigBits, pair);
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
createdTransactionCount.increment();
completableFuture.complete(txnID);
}
});
});
return completableFuture;
} else {
return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op "
+ "reach max active txn! tcId : " + getTransactionCoordinatorID().getId()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
TransactionRecoverTracker recoverTracker,
long maxActiveTransactionsPerCoordinator) {
MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
Expand All @@ -50,6 +51,6 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
// MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
return txnLog.initialize().thenCompose(__ ->
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
mlTransactionSequenceIdGenerator).init(recoverTracker));
mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker));
}
}
Loading