Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support fallback when resolveLockAsync #2651

Merged
merged 6 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/license-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
pull_request:
branches:
- master
- release-**

jobs:
check-license:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
branches: [ master,release-** ]

jobs:
fmt:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
*
* Copyright 2022 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.pingcap.tispark.safepoint

import com.pingcap.tikv.TiSession
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
*
* Copyright 2022 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.pingcap.tikv.exception;

public class NonAsyncCommitLockException extends RuntimeException {

public NonAsyncCommitLockException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void prewrite(
long startTs,
long lockTTL)
throws TiClientInternalException, KeyException, RegionException {
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null);
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null, false);
}

/**
Expand All @@ -437,7 +437,8 @@ public void prewrite(
long ttl,
boolean skipConstraintCheck,
boolean useAsyncCommit,
Iterable<ByteString> secondaries)
Iterable<ByteString> secondaries,
boolean fallbackTest)
throws TiClientInternalException, KeyException, RegionException {
boolean forWrite = true;
while (true) {
Expand All @@ -463,6 +464,10 @@ public void prewrite(
if (secondaries != null) {
builder.addAllSecondaries(secondaries);
}
// just for test
if (fallbackTest) {
builder.setMaxCommitTs(1);
}
}
return builder.build();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.pingcap.tikv.txn;

import com.google.protobuf.ByteString;
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
import com.pingcap.tikv.exception.ResolveLockException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -114,6 +115,10 @@ public synchronized void addKeys(
"unexpected timestamp, expected: %d, found: %d",
startTs, lockInfo.getLockVersion()));
}
if (!lockInfo.getUseAsyncCommit()) {
LOG.info("non-async commit lock found in async commit recovery");
throw new NonAsyncCommitLockException("non-async commit lock found");
}
if (!this.missingLock && lockInfo.getMinCommitTs() > this.commitTs) {
this.commitTs = lockInfo.getMinCommitTs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.pingcap.tikv.PDClient;
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.exception.KeyException;
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
import com.pingcap.tikv.exception.RegionException;
import com.pingcap.tikv.exception.TiClientInternalException;
import com.pingcap.tikv.exception.TiKVException;
Expand Down Expand Up @@ -124,21 +125,8 @@ public ResolveLockResult resolveLocks(
Set<Long> pushed = new HashSet<>(locks.size());

for (Lock l : locks) {
TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS);

if (status.getTtl() == 0) {
Set<RegionVerID> cleanRegion =
cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());

if (status.getPrimaryLock() != null && status.getPrimaryLock().getUseAsyncCommit()) {
resolveLockAsync(bo, l, status);
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
resolvePessimisticLock(bo, l, cleanRegion);
} else {
resolveLock(bo, l, status, cleanRegion);
}

} else {
TxnStatus status = resolve(l, bo, callerStartTS, cleanTxns, false);
if (status.getTtl() != 0) {
long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
msBeforeTxnExpired.update(msBeforeLockExpired);

Expand Down Expand Up @@ -169,6 +157,36 @@ public ResolveLockResult resolveLocks(
return new ResolveLockResult(msBeforeTxnExpired.value(), pushed);
}

private TxnStatus resolve(
Lock l,
BackOffer bo,
long callerStartTS,
Map<Long, Set<RegionVerID>> cleanTxns,
boolean forceSyncCommit) {
TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit);
if (status.getTtl() != 0) {
return status;
}
Set<RegionVerID> cleanRegion = cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());

if (status.getPrimaryLock() != null
&& status.getPrimaryLock().getUseAsyncCommit()
&& !forceSyncCommit) {
try {
resolveLockAsync(bo, l, status);
} catch (NonAsyncCommitLockException e) {
logger.info("fallback because of the non async commit lock");
return resolve(l, bo, callerStartTS, cleanTxns, true);
}
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
resolvePessimisticLock(bo, l, cleanRegion);
} else {
resolveLock(bo, l, status, cleanRegion);
}

return status;
}

private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cleanRegion) {
while (true) {
region = regionManager.getRegionByKey(lock.getKey());
Expand Down Expand Up @@ -225,7 +243,8 @@ private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cl
}
}

private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStartTS) {
private TxnStatus getTxnStatusFromLock(
BackOffer bo, Lock lock, long callerStartTS, boolean forceSyncCommit) {
long currentTS;

if (lock.getTtl() == 0) {
Expand All @@ -249,7 +268,8 @@ private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStart
callerStartTS,
currentTS,
rollbackIfNotExist,
lock);
lock,
forceSyncCommit);
} catch (TxnNotFoundException e) {
// If the error is something other than txnNotFoundErr, throw the error (network
// unavailable, tikv down, backoff timeout etc) to the caller.
Expand Down Expand Up @@ -293,7 +313,8 @@ private TxnStatus getTxnStatus(
Long callerStartTS,
Long currentTS,
boolean rollbackIfNotExist,
Lock lock) {
Lock lock,
boolean forceSyncCommit) {
TxnStatus status = getResolved(txnID);
if (status != null) {
return status;
Expand All @@ -317,6 +338,7 @@ private TxnStatus getTxnStatus(
.setCallerStartTs(callerStartTS)
.setCurrentTs(currentTS)
.setRollbackIfNotExist(rollbackIfNotExist)
.setForceSyncCommit(forceSyncCommit)
.build();
};

Expand Down Expand Up @@ -481,6 +503,9 @@ private AsyncResolveData checkAllSecondaries(BackOffer bo, Lock lock, TxnStatus
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
if (e.getCause() != null && e.getCause() instanceof NonAsyncCommitLockException) {
throw (NonAsyncCommitLockException) e.getCause();
}
logger.info("async commit recovery (sending CheckSecondaryLocks) finished with errors", e);
throw new TiKVException("Execution exception met.", e);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,13 @@ public void prewriteWtihoutSecondaryKeyTest() {
long startTs = session.getTimestamp().getVersion();
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
primaryKey,
primaryKeyValue,
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
secondaries,
false));

for (int i = 0; i < secondaryKeyList.size(); i++) {
if (i == secondarySize - 1) {
Expand All @@ -240,7 +246,8 @@ public void prewriteWtihoutSecondaryKeyTest() {
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null));
null,
false));
}
}

Expand Down Expand Up @@ -279,7 +286,8 @@ public void prewriteWtihoutPrimaryKeyTest() {
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null));
null,
false));
}

// skip commitString primary key
Expand Down Expand Up @@ -323,6 +331,67 @@ public void ttlExpiredTest() throws InterruptedException {
}
}

@Test
public void fallBackTest() {
if (!check()) {
return;
}

// Case 1: Fallback primary, read primary
String primaryKey = genRandomKey(64);
List<String> secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, false);
assertEquals(pointGet(primaryKey), oldValue);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}

// Case 2: Fallback primary, read secondary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, false);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}
assertEquals(pointGet(primaryKey), oldValue);

// Case 3: Fallback secondary, read primary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, false, true);
assertEquals(pointGet(primaryKey), oldValue);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}

// Case 4: Fallback secondary, read secondary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, false, true);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}
assertEquals(pointGet(primaryKey), oldValue);

// Case 5: Fallback both, read primary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, true);
assertEquals(pointGet(primaryKey), oldValue);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}

// Case 6: Fallback both, read secondary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, true);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}
assertEquals(pointGet(primaryKey), oldValue);
}

private boolean check() {
if (!init) {
skipTestInit();
Expand Down Expand Up @@ -365,7 +434,13 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
// prewriteString <primary key, value1, secondaries>
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
primaryKey,
primaryKeyValue,
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
secondaries,
false));

// prewriteString secondaryKeys
for (int i = 0; i < secondaryKeyList.size(); i++) {
Expand All @@ -376,7 +451,42 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null));
null,
false));
}
}

private void fallback(
String primaryKey,
List<String> secondaryKeyList,
boolean fallbackPrimary,
boolean fallbackSecondary) {
// put
putAll(primaryKey, secondaryKeyList);
// prewrite primary key
long startTs = session.getTimestamp().getVersion();
// prewrite secondaryKeys
List<ByteString> secondaries =
secondaryKeyList.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList());
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
primaryKey,
primaryKeyValue,
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
secondaries,
fallbackPrimary));
for (int i = 0; i < secondaryKeyList.size(); i++) {
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
secondaryKeyList.get(i),
secondaryKeyValueList[i],
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null,
fallbackSecondary));
}
}
}
Loading