Commit 37e7a3f

Merge branch 'master' into master

xudong963 committed Sep 19, 2021
2 parents 7527fd0 + db305da
Showing 112 changed files with 6,375 additions and 416 deletions.
35 changes: 34 additions & 1 deletion .github/workflows/python_build.yml
@@ -26,7 +26,24 @@ defaults:
    working-directory: ./python

jobs:
  generate-license:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: actions-rs/toolchain@v1
        with:
          profile: minimal
          toolchain: stable
          override: true
      - name: Generate license file
        run: python ../dev/create_license.py
      - uses: actions/upload-artifact@v2
        with:
          name: python-wheel-license
          path: python/LICENSE.txt

  build-python-mac-win:
    needs: [generate-license]
    name: Mac/Win
    runs-on: ${{ matrix.os }}
    strategy:
@@ -50,6 +67,13 @@ jobs:
          python -m pip install --upgrade pip
          pip install maturin==0.11.2
      - run: rm LICENSE.txt
      - name: Download LICENSE.txt
        uses: actions/download-artifact@v2
        with:
          name: python-wheel-license
          path: python

      - name: Build Python package
        run: maturin build --release --no-sdist --strip

@@ -68,13 +92,22 @@ jobs:
          path: python/target/wheels/*

  build-manylinux:
    needs: [generate-license]
    name: Manylinux
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - run: rm LICENSE.txt
      - name: Download LICENSE.txt
        uses: actions/download-artifact@v2
        with:
          name: python-wheel-license
          path: python
      - run: cat LICENSE.txt
      - name: Build wheels
        run: |
          docker run --rm -v $(pwd):/io \
          docker run --rm -v $(pwd)/..:/io \
            --workdir /io/python \
            konstin2/maturin:v0.11.2 \
            build --release --manylinux 2010
      - name: Archive wheels
2 changes: 1 addition & 1 deletion .github/workflows/python_test.yaml
@@ -45,7 +45,7 @@ jobs:
        run: |
          python -m venv venv
          source venv/bin/activate
          pip install -r python/requirements.txt
          pip install -r python/requirements-39.txt
      - name: Run Linters
        run: |
          source venv/bin/activate
5 changes: 3 additions & 2 deletions .github/workflows/rust.yml
@@ -105,13 +105,14 @@ jobs:
        run: |
          export ARROW_TEST_DATA=$(pwd)/testing/data
          export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
          # run tests on all workspace members with default feature list
          cargo test
          # run tests on all workspace members with default feature list + avro
          RUST_MIN_STACK=10485760 cargo test --features=avro
          # test datafusion examples
          cd datafusion-examples
          cargo test --no-default-features
          cargo run --example csv_sql
          cargo run --example parquet_sql
          cargo run --example avro_sql --features=datafusion/avro
        env:
          CARGO_HOME: "/github/home/.cargo"
          CARGO_TARGET_DIR: "/github/home/target"
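
Note: RUST_MIN_STACK=10485760 gives the Avro-enabled test run a 10 MiB thread stack. As a hedged illustration (not part of this commit), the same budget can be granted to a single thread with std::thread::Builder:

use std::thread;

fn main() {
    // Spawn a worker with a 10 MiB stack, the same budget that
    // RUST_MIN_STACK=10485760 grants to Rust test threads.
    let handle = thread::Builder::new()
        .stack_size(10 * 1024 * 1024)
        .spawn(|| {
            // deeply recursive work (e.g. walking a nested Avro schema)
            // runs here without overflowing the default stack
        })
        .expect("failed to spawn thread");
    handle.join().unwrap();
}
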
2 changes: 1 addition & 1 deletion README.md
@@ -19,7 +19,7 @@

# DataFusion

<img src="datafusion/docs/images/DataFusion-Logo-Background-White.svg" width="256"/>
<img src="docs/source/_static/images/DataFusion-Logo-Background-White.svg" width="256"/>

DataFusion is an extensible query execution framework, written in
Rust, that uses [Apache Arrow](https://arrow.apache.org) as its
54 changes: 49 additions & 5 deletions ballista/rust/client/src/context.rs
@@ -32,6 +32,7 @@ use datafusion::dataframe::DataFrame;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::sql::parser::FileType;

@@ -65,7 +66,9 @@ impl BallistaContextState {
        config: &BallistaConfig,
        concurrent_tasks: usize,
    ) -> ballista_core::error::Result<Self> {
        info!("Running in local mode. Scheduler will be run in-proc");
        use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;

        log::info!("Running in local mode. Scheduler will be run in-proc");

        let addr = ballista_scheduler::new_standalone_scheduler().await?;
@@ -77,8 +80,8 @@
            .await
        {
            Err(_) => {
                tokio::time::sleep(Duration::from_millis(100)).await;
                info!("Attempting to connect to in-proc scheduler...");
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                log::info!("Attempting to connect to in-proc scheduler...");
            }
            Ok(scheduler) => break scheduler,
        }
@@ -125,6 +128,30 @@ impl BallistaContext {
        })
    }

    /// Create a DataFrame representing an Avro table scan
    pub fn read_avro(
        &self,
        path: &str,
        options: AvroReadOptions,
    ) -> Result<Arc<dyn DataFrame>> {
        // convert to absolute path because the executor likely has a different working directory
        let path = PathBuf::from(path);
        let path = fs::canonicalize(&path)?;

        // use local DataFusion context for now but later this might call the scheduler
        let mut ctx = {
            let guard = self.state.lock().unwrap();
            create_df_ctx_with_ballista_query_planner(
                &guard.scheduler_host,
                guard.scheduler_port,
                guard.config(),
            )
        };
        let df = ctx.read_avro(path.to_str().unwrap(), options)?;
        Ok(df)
    }

    /// Create a DataFrame representing a Parquet table scan
    pub fn read_parquet(&self, path: &str) -> Result<Arc<dyn DataFrame>> {
@@ -193,6 +220,17 @@ impl BallistaContext {
        self.register_table(name, df.as_ref())
    }

    /// Register an Avro file as a table that can be referenced from SQL
    pub fn register_avro(
        &self,
        name: &str,
        path: &str,
        options: AvroReadOptions,
    ) -> Result<()> {
        let df = self.read_avro(path, options)?;
        self.register_table(name, df.as_ref())?;
        Ok(())
    }

    /// Create a DataFrame from a SQL statement
    pub fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
        let mut ctx = {
@@ -240,6 +278,10 @@ impl BallistaContext {
                self.register_parquet(name, location)?;
                Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
            }
            FileType::Avro => {
                self.register_avro(name, location, AvroReadOptions::default())?;
                Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
            }
            _ => Err(DataFusionError::NotImplemented(format!(
                "Unsupported file type {:?}.",
                file_type
@@ -257,8 +299,10 @@ mod tests {
    #[cfg(feature = "standalone")]
    async fn test_standalone_mode() {
        use super::*;
        let context = BallistaContext::standalone(1).await.unwrap();
        let context = BallistaContext::standalone(&BallistaConfig::new().unwrap(), 1)
            .await
            .unwrap();
        let df = context.sql("SELECT 1;").unwrap();
        context.collect(&df.to_logical_plan()).await.unwrap();
        df.collect().await.unwrap();
    }
}
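
For orientation, a hedged usage sketch of the new Avro entry points on BallistaContext (not part of this commit; the prelude re-exports, scheduler endpoint, and file path are assumptions):

use ballista::prelude::*; // assumed to re-export BallistaContext and BallistaConfig
use datafusion::physical_plan::avro::AvroReadOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = BallistaConfig::new()?;
    // Hypothetical remote scheduler endpoint; BallistaContext::standalone(&config, 1)
    // works in-process, as in the test above.
    let ctx = BallistaContext::remote("localhost", 50050, &config);

    // Register an Avro file under a table name, then query it with SQL.
    ctx.register_avro(
        "alltypes",
        "testdata/alltypes_plain.avro",
        AvroReadOptions::default(),
    )?;
    let df = ctx.sql("SELECT count(*) FROM alltypes")?;
    let batches = df.collect().await?;
    println!("{:?}", batches);
    Ok(())
}
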
24 changes: 24 additions & 0 deletions ballista/rust/core/proto/ballista.proto
@@ -152,6 +152,7 @@ enum ScalarFunction {
  SHA384 = 32;
  SHA512 = 33;
  LN = 34;
  TOTIMESTAMPMILLIS = 35;
}

message ScalarFunctionNode {
@@ -253,6 +254,7 @@ message LogicalPlanNode {
    WindowNode window = 13;
    AnalyzeNode analyze = 14;
    CrossJoinNode cross_join = 15;
    AvroTableScanNode avro_scan = 16;
  }
}

@@ -296,6 +298,15 @@ message ParquetTableScanNode {
  repeated LogicalExprNode filters = 4;
}

message AvroTableScanNode {
  string table_name = 1;
  string path = 2;
  string file_extension = 3;
  ProjectionColumns projection = 4;
  Schema schema = 5;
  repeated LogicalExprNode filters = 6;
}

message ProjectionNode {
  LogicalPlanNode input = 1;
  repeated LogicalExprNode expr = 2;
@@ -340,6 +351,7 @@ enum FileType{
  NdJson = 0;
  Parquet = 1;
  CSV = 2;
  Avro = 3;
}

message AnalyzeNode {
@@ -456,6 +468,7 @@ message PhysicalPlanNode {
    WindowAggExecNode window = 17;
    ShuffleWriterExecNode shuffle_writer = 18;
    CrossJoinExecNode cross_join = 19;
    AvroScanExecNode avro_scan = 20;
  }
}

@@ -610,6 +623,17 @@ message CsvScanExecNode {
  repeated string filename = 8;
}

message AvroScanExecNode {
  string path = 1;
  repeated uint32 projection = 2;
  Schema schema = 3;
  string file_extension = 4;
  uint32 batch_size = 5;

  // partition filenames
  repeated string filename = 8;
}

enum PartitionMode {
  COLLECT_LEFT = 0;
  PARTITIONED = 1;
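
A hedged sketch (assuming the usual tonic/prost codegen of ballista.proto into ballista_core::serde::protobuf) of constructing the new logical scan node by hand; all values are hypothetical:

use ballista_core::serde::protobuf;

fn avro_scan_node() -> protobuf::AvroTableScanNode {
    protobuf::AvroTableScanNode {
        table_name: "alltypes".to_string(),
        path: "/data/alltypes_plain.avro".to_string(),
        file_extension: ".avro".to_string(),
        // prost maps message-typed fields to Option and repeated fields to Vec
        projection: Some(protobuf::ProjectionColumns {
            columns: vec!["id".to_string(), "bool_col".to_string()],
            ..Default::default()
        }),
        schema: None, // a real plan carries the serialized Arrow schema here
        filters: vec![],
    }
}
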
29 changes: 29 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/from_proto.rs
@@ -32,6 +32,7 @@ use datafusion::logical_plan::{
LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::avro::AvroReadOptions;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;
use datafusion::scalar::ScalarValue;
@@ -169,6 +170,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
                .build()
                .map_err(|e| e.into())
            }
            LogicalPlanType::AvroScan(scan) => {
                let schema: Schema = convert_required!(scan.schema)?;
                let options = AvroReadOptions {
                    schema: Some(Arc::new(schema.clone())),
                    file_extension: &scan.file_extension,
                };

                let mut projection = None;
                if let Some(columns) = &scan.projection {
                    let column_indices = columns
                        .columns
                        .iter()
                        .map(|name| schema.index_of(name))
                        .collect::<Result<Vec<usize>, _>>()?;
                    projection = Some(column_indices);
                }

                LogicalPlanBuilder::scan_avro_with_name(
                    &scan.path,
                    options,
                    projection,
                    &scan.table_name,
                )?
                .build()
                .map_err(|e| e.into())
            }
            LogicalPlanType::Sort(sort) => {
                let input: LogicalPlan = convert_box_required!(sort.input)?;
                let sort_expr: Vec<Expr> = sort
@@ -1191,6 +1218,7 @@ impl TryFrom<i32> for protobuf::FileType {
            _x if _x == FileType::NdJson as i32 => Ok(FileType::NdJson),
            _x if _x == FileType::Parquet as i32 => Ok(FileType::Parquet),
            _x if _x == FileType::Csv as i32 => Ok(FileType::Csv),
            _x if _x == FileType::Avro as i32 => Ok(FileType::Avro),
            invalid => Err(BallistaError::General(format!(
                "Attempted to convert invalid i32 to protobuf::Filetype: {}",
                invalid
@@ -1207,6 +1235,7 @@ impl Into<datafusion::sql::parser::FileType> for protobuf::FileType {
            protobuf::FileType::NdJson => FileType::NdJson,
            protobuf::FileType::Parquet => FileType::Parquet,
            protobuf::FileType::Csv => FileType::CSV,
            protobuf::FileType::Avro => FileType::Avro,
        }
    }
}
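
The guard-match idiom above decodes a raw i32 from the wire back into a protobuf enum. A self-contained sketch of the same pattern, using a hypothetical stand-in enum rather than the generated one:

use std::convert::TryFrom;

// Hypothetical stand-in for a prost-generated enum, illustrating the
// `_x if _x == Variant as i32` decoding pattern used above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FileType {
    NdJson = 0,
    Parquet = 1,
    Csv = 2,
    Avro = 3,
}

impl TryFrom<i32> for FileType {
    type Error = String;

    fn try_from(value: i32) -> Result<Self, Self::Error> {
        match value {
            x if x == FileType::NdJson as i32 => Ok(FileType::NdJson),
            x if x == FileType::Parquet as i32 => Ok(FileType::Parquet),
            x if x == FileType::Csv as i32 => Ok(FileType::Csv),
            x if x == FileType::Avro as i32 => Ok(FileType::Avro),
            invalid => Err(format!("invalid FileType value: {}", invalid)),
        }
    }
}

fn main() {
    assert_eq!(FileType::try_from(3), Ok(FileType::Avro));
    assert!(FileType::try_from(42).is_err());
}
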
8 changes: 6 additions & 2 deletions ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -643,8 +643,12 @@ mod roundtrip_tests {

        let df_schema_ref = schema.to_dfschema_ref()?;

        let filetypes: [FileType; 3] =
            [FileType::NdJson, FileType::Parquet, FileType::CSV];
        let filetypes: [FileType; 4] = [
            FileType::NdJson,
            FileType::Parquet,
            FileType::CSV,
            FileType::Avro,
        ];

        for file in filetypes.iter() {
            let create_table_node = LogicalPlan::CreateExternalTable {
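
These roundtrip tests hinge on the two conversions shown earlier (&LogicalPlan into protobuf::LogicalPlanNode, and back). A hedged sketch of the core assertion; the helper name is hypothetical and the crate wraps this logic in a macro:

use std::convert::TryInto;

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf;
use datafusion::logical_plan::LogicalPlan;

// Encode a plan to its protobuf form, decode it back, and require that
// the two plans print identically.
fn roundtrip(plan: &LogicalPlan) -> Result<(), BallistaError> {
    let proto: protobuf::LogicalPlanNode = plan.try_into()?;
    let decoded: LogicalPlan = (&proto).try_into()?;
    assert_eq!(format!("{:?}", plan), format!("{:?}", decoded));
    Ok(())
}
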