From ab28ce2ca9da0e5d946d5deb22a98d146a764df8 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Thu, 24 Oct 2019 11:40:43 -0600 Subject: [PATCH] [PAN-3249] Use Bloombits for Logs queries (#127) Use the bloombits for logs queries, so we only have to walk headers and not every receipt on a large query. Signed-off-by: Danno Ferrin Signed-off-by: edwardmack --- .../ethereum/api/query/BlockchainQueries.java | 11 +++-- .../besu/ethereum/api/query/LogsQuery.java | 46 +++++++++++++++++-- .../filter/FilterManagerLogFilterTest.java | 11 ++--- .../internal/filter/LogsQueryTest.java | 11 +++++ .../internal/methods/EthNewFilterTest.java | 17 +++---- .../jsonrpc/eth/eth_getLogs_nullParam.json | 34 ++++++++------ .../besu/ethereum/core/LogsBloomFilter.java | 30 +++++++++++- 7 files changed, 124 insertions(+), 36 deletions(-) diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java index 331eeb76edd..eb6b7e3bdd2 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/BlockchainQueries.java @@ -486,11 +486,16 @@ public Optional transactionReceiptByTransactionH */ public List matchingLogs( final long fromBlockNumber, final long toBlockNumber, final LogsQuery query) { + // rangeClosed handles the inverted from/to situations automatically with zero results. return LongStream.rangeClosed(fromBlockNumber, toBlockNumber) - .mapToObj(blockchain::getBlockHashByNumber) + .mapToObj(blockchain::getBlockHeader) + // Use takeWhile instead of clamping on toBlockNumber/headBlockNumber because it may get an + // extra block or two for a query that has a toBlockNumber past chain head. Similarly this + // handles the case when fromBlockNumber is past chain head. .takeWhile(Optional::isPresent) - .flatMap(Optional::stream) - .flatMap(hash -> matchingLogs(hash, query).stream()) + .map(Optional::get) + .filter(header -> query.couldMatch(header.getLogsBloom())) + .flatMap(header -> matchingLogs(header.getHash(), query).stream()) .collect(Collectors.toList()); } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogsQuery.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogsQuery.java index 932024f970b..3157ba9be82 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogsQuery.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/LogsQuery.java @@ -19,9 +19,12 @@ import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Log; import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogsBloomFilter; import java.util.Arrays; import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import com.google.common.collect.Lists; @@ -29,10 +32,33 @@ public class LogsQuery { private final List
queryAddresses; private final List> queryTopics; + private final List addressBlooms; + private final List> topicsBlooms; + + private LogsQuery(final List
queryAddresses, final List> queryTopics) { + this.queryAddresses = queryAddresses; + this.queryTopics = queryTopics; + addressBlooms = + this.queryAddresses.stream() + .map(LogsBloomFilter::computeBytes) + .collect(Collectors.toList()); + topicsBlooms = + this.queryTopics.stream() + .map( + topics -> + topics.stream() + .filter(Objects::nonNull) + .map(LogsBloomFilter::computeBytes) + .collect(Collectors.toList())) + .collect(Collectors.toList()); + } - private LogsQuery(final List
addresses, final List> topics) { - this.queryAddresses = addresses; - this.queryTopics = topics; + public boolean couldMatch(final LogsBloomFilter bloom) { + return (addressBlooms.isEmpty() || addressBlooms.stream().anyMatch(bloom::couldContain)) + && (topicsBlooms.isEmpty() + || topicsBlooms.stream() + .allMatch( + topics -> topics.isEmpty() || topics.stream().anyMatch(bloom::couldContain))); } public boolean matches(final Log log) { @@ -62,6 +88,20 @@ private boolean matchesTopic(final LogTopic topic, final List matchCri return matchCriteria.contains(null) || matchCriteria.contains(topic); } + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final LogsQuery logsQuery = (LogsQuery) o; + return Objects.equals(queryAddresses, logsQuery.queryAddresses) + && Objects.equals(queryTopics, logsQuery.queryTopics); + } + + @Override + public int hashCode() { + return Objects.hash(queryAddresses, queryTopics); + } + public static class Builder { private final List
queryAddresses = Lists.newArrayList(); private final List> queryTopics = Lists.newArrayList(); diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java index e62079b7f04..3355b10806b 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/FilterManagerLogFilterTest.java @@ -18,7 +18,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -86,7 +85,7 @@ public void shouldCheckMatchingLogsWhenRecordedNewBlockEvent() { filterManager.installLogFilter(latest(), latest(), logsQuery()); recordNewBlockEvent(); - verify(blockchainQueries).matchingLogs(eq(100L), eq(100L), refEq(logsQuery())); + verify(blockchainQueries).matchingLogs(eq(100L), eq(100L), eq(logsQuery())); } @Test @@ -96,14 +95,14 @@ public void shouldUseHeadBlockAsFromBlockNumberWhenCheckingLogsForChanges() { filterManager.installLogFilter(blockNum(1L), blockNum(10L), logsQuery()); recordNewBlockEvent(); - verify(blockchainQueries).matchingLogs(eq(3L), eq(10L), refEq(logsQuery())); + verify(blockchainQueries).matchingLogs(eq(3L), eq(10L), eq(logsQuery())); } @Test public void shouldReturnLogWhenLogFilterMatches() { final LogWithMetadata log = logWithMetadata(); when(blockchainQueries.headBlockNumber()).thenReturn(100L); - when(blockchainQueries.matchingLogs(eq(100L), eq(100L), refEq(logsQuery()))) + when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery()))) .thenReturn(Lists.newArrayList(log)); final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery()); @@ -163,13 +162,13 @@ public void getLogsForAbsentFilterReturnsNull() { public void getLogsForExistingFilterReturnsResults() { final LogWithMetadata log = logWithMetadata(); when(blockchainQueries.headBlockNumber()).thenReturn(100L); - when(blockchainQueries.matchingLogs(eq(100L), eq(100L), refEq(logsQuery()))) + when(blockchainQueries.matchingLogs(eq(100L), eq(100L), eq(logsQuery()))) .thenReturn(Lists.newArrayList(log)); final String filterId = filterManager.installLogFilter(latest(), latest(), logsQuery()); final List retrievedLogs = filterManager.logs(filterId); - assertThat(retrievedLogs).isEqualToComparingFieldByFieldRecursively(Lists.newArrayList(log)); + assertThat(retrievedLogs).usingRecursiveComparison().isEqualTo(Lists.newArrayList(log)); } @Test diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogsQueryTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogsQueryTest.java index d477c04dbd8..b35c11d8190 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogsQueryTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/filter/LogsQueryTest.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.core.Address; import org.hyperledger.besu.ethereum.core.Log; import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogsBloomFilter; import org.hyperledger.besu.util.bytes.BytesValue; import java.util.ArrayList; @@ -41,6 +42,7 @@ public void wildcardQueryAddressTopicReturnTrue() { final List topics = new ArrayList<>(); final Log log = new Log(address, data, topics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -52,6 +54,7 @@ public void univariateAddressMatchReturnsTrue() { final List topics = new ArrayList<>(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), topics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -77,6 +80,7 @@ public void multivariateAddressQueryMatchReturnsTrue() { final List topics = new ArrayList<>(); final Log log = new Log(address1, BytesValue.fromHexString("0x0102"), topics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -129,6 +133,7 @@ public void univariateTopicQueryMatchReturnTrue() { final LogsQuery query = new LogsQuery.Builder().address(address).topics(topicsQuery).build(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), Lists.newArrayList(topic)); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -214,6 +219,7 @@ public void multivariateSurplusTopicMatchMultivariateNullQueryReturnTrue() { new LogsQuery.Builder().address(address1).topics(queryParameter).build(); final Log log = new Log(address1, BytesValue.fromHexString("0x0102"), logTopics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -243,6 +249,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_00() { final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -275,6 +282,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_01() { final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -308,6 +316,7 @@ public void multivariateSurplusTopicMatchMultivariateQueryReturnTrue_02() { final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -344,6 +353,7 @@ public void redundantUnivariateTopicMatchMultivariateQueryReturnTrue() { final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } @@ -417,6 +427,7 @@ public void multivariateTopicMatchMultivariateQueryReturnTrue() { final LogsQuery query = new LogsQuery.Builder().address(address).topics(queryParameter).build(); final Log log = new Log(address, BytesValue.fromHexString("0x0102"), logTopics); + assertThat(query.couldMatch(LogsBloomFilter.compute(List.of(log)))).isTrue(); assertThat(query.matches(log)).isTrue(); } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthNewFilterTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthNewFilterTest.java index 0a6a131ebba..7fd7a9472ad 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthNewFilterTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/internal/methods/EthNewFilterTest.java @@ -16,6 +16,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.refEq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -85,14 +86,14 @@ public void newFilterWithoutAddressAndTopicsParamsInstallsEmptyLogFilter() { final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), "0x1"); final LogsQuery expectedLogsQuery = new LogsQuery.Builder().build(); - when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1"); + when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1"); final JsonRpcResponse actualResponse = method.response(request); assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse); verify(filterManager) .installLogFilter( - refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery)); + refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery)); } @Test @@ -104,14 +105,14 @@ public void newFilterWithTopicsOnlyParamInstallsExpectedLogFilter() { final LogsQuery expectedLogsQuery = new LogsQuery.Builder().topics(new TopicsParameter(topics)).build(); - when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1"); + when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1"); final JsonRpcResponse actualResponse = method.response(request); assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse); verify(filterManager) .installLogFilter( - refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery)); + refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery)); } @Test @@ -122,14 +123,14 @@ public void newFilterWithAddressOnlyParamInstallsExpectedLogFilter() { final JsonRpcResponse expectedResponse = new JsonRpcSuccessResponse(request.getId(), "0x1"); final LogsQuery expectedLogsQuery = new LogsQuery.Builder().address(address).build(); - when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1"); + when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1"); final JsonRpcResponse actualResponse = method.response(request); assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse); verify(filterManager) .installLogFilter( - refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery)); + refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery)); } @Test @@ -142,14 +143,14 @@ public void newFilterWithAddressAndTopicsParamInstallsExpectedLogFilter() { final LogsQuery expectedLogsQuery = new LogsQuery.Builder().address(address).topics(new TopicsParameter(topics)).build(); - when(filterManager.installLogFilter(any(), any(), refEq(expectedLogsQuery))).thenReturn("0x1"); + when(filterManager.installLogFilter(any(), any(), eq(expectedLogsQuery))).thenReturn("0x1"); final JsonRpcResponse actualResponse = method.response(request); assertThat(actualResponse).isEqualToComparingFieldByField(expectedResponse); verify(filterManager) .installLogFilter( - refEq(blockParamLatest()), refEq(blockParamLatest()), refEq(expectedLogsQuery)); + refEq(blockParamLatest()), refEq(blockParamLatest()), eq(expectedLogsQuery)); } private List> topics() { diff --git a/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_nullParam.json b/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_nullParam.json index 6c5e697ad75..c5ab156e9db 100644 --- a/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_nullParam.json +++ b/ethereum/api/src/test/resources/org/hyperledger/besu/ethereum/api/jsonrpc/eth/eth_getLogs_nullParam.json @@ -4,26 +4,32 @@ "jsonrpc": "2.0", "method": "eth_getLogs", "params": [{ - "fromBlock": "0x17", - "toBlock": "0x17", + "fromBlock": "0x20", + "toBlock": "0x20", "address": [], - "topics": [["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", null]] + "topics": [null, ["0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b"]] }] }, "response": { "jsonrpc": "2.0", "id": 406, - "result" : [{ - "logIndex" : "0x0", - "removed": false, - "blockNumber" : "0x17", - "blockHash" : "0x3c419f39b340a4c35cc27b8f7880b779dc1abb9814ad13a2a5a55b885cc8ec2d", - "transactionHash" : "0x97a385bf570ced7821c6495b3877ddd2afd5c452f350f0d4876e98d9161389c6", - "transactionIndex" : "0x0", - "address" : "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f", - "data" : "0x000000000000000000000000000000000000000000000000000000000000002a", - "topics" : ["0x65c9ac8011e286e89d02a269890f41d67ca2cc597b2c76c7c69321ff492be580"] - }] + "result": [ + { + "logIndex": "0x0", + "removed": false, + "blockNumber": "0x20", + "blockHash": "0x71d59849ddd98543bdfbe8548f5eed559b07b8aaf196369f39134500eab68e53", + "transactionHash": "0xcef53f2311d7c80e9086d661e69ac11a5f3d081e28e02a9ba9b66749407ac310", + "transactionIndex": "0x0", + "address": "0x6295ee1b4f6dd65047762f924ecd367c17eabf8f", + "data": "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffe9000000000000000000000000000000000000000000000000000000000000002a", + "topics": [ + "0x0000000000000000000000000000000000000000000000000000000000000001", + "0x000000000000000000000000a94f5374fce5edbc8e2a8697c15331677e6ebf0b", + "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + ] + } + ] }, "statusCode": 200 } \ No newline at end of file diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogsBloomFilter.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogsBloomFilter.java index 5f1e43bd5d7..4524e07352c 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogsBloomFilter.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogsBloomFilter.java @@ -76,6 +76,12 @@ public static LogsBloomFilter compute(final Collection logs) { return bloom; } + public static LogsBloomFilter computeBytes(final BytesValue bytesValue) { + final LogsBloomFilter bloom = new LogsBloomFilter(); + bloom.insertBytesValue(bytesValue); + return bloom; + } + /** * Creates a bloom filter from the given RLP-encoded input. * @@ -129,13 +135,17 @@ public BytesValue getBytes() { } public void insertLog(final Log log) { - setBits(keccak256(log.getLogger())); + insertBytesValue(log.getLogger()); for (final LogTopic topic : log.getTopics()) { - setBits(keccak256(topic)); + insertBytesValue(topic); } } + private void insertBytesValue(final BytesValue bytesValue) { + setBits(keccak256(bytesValue)); + } + private void setBit(final int index) { final int byteIndex = BYTE_SIZE - 1 - index / 8; final int bitIndex = index % 8; @@ -148,6 +158,22 @@ public void digest(final LogsBloomFilter other) { } } + public boolean couldContain(final LogsBloomFilter subset) { + if (subset == null) { + return true; + } + if (subset.size() != data.size()) { + return false; + } + for (int i = 0; i < data.size(); i++) { + final byte subsetValue = subset.data.get(i); + if ((data.get(i) & subsetValue) != subsetValue) { + return false; + } + } + return true; + } + @Override public String toString() { return data.toString();