From eab8d2cc37105172e214f694bc059d2602552dff Mon Sep 17 00:00:00 2001 From: halibobo1205 Date: Thu, 8 Jun 2023 14:00:54 +0800 Subject: [PATCH] feat(db): Optimize for bloomFilter initialization --- .../org/tron/core/db2/common/TxCacheDB.java | 168 +++++++++++++++++- 1 file changed, 166 insertions(+), 2 deletions(-) diff --git a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java index 71cf361b06a..ce98c3db5ec 100644 --- a/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java +++ b/chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java @@ -3,10 +3,24 @@ import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import com.google.common.primitives.Longs; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; import org.bouncycastle.util.encoders.Hex; @@ -42,6 +56,7 @@ public class TxCacheDB implements DB, Flusher { private BloomFilter[] bloomFilters = new BloomFilter[2]; // filterStartBlock record the start block of the active filter private volatile long filterStartBlock = INVALID_BLOCK; + private volatile long currentBlockNum = INVALID_BLOCK; // currentFilterIndex records the index of the active filter private volatile int currentFilterIndex = 0; @@ -110,6 +125,9 @@ private void initCache() { } public void init() { + if (recovery()) { + return; + } long size = recentTransactionStore.size(); if (size != MAX_BLOCK_SIZE) { // 0. load from persistentStore @@ -172,7 +190,7 @@ public void put(byte[] key, byte[] value) { MAX_BLOCK_SIZE * TRANSACTION_COUNT); } bloomFilters[currentFilterIndex].put(key); - + currentBlockNum = blockNum; if (lastMetricBlock != blockNum) { lastMetricBlock = blockNum; Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE, @@ -214,7 +232,7 @@ public void flush(Map batch) { @Override public void close() { - reset(); + dump(); bloomFilters[0] = null; bloomFilters[1] = null; persistentStore.close(); @@ -224,6 +242,152 @@ public void close() { public void reset() { } + private boolean recovery() { + logger.info("recovery bloomFilters start."); + final Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_0"); + final Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_1"); + Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "txCache.properties"); + Map properties = readProperties(txCacheProperties); + + if (properties == null || properties.size() != 3) { + logger.info("properties is corrupted."); + try { + Files.deleteIfExists(file0); + Files.deleteIfExists(file1); + } catch (IOException e) { + logger.warn("recovery bloomFilters failed. {}", e.getMessage()); + } + logger.info("rollback to previous mode."); + return false; + } + + filterStartBlock = Long.parseLong(properties.get("filterStartBlock")); + currentBlockNum = Long.parseLong(properties.get("currentBlockNum")); + currentFilterIndex = Integer.parseInt(properties.get("currentFilterIndex")); + CompletableFuture tk0 = CompletableFuture.supplyAsync(() -> recovery(0, file0)); + CompletableFuture tk1 = CompletableFuture.supplyAsync(() -> recovery(1, file1)); + + return CompletableFuture.allOf(tk0, tk1).thenApply(v -> { + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", + filterStartBlock, currentBlockNum, currentFilterIndex); + logger.info("recovery bloomFilters success."); + return true; + }).exceptionally(e -> { + bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(), + MAX_BLOCK_SIZE * TRANSACTION_COUNT); + logger.warn("recovery bloomFilters failed. {}", e.getMessage()); + logger.info("rollback to previous mode."); + return false; + }).join(); + } + + private boolean recovery(int index, Path file) { + try (InputStream in = new BufferedInputStream(Files.newInputStream(file, + StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) { + logger.info("recovery bloomFilter[{}] from file.", index); + long start = System.currentTimeMillis(); + bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel()); + logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.", + index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), + System.currentTimeMillis() - start); + return true; + } catch (IOException e) { + logger.warn("recovery bloomFilter[{}] failed.", index, e); + throw new RuntimeException(e); + } + } + + private void dump() { + logger.info("dump bloomFilters start."); + Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_0"); + Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "bloomFilters_1"); + Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(), + "txCache.properties"); + CompletableFuture task0 = CompletableFuture.runAsync(() -> dump(0, file0)); + CompletableFuture task1 = CompletableFuture.runAsync(() -> dump(1, file1)); + CompletableFuture.allOf(task0, task1).thenRun(() -> { + logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}", + filterStartBlock, currentBlockNum, currentFilterIndex); + Map properties = new HashMap<>(); + properties.put("filterStartBlock", String.valueOf(filterStartBlock)); + properties.put("currentBlockNum", String.valueOf(currentBlockNum)); + properties.put("currentFilterIndex", String.valueOf(currentFilterIndex)); + writeProperties(txCacheProperties, properties); + logger.info("dump bloomFilters done."); + + }).exceptionally(e -> { + logger.warn("dump bloomFilters to file failed. {}", e.getMessage()); + return null; + }).join(); + } + + private void dump(int index, Path file) { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + logger.warn("dump bloomFilters[{}] failed.", index, e); + throw new RuntimeException(e); + } + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file, + StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))) { + logger.info("dump bloomFilters[{}] to file.", index); + long start = System.currentTimeMillis(); + bloomFilters[index].writeTo(out); + logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.", + index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(), + System.currentTimeMillis() - start); + } catch (IOException e) { + logger.warn("dump bloomFilters[{}] failed. {}", index, e.getMessage()); + throw new RuntimeException(e); + } + } + + private Map readProperties(Path file) { + try (BufferedReader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) { + Properties prop = new Properties(); + prop.load(r); + HashMap map = new HashMap<>(); + prop.forEach((k, v) -> { + String key = new String(k.toString().getBytes(StandardCharsets.ISO_8859_1), + StandardCharsets.UTF_8); + String value = new String(v.toString().getBytes(StandardCharsets.ISO_8859_1), + StandardCharsets.UTF_8); + map.put(key, value); + }); + map.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty()); + return map; + } catch (IOException e) { + logger.warn("readProperties. {}", e.getMessage()); + return null; + } finally { + try { + Files.deleteIfExists(file); + } catch (IOException e) { + logger.warn("readProperties. {}", e.getMessage()); + } + } + } + + private boolean writeProperties(Path file, Map kvMap) { + try (BufferedWriter w = Files.newBufferedWriter(file); + BufferedReader r = Files.newBufferedReader(file)) { + Properties properties = new Properties(); + properties.load(r); + kvMap.forEach(properties::setProperty); + properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! "); + } catch (IOException e) { + throw new RuntimeException(e); + } + return true; + } + @Override public TxCacheDB newInstance() { return new TxCacheDB(name, recentTransactionStore);