Skip to content

Commit

Permalink
feat(db): Optimize for bloomFilter initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 committed Aug 2, 2023
1 parent 10223cc commit eab8d2c
Showing 1 changed file with 166 additions and 2 deletions.
168 changes: 166 additions & 2 deletions chainbase/src/main/java/org/tron/core/db2/common/TxCacheDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,24 @@
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.google.common.primitives.Longs;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.bouncycastle.util.encoders.Hex;
Expand Down Expand Up @@ -42,6 +56,7 @@ public class TxCacheDB implements DB<byte[], byte[]>, Flusher {
private BloomFilter<byte[]>[] bloomFilters = new BloomFilter[2];
// filterStartBlock record the start block of the active filter
private volatile long filterStartBlock = INVALID_BLOCK;
private volatile long currentBlockNum = INVALID_BLOCK;
// currentFilterIndex records the index of the active filter
private volatile int currentFilterIndex = 0;

Expand Down Expand Up @@ -110,6 +125,9 @@ private void initCache() {
}

public void init() {
if (recovery()) {
return;
}
long size = recentTransactionStore.size();
if (size != MAX_BLOCK_SIZE) {
// 0. load from persistentStore
Expand Down Expand Up @@ -172,7 +190,7 @@ public void put(byte[] key, byte[] value) {
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
}
bloomFilters[currentFilterIndex].put(key);

currentBlockNum = blockNum;
if (lastMetricBlock != blockNum) {
lastMetricBlock = blockNum;
Metrics.gaugeSet(MetricKeys.Gauge.TX_CACHE,
Expand Down Expand Up @@ -214,7 +232,7 @@ public void flush(Map<WrappedByteArray, WrappedByteArray> batch) {

@Override
public void close() {
reset();
dump();
bloomFilters[0] = null;
bloomFilters[1] = null;
persistentStore.close();
Expand All @@ -224,6 +242,152 @@ public void close() {
public void reset() {
}

private boolean recovery() {
logger.info("recovery bloomFilters start.");
final Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(),
"bloomFilters_0");
final Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(),
"bloomFilters_1");
Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(),
"txCache.properties");
Map<String, String> properties = readProperties(txCacheProperties);

if (properties == null || properties.size() != 3) {
logger.info("properties is corrupted.");
try {
Files.deleteIfExists(file0);
Files.deleteIfExists(file1);
} catch (IOException e) {
logger.warn("recovery bloomFilters failed. {}", e.getMessage());
}
logger.info("rollback to previous mode.");
return false;
}

filterStartBlock = Long.parseLong(properties.get("filterStartBlock"));
currentBlockNum = Long.parseLong(properties.get("currentBlockNum"));
currentFilterIndex = Integer.parseInt(properties.get("currentFilterIndex"));
CompletableFuture<Boolean> tk0 = CompletableFuture.supplyAsync(() -> recovery(0, file0));
CompletableFuture<Boolean> tk1 = CompletableFuture.supplyAsync(() -> recovery(1, file1));

return CompletableFuture.allOf(tk0, tk1).thenApply(v -> {
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}",
filterStartBlock, currentBlockNum, currentFilterIndex);
logger.info("recovery bloomFilters success.");
return true;
}).exceptionally(e -> {
bloomFilters[0] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
bloomFilters[1] = BloomFilter.create(Funnels.byteArrayFunnel(),
MAX_BLOCK_SIZE * TRANSACTION_COUNT);
logger.warn("recovery bloomFilters failed. {}", e.getMessage());
logger.info("rollback to previous mode.");
return false;
}).join();
}

private boolean recovery(int index, Path file) {
try (InputStream in = new BufferedInputStream(Files.newInputStream(file,
StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE))) {
logger.info("recovery bloomFilter[{}] from file.", index);
long start = System.currentTimeMillis();
bloomFilters[index] = BloomFilter.readFrom(in, Funnels.byteArrayFunnel());
logger.info("recovery bloomFilter[{}] from file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
return true;
} catch (IOException e) {
logger.warn("recovery bloomFilter[{}] failed.", index, e);
throw new RuntimeException(e);
}
}

private void dump() {
logger.info("dump bloomFilters start.");
Path file0 = Paths.get(CommonParameter.getInstance().getOutputDirectory(),
"bloomFilters_0");
Path file1 = Paths.get(CommonParameter.getInstance().getOutputDirectory(),
"bloomFilters_1");
Path txCacheProperties = Paths.get(CommonParameter.getInstance().getOutputDirectory(),
"txCache.properties");
CompletableFuture<Void> task0 = CompletableFuture.runAsync(() -> dump(0, file0));
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> dump(1, file1));
CompletableFuture.allOf(task0, task1).thenRun(() -> {
logger.info("filterStartBlock: {}, currentBlockNum: {}, currentFilterIndex: {}",
filterStartBlock, currentBlockNum, currentFilterIndex);
Map<String, String> properties = new HashMap<>();
properties.put("filterStartBlock", String.valueOf(filterStartBlock));
properties.put("currentBlockNum", String.valueOf(currentBlockNum));
properties.put("currentFilterIndex", String.valueOf(currentFilterIndex));
writeProperties(txCacheProperties, properties);
logger.info("dump bloomFilters done.");

}).exceptionally(e -> {
logger.warn("dump bloomFilters to file failed. {}", e.getMessage());
return null;
}).join();
}

private void dump(int index, Path file) {
try {
Files.deleteIfExists(file);
} catch (IOException e) {
logger.warn("dump bloomFilters[{}] failed.", index, e);
throw new RuntimeException(e);
}
try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file,
StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE))) {
logger.info("dump bloomFilters[{}] to file.", index);
long start = System.currentTimeMillis();
bloomFilters[index].writeTo(out);
logger.info("dump bloomFilters[{}] to file done,filter: {}, filter-fpp: {}, cost {} ms.",
index, bloomFilters[index].approximateElementCount(), bloomFilters[index].expectedFpp(),
System.currentTimeMillis() - start);
} catch (IOException e) {
logger.warn("dump bloomFilters[{}] failed. {}", index, e.getMessage());
throw new RuntimeException(e);
}
}

private Map<String, String> readProperties(Path file) {
try (BufferedReader r = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
Properties prop = new Properties();
prop.load(r);
HashMap<String, String> map = new HashMap<>();
prop.forEach((k, v) -> {
String key = new String(k.toString().getBytes(StandardCharsets.ISO_8859_1),
StandardCharsets.UTF_8);
String value = new String(v.toString().getBytes(StandardCharsets.ISO_8859_1),
StandardCharsets.UTF_8);
map.put(key, value);
});
map.entrySet().removeIf(entry -> entry.getValue() == null || entry.getValue().isEmpty());
return map;
} catch (IOException e) {
logger.warn("readProperties. {}", e.getMessage());
return null;
} finally {
try {
Files.deleteIfExists(file);
} catch (IOException e) {
logger.warn("readProperties. {}", e.getMessage());
}
}
}

private boolean writeProperties(Path file, Map<String, String> kvMap) {
try (BufferedWriter w = Files.newBufferedWriter(file);
BufferedReader r = Files.newBufferedReader(file)) {
Properties properties = new Properties();
properties.load(r);
kvMap.forEach(properties::setProperty);
properties.store(w, "Generated by the application. PLEASE DO NOT EDIT! ");
} catch (IOException e) {
throw new RuntimeException(e);
}
return true;
}

@Override
public TxCacheDB newInstance() {
return new TxCacheDB(name, recentTransactionStore);
Expand Down

0 comments on commit eab8d2c

Please sign in to comment.