-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(db/trans-cache): optimize for bloomFilter initialization #5394
Changes from all commits
eab8d2c
080e76c
11c26ba
00fac09
2c6fdac
6cd5190
7d1eac9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,24 @@ | |
import com.google.common.hash.BloomFilter; | ||
import com.google.common.hash.Funnels; | ||
import com.google.common.primitives.Longs; | ||
import java.io.BufferedInputStream; | ||
import java.io.BufferedOutputStream; | ||
import java.io.InputStream; | ||
import java.io.InputStreamReader; | ||
import java.io.OutputStream; | ||
import java.io.Reader; | ||
import java.io.Writer; | ||
import java.nio.charset.StandardCharsets; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.nio.file.StandardOpenOption; | ||
import java.util.Iterator; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.Properties; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.commons.lang3.ArrayUtils; | ||
import org.bouncycastle.util.encoders.Hex; | ||
|
@@ -17,6 +31,7 @@ | |
import org.tron.common.storage.leveldb.LevelDbDataSourceImpl; | ||
import org.tron.common.storage.rocksdb.RocksDbDataSourceImpl; | ||
import org.tron.common.utils.ByteArray; | ||
import org.tron.common.utils.FileUtil; | ||
import org.tron.common.utils.JsonUtil; | ||
import org.tron.common.utils.StorageUtils; | ||
import org.tron.core.capsule.BytesCapsule; | ||
|
@@ -42,6 +57,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher { | |
private BloomFilter<byte[]>[] bloomFilters = new BloomFilter[2]; | ||
// filterStartBlock record the start block of the active filter | ||
private volatile long filterStartBlock = INVALID_BLOCK; | ||
private volatile long currentBlockNum = INVALID_BLOCK; | ||
// currentFilterIndex records the index of the active filter | ||
private volatile int currentFilterIndex = 0; | ||
|
||
|
@@ -57,6 +73,12 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher { | |
// replace persistentStore and optimizes startup performance | ||
private RecentTransactionStore recentTransactionStore; | ||
|
||
private final Path cacheFile0; | ||
private final Path cacheFile1; | ||
private final Path cacheProperties; | ||
private final Path cacheDir; | ||
private AtomicBoolean isValid = new AtomicBoolean(false); | ||
|
||
public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { | ||
this.name = name; | ||
this.TRANSACTION_COUNT = | ||
|
@@ -85,6 +107,10 @@ public TxCacheDB(String name, RecentTransactionStore recentTransactionStore) { | |
MAX_BLOCK_SIZE * TRANSACTION_COUNT); | ||
this.bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), | ||
MAX_BLOCK_SIZE * TRANSACTION_COUNT); | ||
cacheDir = Paths.get(CommonParameter.getInstance().getOutputDirectory(), ".cache"); | ||
this.cacheFile0 = Paths.get(cacheDir.toString(), "bloomFilters_0"); | ||
this.cacheFile1 = Paths.get(cacheDir.toString(), "bloomFilters_1"); | ||
this.cacheProperties = Paths.get(cacheDir.toString(), "txCache.properties"); | ||
|
||
} | ||
|
||
|
@@ -110,6 +136,10 @@ private void initCache() { | |
} | ||
|
||
public void init() { | ||
if (recovery()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated! |
||
isValid.set(true); | ||
return; | ||
} | ||
long size = recentTransactionStore.size(); | ||
if (size != MAX_BLOCK_SIZE) { | ||
// 0. load from persistentStore | ||
|
@@ -129,6 +159,7 @@ public void init() { | |
logger.info("Load cache from recentTransactionStore, filter: {}, filter-fpp: {}, cost: {} ms.", | ||
bloomFilters[1].approximateElementCount(), bloomFilters[1].expectedFpp(), | ||
System.currentTimeMillis() - start); | ||
isValid.set(true); | ||
} | ||
|
||
@Override | ||
|
@@ -172,7 +203,7 @@ public void put(byte[] key, byte[] value) { | |
MAX_BLOCK_SIZE * TRANSACTION_COUNT); | ||
} | ||
bloomFilters[currentFilterIndex].put(key); | ||
|
||
currentBlockNum = blockNum; | ||
if (lastMetricBlock != blockNum) { | ||
lastMetricBlock = blockNum; | ||
Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE, | ||
|
@@ -208,13 +239,15 @@ public Iterator<Entry<byte[], byte[]>> iterator() { | |
} | ||
|
||
@Override | ||
public void flush(Map<WrappedByteArray, WrappedByteArray> batch) { | ||
public synchronized void flush(Map<WrappedByteArray, WrappedByteArray> batch) { | ||
isValid.set(false); | ||
batch.forEach((k, v) -> this.put(k.getBytes(), v.getBytes())); | ||
isValid.set(true); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
reset(); | ||
dump(); | ||
bloomFilters[0] = null; | ||
bloomFilters[1] = null; | ||
persistentStore.close(); | ||
|
@@ -224,6 +257,116 @@ public void close() { | |
public void reset() { | ||
} | ||
|
||
private boolean recovery() { | ||
FileUtil.createDirIfNotExists(this.cacheDir.toString()); | ||
logger.info("recovery bloomFilters start."); | ||
CompletableFuture<Boolean> loadProperties = CompletableFuture.supplyAsync(this::loadProperties); | ||
CompletableFuture<Boolean> tk0 = loadProperties.thenApplyAsync( | ||
v -> recovery(0, this.cacheFile0)); | ||
CompletableFuture<Boolean> tk1 = loadProperties.thenApplyAsync( | ||
v -> recovery(1, this.cacheFile1)); | ||
|
||
return CompletableFuture.allOf(tk0, tk1).thenApply(v -> { | ||
logger.info("recovery bloomFilters success."); | ||
return true; | ||
}).exceptionally(this::handleException).join(); | ||
} | ||
|
||
private boolean recovery(int index, Path file) { | ||
try (InputStream in = new BufferedInputStream(Files.newInputStream(file, | ||
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sorry, i've resubmitted it to avoid dump wrong cache, please recheck it. |
||
logger.info("recovery bloomFilter[{}] from file.", index); | ||
long start = System.currentTimeMillis(); | ||
bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel()); | ||
logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.", | ||
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), | ||
System.currentTimeMillis() - start); | ||
return true; | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private boolean handleException(Throwable e) { | ||
bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(), | ||
MAX_BLOCK_SIZE * TRANSACTION_COUNT); | ||
bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), | ||
MAX_BLOCK_SIZE * TRANSACTION_COUNT); | ||
try { | ||
Files.deleteIfExists(this.cacheFile0); | ||
Files.deleteIfExists(this.cacheFile1); | ||
} catch (Exception ignored) { | ||
|
||
} | ||
logger.info("recovery bloomFilters failed. {}", e.getMessage()); | ||
logger.info("rollback to previous mode."); | ||
return false; | ||
} | ||
|
||
private void dump() { | ||
if (!isValid.get()) { | ||
logger.info("bloomFilters is not valid."); | ||
} | ||
FileUtil.createDirIfNotExists(this.cacheDir.toString()); | ||
logger.info("dump bloomFilters start."); | ||
CompletableFuture<Void> task0 = CompletableFuture.runAsync( | ||
() -> dump(0, this.cacheFile0)); | ||
CompletableFuture<Void> task1 = CompletableFuture.runAsync( | ||
() -> dump(1, this.cacheFile1)); | ||
CompletableFuture.allOf(task0, task1).thenRun(() -> { | ||
writeProperties(); | ||
logger.info("dump bloomFilters done."); | ||
|
||
}).exceptionally(e -> { | ||
logger.info("dump bloomFilters to file failed. {}", e.getMessage()); | ||
return null; | ||
}).join(); | ||
} | ||
|
||
private void dump(int index, Path file) { | ||
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file))) { | ||
logger.info("dump bloomFilters[{}] to file.", index); | ||
long start = System.currentTimeMillis(); | ||
bloomFilters[index].writeTo(out); | ||
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.", | ||
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), | ||
System.currentTimeMillis() - start); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private boolean loadProperties() { | ||
try (Reader r = new InputStreamReader(new BufferedInputStream(Files.newInputStream( | ||
this.cacheProperties, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)), | ||
StandardCharsets.UTF_8)) { | ||
Properties properties = new Properties(); | ||
properties.load(r); | ||
filterStartBlock = Long.parseLong(properties.getProperty("filterStartBlock")); | ||
currentBlockNum = Long.parseLong(properties.getProperty("currentBlockNum")); | ||
currentFilterIndex = Integer.parseInt(properties.getProperty("currentFilterIndex")); | ||
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, load done.", | ||
filterStartBlock, currentBlockNum, currentFilterIndex); | ||
return true; | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
private void writeProperties() { | ||
try (Writer w = Files.newBufferedWriter(this.cacheProperties, StandardCharsets.UTF_8)) { | ||
Properties properties = new Properties(); | ||
properties.setProperty("filterStartBlock", String.valueOf(filterStartBlock)); | ||
properties.setProperty("currentBlockNum", String.valueOf(currentBlockNum)); | ||
properties.setProperty("currentFilterIndex", String.valueOf(currentFilterIndex)); | ||
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! "); | ||
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}, write done.", | ||
filterStartBlock, currentBlockNum, currentFilterIndex); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public TxCacheDB newInstance() { | ||
return new TxCacheDB(name, recentTransactionStore); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
package org.tron.core.db; | ||
|
||
import java.io.IOException; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.junit.AfterClass; | ||
import org.junit.Assert; | ||
import org.junit.BeforeClass; | ||
import org.junit.ClassRule; | ||
import org.junit.Test; | ||
import org.junit.rules.TemporaryFolder; | ||
import org.springframework.beans.factory.support.DefaultListableBeanFactory; | ||
import org.tron.common.application.TronApplicationContext; | ||
import org.tron.common.utils.ByteArray; | ||
import org.tron.core.Constant; | ||
import org.tron.core.capsule.BytesCapsule; | ||
import org.tron.core.config.DefaultConfig; | ||
import org.tron.core.config.args.Args; | ||
import org.tron.keystore.Wallet; | ||
|
||
@Slf4j | ||
public class TxCacheDBInitTest { | ||
|
||
private static TronApplicationContext context; | ||
|
||
@ClassRule | ||
public static final TemporaryFolder temporaryFolder = new TemporaryFolder(); | ||
|
||
private final byte[][] hash = new byte[140000][64]; | ||
|
||
@AfterClass | ||
public static void destroy() { | ||
context.destroy(); | ||
Args.clearParam(); | ||
} | ||
|
||
/** | ||
* Init data. | ||
*/ | ||
@BeforeClass | ||
public static void init() throws IOException { | ||
Args.setParam(new String[]{"--output-directory", temporaryFolder.newFolder().toString(), | ||
"--p2p-disable", "true"}, Constant.TEST_CONF); | ||
context = new TronApplicationContext(DefaultConfig.class); | ||
} | ||
|
||
@Test | ||
public void reload() { | ||
TransactionCache db = context.getBean(TransactionCache.class); | ||
db.initCache(); | ||
putTransaction(); | ||
DefaultListableBeanFactory defaultListableBeanFactory = | ||
(DefaultListableBeanFactory) context.getAutowireCapableBeanFactory(); | ||
queryTransaction(); | ||
defaultListableBeanFactory.destroySingleton("transactionCache"); | ||
TransactionCache transactionCache = new TransactionCache("transactionCache", | ||
context.getBean(RecentTransactionStore.class)); | ||
transactionCache.initCache(); | ||
defaultListableBeanFactory.registerSingleton("transactionCache",transactionCache); | ||
queryTransaction(); | ||
} | ||
|
||
private void putTransaction() { | ||
TransactionCache db = context.getBean(TransactionCache.class); | ||
for (int i = 1; i < 140000; i++) { | ||
hash[i] = Wallet.generateRandomBytes(64); | ||
db.put(hash[i], new BytesCapsule(ByteArray.fromLong(i))); | ||
} | ||
} | ||
|
||
private void queryTransaction() { | ||
TransactionCache db = context.getBean(TransactionCache.class); | ||
// [1,65537] are expired | ||
for (int i = 1; i < 65538; i++) { | ||
try { | ||
Assert.assertFalse("index = " + i, db.has(hash[i])); | ||
} catch (Exception e) { | ||
Assert.fail("transaction should be expired index = " + i); | ||
} | ||
} | ||
// [65538,140000] are in cache | ||
for (int i = 65538; i < 140000; i++) { | ||
try { | ||
Assert.assertTrue("index = " + i, db.has(hash[i])); | ||
} catch (Exception e) { | ||
Assert.fail("transaction should not be expired index = " + i); | ||
} | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A check for
currentBlockNum
withblock header
is needed?Node shutdown using
Kill -9
may result in DBS being inconsistent, so this PR only works with a graceful shutdown? Just for a confirmThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, but how might it be inconsistent? Yes ,a graceful shutdown is required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two questions, with no relation.
A check
just for a double check, seems it introduces no more benefit.The whole shutdown process is complicated, still thinking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe the empty blocks will make different, a check is not required.