Skip to content

Commit

Permalink
Data source adapter for the Ethereum blockchain (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
earthshakira authored Sep 30, 2021
1 parent 9ef2d02 commit 4633566
Show file tree
Hide file tree
Showing 14 changed files with 1,355 additions and 0 deletions.
1 change: 1 addition & 0 deletions dbms/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
implementation project(":cassandra-adapter")
implementation project(":cottontail-adapter")
implementation project(":csv-adapter")
implementation project(":ethereum-adapter")
implementation project(":file-adapter")
implementation project(":jdbc-adapter")
implementation project(":mongodb-adapter")
Expand Down
61 changes: 61 additions & 0 deletions ethereum-adapter/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

group "org.polypheny"


version = versionMajor + "." + versionMinor + versionQualifier


dependencies {
implementation project(":core")

implementation group: "net.sf.opencsv", name: "opencsv", version: opencsv_version // Apache 2.0
implementation group: "org.web3j", name: "core", version: web3j_version // Apache 2.0
implementation group: "commons-io", name: "commons-io", version: commons_io_version // Apache 2.0


// --- Test Compile ---
testImplementation project(path: ":core", configuration: "tests")

testImplementation group: "junit", name: "junit", version: junit_version
testImplementation group: "org.hamcrest", name: "hamcrest-core", version: hamcrest_core_version // BSD 3-clause
}


sourceSets {
main {
java {
srcDirs = ["src/main/java"]
outputDir = file(project.buildDir.absolutePath + "/classes")
}
resources {
srcDirs = ["src/main/resources"]
}
output.resourcesDir = file(project.buildDir.absolutePath + "/classes")
}
test {
java {
srcDirs = ["src/test/java"]
outputDir = file(project.buildDir.absolutePath + "/test-classes")
}
resources {
srcDirs = ["src/test/resources"]
}
output.resourcesDir = file(project.buildDir.absolutePath + "/test-classes")
}
}


/**
* JARs
*/
jar {
manifest {
attributes "Manifest-Version": "1.0"
attributes "Copyright": "The Polypheny Project (polypheny.org)"
attributes "Version": "$project.version"
}
}
java {
withJavadocJar()
withSourcesJar()
}
2 changes: 2 additions & 0 deletions ethereum-adapter/lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2019-2021 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.adapter.ethereum;


import java.io.Closeable;
import java.io.IOException;
import java.math.BigInteger;
import java.util.function.Predicate;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.http.HttpService;

class BlockReader implements Closeable {

protected final Web3j web3j;
protected final Predicate<BigInteger> blockNumberPredicate;
protected int blockReads;
protected BigInteger currentBlock;


BlockReader( String clientUrl, int blocks, Predicate<BigInteger> blockNumberPredicate ) {
this.web3j = Web3j.build( new HttpService( clientUrl ) );
this.blockReads = blocks;
this.blockNumberPredicate = blockNumberPredicate;
try {
this.currentBlock = web3j.ethBlockNumber().send().getBlockNumber();
} catch ( IOException e ) {
throw new RuntimeException( "Unable to connect to server: " + clientUrl );
}
}


public String[] readNext() throws IOException {
if ( this.blockReads <= 0 ) {
return null;
}
EthBlock.Block block = null;
while ( this.currentBlock.compareTo( BigInteger.ZERO ) == 1 && block == null ) {
if ( blockNumberPredicate.test( this.currentBlock ) ) {
block = web3j
.ethGetBlockByNumber( DefaultBlockParameter.valueOf( this.currentBlock ), false )
.send()
.getBlock();
blockReads--;
}
this.currentBlock = this.currentBlock.subtract( BigInteger.ONE );
}
return block == null ? null : EthereumMapper.BLOCK.map( block );
}


/**
* Closes the underlying reader.
*
* @throws IOException if the close fails
*/
@Override
public void close() throws IOException {
this.web3j.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright 2019-2021 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.adapter.ethereum;


import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.adapter.Adapter.AdapterProperties;
import org.polypheny.db.adapter.Adapter.AdapterSettingBoolean;
import org.polypheny.db.adapter.Adapter.AdapterSettingInteger;
import org.polypheny.db.adapter.Adapter.AdapterSettingString;
import org.polypheny.db.adapter.DataSource;
import org.polypheny.db.adapter.DeployMode;
import org.polypheny.db.catalog.entity.CatalogColumnPlacement;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.information.InformationGroup;
import org.polypheny.db.information.InformationTable;
import org.polypheny.db.jdbc.Context;
import org.polypheny.db.schema.Schema;
import org.polypheny.db.schema.SchemaPlus;
import org.polypheny.db.schema.Table;
import org.polypheny.db.transaction.PolyXid;
import org.polypheny.db.type.PolyType;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.http.HttpService;


@Slf4j
@AdapterProperties(
name = "Ethereum",
description = "An adapter for querying the Ethereum blockchain. It uses the ethereum JSON-RPC API. Currently, this adapter only supports read operations.",
usedModes = DeployMode.REMOTE)
@AdapterSettingString(name = "ClientUrl", description = "The URL of the ethereum JSON RPC client", defaultValue = "https://mainnet.infura.io/v3/4d06589e97064040b5da99cf4051ef04", position = 1)
@AdapterSettingInteger(name = "Blocks", description = "The number of Blocks to fetch when processing a query", defaultValue = 10, position = 2, modifiable = true)
@AdapterSettingBoolean(name = "ExperimentalFiltering", description = "Experimentally filter Past Block", defaultValue = false, position = 3, modifiable = true)
public class EthereumDataSource extends DataSource {

private String clientURL;
@Getter
private int blocks;
@Getter
private boolean experimentalFiltering;
private EthereumSchema currentSchema;


public EthereumDataSource( final int storeId, final String uniqueName, final Map<String, String> settings ) {
super( storeId, uniqueName, settings, true );
setClientURL( settings.get( "ClientUrl" ) );
this.blocks = Integer.parseInt( settings.get( "Blocks" ) );
this.experimentalFiltering = Boolean.parseBoolean( settings.get( "ExperimentalFiltering" ) );
createInformationPage();
enableInformationPage();
}


private void setClientURL( String clientURL ) {
Web3j web3j = Web3j.build( new HttpService( clientURL ) );
try {
BigInteger latest = web3j.ethBlockNumber().send().getBlockNumber();
} catch ( Exception e ) {
throw new RuntimeException( "Unable to connect the client URL '" + clientURL + "'" );
}
web3j.shutdown();
this.clientURL = clientURL;
}


@Override
public void createNewSchema( SchemaPlus rootSchema, String name ) {
currentSchema = new EthereumSchema( this.clientURL );
}


@Override
public Table createTableSchema( CatalogTable catalogTable, List<CatalogColumnPlacement> columnPlacementsOnStore ) {
return currentSchema.createBlockchainTable( catalogTable, columnPlacementsOnStore, this );
}


@Override
public Schema getCurrentSchema() {
return currentSchema;
}


@Override
public void truncate( Context context, CatalogTable table ) {
throw new RuntimeException( "Blockchain adapter does not support truncate" );
}


@Override
public Map<String, List<ExportedColumn>> getExportedColumns() {
Map<String, List<ExportedColumn>> map = new HashMap<>();
String[] blockColumns = { "number", "hash", "parentHash", "nonce", "sha3Uncles", "logsBloom", "transactionsRoot", "stateRoot", "receiptsRoot", "author", "miner", "mixHash", "difficulty", "totalDifficulty", "extraData", "size", "gasLimit", "gasUsed", "timestamp" };
PolyType[] blockTypes = { PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.BIGINT, PolyType.TIMESTAMP };
String[] transactionColumns = { "hash", "nonce", "blockHash", "blockNumber", "transactionIndex", "from", "to", "value", "gasPrice", "gas", "input", "creates", "publicKey", "raw", "r", "s" };
PolyType[] transactionTypes = { PolyType.VARCHAR, PolyType.BIGINT, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.BIGINT, PolyType.BIGINT, PolyType.BIGINT, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR, PolyType.VARCHAR };

PolyType type = PolyType.VARCHAR;
PolyType collectionsType = null;
Integer length = 300;
Integer scale = null;
Integer dimension = null;
Integer cardinality = null;
int position = 0;
List<ExportedColumn> blockCols = new ArrayList<>();
for ( String blockCol : blockColumns ) {
blockCols.add( new ExportedColumn(
blockCol,
blockTypes[position],
collectionsType,
length,
scale,
dimension,
cardinality,
false,
"public",
"block",
blockCol,
position,
position == 0 ) );
position++;

}
map.put( "block", blockCols );
List<ExportedColumn> transactCols = new ArrayList<>();
position = 0;
for ( String transactCol : transactionColumns ) {
transactCols.add( new ExportedColumn(
transactCol,
transactionTypes[position],
collectionsType,
length,
scale,
dimension,
cardinality,
false,
"public",
"transaction",
transactCol,
position,
position == 0 ) );
position++;
}
map.put( "transaction", transactCols );
return map;
}


@Override
public boolean prepare( PolyXid xid ) {
log.debug( "Blockchain Store does not support prepare()." );
return true;
}


@Override
public void commit( PolyXid xid ) {
log.debug( "Blockchain Store does not support commit()." );
}


@Override
public void rollback( PolyXid xid ) {
log.debug( "Blockchain Store does not support rollback()." );
}


@Override
public void shutdown() {
removeInformationPage();
}


@Override
protected void reloadSettings( List<String> updatedSettings ) {
if ( updatedSettings.contains( "ClientUrl" ) ) {
setClientURL( settings.get( "ClientUrl" ) );
}
if ( updatedSettings.contains( "Blocks" ) ) {
this.blocks = Integer.parseInt( settings.get( "Blocks" ) );
}
if ( updatedSettings.contains( "ExperimentalFiltering" ) ) {
this.experimentalFiltering = Boolean.parseBoolean( settings.get( "ExperimentalFiltering" ) );
}
}


protected void createInformationPage() {
for ( Map.Entry<String, List<ExportedColumn>> entry : getExportedColumns().entrySet() ) {
InformationGroup group = new InformationGroup(
informationPage,
entry.getValue().get( 0 ).physicalSchemaName + "." + entry.getValue().get( 0 ).physicalTableName );

InformationTable table = new InformationTable(
group,
Arrays.asList( "Position", "Column Name", "Type", "Primary" ) );
for ( ExportedColumn exportedColumn : entry.getValue() ) {
table.addRow(
exportedColumn.physicalPosition,
exportedColumn.name,
exportedColumn.getDisplayType(),
exportedColumn.primary ? "✔" : ""
);
}
informationElements.add( table );
informationGroups.add( group );
}
}

}
Loading

0 comments on commit 4633566

Please sign in to comment.