
feat(db/trans-cache): optimize for bloomFilter initialization #5394

Merged
5 changes: 5 additions & 0 deletions chainbase/src/main/java/org/tron/core/ChainBaseManager.java
@@ -28,6 +28,7 @@
import org.tron.core.db.PbftSignDataStore;
import org.tron.core.db.RecentBlockStore;
import org.tron.core.db.RecentTransactionStore;
import org.tron.core.db.TransactionCache;
import org.tron.core.db.TransactionStore;
import org.tron.core.db2.core.ITronChainBase;
import org.tron.core.exception.BadItemException;
@@ -237,6 +238,9 @@ public class ChainBaseManager {
@Autowired
private DbStatService dbStatService;

@Autowired
private TransactionCache transactionCache;

@Getter
@Setter
private NodeType nodeType;
@@ -291,6 +295,7 @@ public void closeAllStore() {
closeOneStore(pbftSignDataStore);
closeOneStore(sectionBloomStore);
closeOneStore(accountAssetStore);
closeOneStore(transactionCache);
}

// for test only
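Registering transactionCache in closeAllStore() is what ties the cache into graceful shutdown: closing the store is what triggers the new dump() in TxCacheDB below. A minimal sketch of the per-store helper this presumably goes through, assuming closeOneStore simply delegates to the store's close() and logs failures (the helper itself is outside this diff):

// Hypothetical sketch, not part of this diff: a per-store close helper that
// delegates to ITronChainBase.close() and logs failures so one bad store
// cannot abort the rest of the shutdown sequence.
private void closeOneStore(ITronChainBase database) {
  logger.info("Begin to close {}.", database.getName());
  try {
    database.close();
  } catch (Exception e) {
    logger.info("Failed to close {}.", database.getName(), e);
  } finally {
    logger.info("Close {} done.", database.getName());
  }
}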
149 changes: 146 additions & 3 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
@@ -3,10 +3,24 @@
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.primitives.Longs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.bouncycastle.util.encoders.Hex;
@@ -17,6 +31,7 @@
import org.tron.common.storage.leveldb.LevelDbDataSourceImpl;
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.JsonUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.capsule.BytesCapsule;
@@ -42,6 +57,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
private BloomFilter<byte[]>[] bloomFilters = new BloomFilter[2];
// filterStartBlock record the start block of the active filter
private volatile long filterStartBlock = INVALID_BLOCK;
private volatile long currentBlockNum = INVALID_BLOCK;
Contributor:

Is a check of currentBlockNum against the block header needed?

Node shutdown via kill -9 may leave the databases inconsistent, so this PR only works with a graceful shutdown? Just to confirm.

Contributor (Author):

> Is a check of currentBlockNum against the block header needed?

Good question, but how might it become inconsistent? Yes, a graceful shutdown is required.

Contributor:

Two unrelated questions. The check would only be a double check; it seems to add no further benefit.

> how might it become inconsistent

The whole shutdown process is complicated; still thinking.

Contributor (Author):

Maybe empty blocks would make the numbers differ; a check is not required.
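For context, the check debated above (and ultimately judged unnecessary) would amount to comparing the recovered currentBlockNum against the chain head at startup. A purely hypothetical sketch; the headBlockNum parameter and the acceptance rule are illustrative, not part of the PR:

// Hypothetical sketch of the consistency check discussed above; the PR
// relies on a graceful shutdown instead of this check.
private boolean matchesHead(long headBlockNum) {
  // Empty blocks can legitimately leave currentBlockNum behind the head,
  // so only a stale or uninitialized cache would be rejected here.
  return currentBlockNum != INVALID_BLOCK && currentBlockNum <= headBlockNum;
}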

// currentFilterIndex records the index of the active filter
private volatile int currentFilterIndex = 0;

@@ -57,6 +73,12 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
// replace persistentStore and optimizes startup performance
private RecentTransactionStore recentTransactionStore;

private final Path cacheFile0;
private final Path cacheFile1;
private final Path cacheProperties;
private final Path cacheDir;
private AtomicBoolean isValid = new AtomicBoolean(false);

public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
this.name = name;
this.TRANSACTION_COUNT =
@@ -85,6 +107,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
this.bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
cacheDir = Paths.get(CommonParameter.getInstance().getOutputDirectory(), ".cache");
this.cacheFile0 = Paths.get(cacheDir.toString(), "bloomFilters_0");
this.cacheFile1 = Paths.get(cacheDir.toString(), "bloomFilters_1");
this.cacheProperties = Paths.get(cacheDir.toString(), "txCache.properties");

}

@@ -110,6 +136,10 @@ private void initCache() {
}

public void init() {
if (recovery()) {
Contributor:

1. A recovery failure will affect the initialization logic.
2. Some exceptions are caught and some are not. Why is that?
3. Logically, any exception means the recovery failed, so there is no need to handle each one separately; handling the exception at the outermost layer is enough.

Contributor (Author):

Updated!

isValid.set(true);
return;
}
long size = recentTransactionStore.size();
if (size != MAX_BLOCK_SIZE) {
// 0. load from persistentStore
@@ -129,6 +159,7 @@
logger.info("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms.",
bloomFilters[1].approximateElementCount(), bloomFilters[1].expectedFpp(),
System.currentTimeMillis() - start);
isValid.set(true);
}
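Per the review exchange above, the recovery path rethrows inside each task and resolves failure exactly once, at the outermost exceptionally. A self-contained sketch of that pattern; the class and method names are illustrative, not from the PR:

// Minimal sketch of "handle the exception at the outermost layer": inner
// tasks rethrow, and the composed future maps any failure to one fallback.
import java.util.concurrent.CompletableFuture;

public class OutermostHandlingSketch {

  static boolean riskyStep(int i) {
    if (i < 0) {
      throw new RuntimeException("step " + i + " failed"); // propagate, never swallow
    }
    return true;
  }

  public static void main(String[] args) {
    CompletableFuture<Boolean> t0 = CompletableFuture.supplyAsync(() -> riskyStep(0));
    CompletableFuture<Boolean> t1 = CompletableFuture.supplyAsync(() -> riskyStep(1));
    boolean ok = CompletableFuture.allOf(t0, t1)
        .thenApply(v -> true)      // reached only if every task succeeded
        .exceptionally(e -> false) // the single recovery point for any failure
        .join();
    System.out.println(ok); // true; make an argument negative to see false
  }
}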

@Override
@@ -172,7 +203,7 @@ public void put(byte[] key, byte[] value) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
}
bloomFilters[currentFilterIndex].put(key);

currentBlockNum = blockNum;
if (lastMetricBlock != blockNum) {
lastMetricBlock = blockNum;
Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE,
@@ -208,13 +239,15 @@ public Iterator<Entry<byte[], byte[]>> iterator() {
}

@Override
public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
public synchronized void flush(Map<WrappedByteArray, WrappedByteArray> batch) {
isValid.set(false);
batch.forEach((k, v) -> this.put(k.getBytes(), v.getBytes()));
isValid.set(true);
}

@Override
public void close() {
reset();
dump();
bloomFilters[0] = null;
bloomFilters[1] = null;
persistentStore.close();
@@ -224,6 +257,116 @@ public void close() {
public void reset() {
}

private boolean recovery() {
FileUtil.createDirIfNotExists(this.cacheDir.toString());
logger.info("recovery bloomFilters start.");
CompletableFuture<Boolean> loadProperties = CompletableFuture.supplyAsync(this::loadProperties);
CompletableFuture<Boolean> tk0 = loadProperties.thenApplyAsync(
v -> recovery(0, this.cacheFile0));
CompletableFuture<Boolean> tk1 = loadProperties.thenApplyAsync(
v -> recovery(1, this.cacheFile1));

return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(this::handleException).join();
}

private boolean recovery(int index, Path file) {
try (InputStream in = new BufferedInputStream(Files.newInputStream(file,
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
Contributor:

DELETE_ON_CLOSE means the bloomFilter files only work once; the situation where node recovery finishes but the node shuts down immediately is ignored, right?

Contributor (Author):

kill -9 or kill -15? If kill -15, the bloomFilters will be dumped again.

Contributor (Author):

> the situation where node recovery finishes but the node shuts down immediately is ignored, right?

Sorry, I've resubmitted it to avoid dumping a wrong cache; please recheck.

logger.info("recovery bloomFilter[{}] from file.", index);
long start = System.currentTimeMillis();
bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel());
logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
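On the DELETE_ON_CLOSE thread above: the option makes each cache file single-use, because the default file system provider deletes the file as soon as the stream opened over it is closed. A self-contained sketch of that behavior:

// Self-contained sketch of DELETE_ON_CLOSE: the file disappears once the
// stream opened on it is closed, which is why a dumped filter can only be
// recovered once.
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DeleteOnCloseSketch {

  public static void main(String[] args) throws Exception {
    Path f = Files.createTempFile("bloomFilters_", ".bin");
    Files.write(f, new byte[] {1, 2, 3});
    try (InputStream in = Files.newInputStream(f,
        StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
      System.out.println(in.read()); // 1
    }
    System.out.println(Files.exists(f)); // false: deleted on close
  }
}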

private boolean handleException(Throwable e) {
bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
try {
Files.deleteIfExists(this.cacheFile0);
Files.deleteIfExists(this.cacheFile1);
} catch (Exception ignored) {

}
logger.info("recovery bloomFilters failed. {}", e.getMessage());
logger.info("rollback to previous mode.");
return false;
}

private void dump() {
if (!isValid.get()) {
logger.info("bloomFilters is not valid.");
}
FileUtil.createDirIfNotExists(this.cacheDir.toString());
logger.info("dump bloomFilters start.");
CompletableFuture<Void> task0 = CompletableFuture.runAsync(
() -> dump(0, this.cacheFile0));
CompletableFuture<Void> task1 = CompletableFuture.runAsync(
() -> dump(1, this.cacheFile1));
CompletableFuture.allOf(task0, task1).thenRun(() -> {
writeProperties();
logger.info("dump bloomFilters done.");

}).exceptionally(e -> {
logger.info("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
}

private void dump(int index, Path file) {
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
bloomFilters[index].writeTo(out);
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean loadProperties() {
try (Reader r = new InputStreamReader(new BufferedInputStream(Files.newInputStream(
this.cacheProperties, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)),
StandardCharsets.UTF_8)) {
Properties properties = new Properties();
properties.load(r);
filterStartBlock = Long.parseLong(properties.getProperty("filterStartBlock"));
currentBlockNum = Long.parseLong(properties.getProperty("currentBlockNum"));
currentFilterIndex = Integer.parseInt(properties.getProperty("currentFilterIndex"));
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
return true;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private void writeProperties() {
try (Writer w = Files.newBufferedWriter(this.cacheProperties, StandardCharsets.UTF_8)) {
Properties properties = new Properties();
properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock));
properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum));
properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex));
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.",
filterStartBlock, currentBlockNum, currentFilterIndex);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
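The metadata round trip is a plain java.util.Properties file. A self-contained sketch of the same store/load cycle, with illustrative values:

// Sketch of the txCache.properties round trip: store the three counters on
// shutdown, parse them back on startup.
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class PropertiesRoundTripSketch {

  public static void main(String[] args) throws Exception {
    Path f = Files.createTempFile("txCache", ".properties");
    Properties out = new Properties();
    out.setProperty("filterStartBlock", "65538");
    out.setProperty("currentBlockNum", "139999");
    out.setProperty("currentFilterIndex", "1");
    try (Writer w = Files.newBufferedWriter(f, StandardCharsets.UTF_8)) {
      out.store(w, "Generated by the application. PLEASE DO NOT EDIT!");
    }
    Properties in = new Properties();
    try (Reader r = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
      in.load(r);
    }
    System.out.println(Long.parseLong(in.getProperty("currentBlockNum"))); // 139999
  }
}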

@Override
public TxCacheDB newInstance() {
return new TxCacheDB(name, recentTransactionStore);
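The heart of the optimization is Guava's BloomFilter serialization: writeTo on graceful shutdown, readFrom on startup, instead of replaying recentTransactionStore block by block. A self-contained round-trip sketch against the same Guava API:

// Round-trip sketch of the Guava BloomFilter persistence this PR relies on:
// dump with writeTo(OutputStream), restore with readFrom(InputStream, Funnel).
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class BloomRoundTripSketch {

  public static void main(String[] args) throws Exception {
    BloomFilter<byte[]> filter =
        BloomFilter.create(Funnels.byteArrayFunnel(), 1_000_000);
    byte[] txId = "some-tx-hash".getBytes(StandardCharsets.UTF_8);
    filter.put(txId);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    filter.writeTo(out); // what dump() does per filter, into a cache file

    BloomFilter<byte[]> restored = BloomFilter.readFrom(
        new ByteArrayInputStream(out.toByteArray()), Funnels.byteArrayFunnel());
    System.out.println(restored.mightContain(txId));        // true
    System.out.println(restored.approximateElementCount()); // ~1
  }
}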
90 changes: 90 additions & 0 deletions framework/src/test/java/org/tron/core/db/TxCacheDBInitTest.java
@@ -0,0 +1,90 @@
package org.tron.core.db;

import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.tron.common.application.TronApplicationContext;
import org.tron.common.utils.ByteArray;
import org.tron.core.Constant;
import org.tron.core.capsule.BytesCapsule;
import org.tron.core.config.DefaultConfig;
import org.tron.core.config.args.Args;
import org.tron.keystore.Wallet;

@Slf4j
public class TxCacheDBInitTest {

private static TronApplicationContext context;

@ClassRule
public static final TemporaryFolder temporaryFolder = new TemporaryFolder();

private final byte[][] hash = new byte[140000][64];

@AfterClass
public static void destroy() {
context.destroy();
Args.clearParam();
}

/**
* Init data.
*/
@BeforeClass
public static void init() throws IOException {
Args.setParam(new String[]{"--output-directory", temporaryFolder.newFolder().toString(),
"--p2p-disable", "true"}, Constant.TEST_CONF);
context = new TronApplicationContext(DefaultConfig.class);
}

@Test
public void reload() {
TransactionCache db = context.getBean(TransactionCache.class);
db.initCache();
putTransaction();
DefaultListableBeanFactory defaultListableBeanFactory =
(DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
queryTransaction();
defaultListableBeanFactory.destroySingleton("transactionCache");
TransactionCache transactionCache = new TransactionCache("transactionCache",
context.getBean(RecentTransactionStore.class));
transactionCache.initCache();
defaultListableBeanFactory.registerSingleton("transactionCache", transactionCache);
queryTransaction();
}

private void putTransaction() {
TransactionCache db = context.getBean(TransactionCache.class);
for (int i = 1; i < 140000; i++) {
hash[i] = Wallet.generateRandomBytes(64);
db.put(hash[i], new BytesCapsule(ByteArray.fromLong(i)));
}
}

private void queryTransaction() {
TransactionCache db = context.getBean(TransactionCache.class);
// [1,65537] are expired
for (int i = 1; i < 65538; i++) {
try {
Assert.assertFalse("index = " + i, db.has(hash[i]));
} catch (Exception e) {
Assert.fail("transaction should be expired index = " + i);
}
}
// [65538,140000] are in cache
for (int i = 65538; i < 140000; i++) {
try {
Assert.assertTrue("index = " + i, db.has(hash[i]));
} catch (Exception e) {
Assert.fail("transaction should not be expired index = " + i);
}
}
}

}
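The boundary asserted in queryTransaction() follows from the filter-rotation arithmetic. A sketch under the assumption, inferred from the test rather than stated in the diff shown here, that put() rotates filters once blockNum - filterStartBlock exceeds MAX_BLOCK_SIZE = 65536:

// Sketch of the rotation arithmetic behind the test's expiry boundary; the
// rotation condition is an assumption inferred from the asserted indices.
public class RotationWindowSketch {

  public static void main(String[] args) {
    final long maxBlockSize = 65536;
    long filterStartBlock = 1;      // the first put lands at block 1
    long previousWindowStart = -1;  // start of the older live filter
    for (long blockNum = 1; blockNum < 140000; blockNum++) {
      if (blockNum - filterStartBlock > maxBlockSize) {
        previousWindowStart = filterStartBlock; // older filter keeps its window
        filterStartBlock = blockNum;            // a fresh filter becomes active
      }
    }
    // Two live windows remain: [previousWindowStart, filterStartBlock - 1]
    // and [filterStartBlock, 139999]; every block before them has expired.
    System.out.println("oldest retained block = " + previousWindowStart);         // 65538
    System.out.println("expired range = [1, " + (previousWindowStart - 1) + "]"); // [1, 65537]
  }
}

Under this assumption the rotations happen at blocks 65538 and 131075, so the live windows are [65538, 131074] and [131075, 139999], exactly matching the hit and miss ranges the test asserts.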