Support expressions in the arg of @aggregate #5242

Merged · 8 commits · Mar 1, 2024

Changes from all commits
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

50 changes: 40 additions & 10 deletions docs/aggregations.md
@@ -87,6 +87,11 @@ _dimensions_, and fields with the `@aggregate` directive are called
_aggregates_. A timeseries type really represents many timeseries, one for
each combination of values for the dimensions.

The same timeseries can be used for multiple aggregations. For example, the
`Stats` aggregation could also be formed by aggregating over the `TokenData`
timeseries. Since `Stats` doesn't have a `token` dimension, all aggregates
will be formed across all tokens.
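
As a sketch, the `TokenData` timeseries and a `Stats` aggregation over it
might be declared like this; the field names are illustrative, not
prescribed:

```graphql
# A raw timeseries. `token` is a dimension; the numeric fields are the
# data points that aggregations can draw from.
type TokenData @entity(timeseries: true) {
  id: Int8!
  timestamp: Timestamp!
  token: Token!
  amount: BigDecimal!
  priceUSD: BigDecimal!
}

# An aggregation over that timeseries. Since it declares no `token`
# dimension, each aggregate is computed across all tokens.
type Stats @aggregation(intervals: ["hour", "day"], source: "TokenData") {
  id: Int8!
  timestamp: Timestamp!
  volume: BigDecimal! @aggregate(fn: "sum", arg: "amount")
}
```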

Each `@aggregate` by default starts at 0 for each new bucket and therefore
just aggregates over the time interval for the bucket. The `@aggregate`
directive also accepts a boolean flag `cumulative` that indicates whether
@@ -97,10 +102,6 @@ the entire timeseries up to the end of the time interval for the bucket.
aggregations, and it doesn't seem like it is used in practice, we won't
initially support it. (The same goes for variance, stddev, etc.)

**TODO** The timeseries type can be simplified for some situations if
aggregations can be done over expressions, for example over `priceUSD *
amount` to track `totalVolumeUSD`

**TODO** It might be necessary to allow `@aggregate` fields that are only
used for some intervals. We could allow that with syntax like
`@aggregate(fn: .., arg: .., interval: "day")`
@@ -132,7 +133,10 @@ annotation. These attributes must be of a numeric type (`Int`, `Int8`,
`BigInt`, or `BigDecimal`). The annotation must have two arguments:

- `fn`: the name of an aggregation function
- `arg`: the name of an attribute in the timeseries type
- `arg`: the name of an attribute in the timeseries type, or an expression
using only constants and attributes of the timeseries type
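
For example, assuming a `TokenData` timeseries with `priceUSD` and
`amount` attributes, both forms of `arg` might be used like this:

```graphql
type DailyTokenStats @aggregation(intervals: ["day"], source: "TokenData") {
  id: Int8!
  timestamp: Timestamp!
  # `arg` naming a single attribute of the source timeseries
  maxPrice: BigDecimal! @aggregate(fn: "max", arg: "priceUSD")
  # `arg` as an expression over attributes of the timeseries
  totalVolumeUSD: BigDecimal! @aggregate(fn: "sum", arg: "priceUSD * amount")
}
```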

#### Aggregation functions

The following aggregation functions are currently supported:

@@ -149,16 +153,42 @@ The `first` and `last` aggregation functions calculate the first and last
value in an interval by sorting the data by `id`; `graph-node` enforces
correctness here by automatically setting the `id` for timeseries entities.

#### Aggregation expressions

The `arg` can be the name of any attribute in the timeseries type, or an
expression using only constants and attributes of the timeseries type such
as `price * amount` or `greatest(amount0, amount1)`. Expressions use SQL
syntax and support a subset of builtin SQL functions, operators, and other
constructs.

Supported operators are `+`, `-`, `*`, `/`, `%`, `^`, `=`, `!=`, `<`, `<=`,
`>`, `>=`, `<->`, `and`, `or`, and `not`. In addition, the operators `is
[not] {null|true|false}` and `is [not] distinct from` are supported.

The supported SQL functions are the [math
functions](https://www.postgresql.org/docs/current/functions-math.html)
`abs`, `ceil`, `ceiling`, `div`, `floor`, `gcd`, `lcm`, `mod`, `power`,
`sign`, and the [conditional
functions](https://www.postgresql.org/docs/current/functions-conditional.html)
`coalesce`, `nullif`, `greatest`, and `least`.

The
[`case` expression](https://www.postgresql.org/docs/current/functions-conditional.html#FUNCTIONS-CASE)
`case when .. then .. else .. end` is also supported.

Some examples of valid expressions, assuming the underlying timeseries
contains the mentioned fields:

- Sum the USD value of a token transfer: `@aggregate(fn: "sum", arg: "priceUSD * amount")`
- Take the larger of two amounts, but never less than zero:
  `@aggregate(fn: "max", arg: "greatest(amount0, amount1, 0)")`
- Sum an amount only when a condition holds: `@aggregate(fn: "sum", arg: "case when amount0 > amount1 then amount0 else 0 end")`
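
Put together, a hypothetical aggregation using these expressions could
look like the following, assuming a source timeseries with `priceUSD`,
`amount`, `amount0`, and `amount1` attributes:

```graphql
type TokenStats @aggregation(intervals: ["hour", "day"], source: "TokenData") {
  id: Int8!
  timestamp: Timestamp!
  # sum of the USD value of all transfers in the interval
  volumeUSD: BigDecimal! @aggregate(fn: "sum", arg: "priceUSD * amount")
  # largest single amount seen in the interval, never negative
  maxAmount: BigDecimal! @aggregate(fn: "max", arg: "greatest(amount0, amount1, 0)")
  # only count amount0 when it exceeds amount1
  skewedVolume: BigDecimal! @aggregate(fn: "sum", arg: "case when amount0 > amount1 then amount0 else 0 end")
}
```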

## Querying

_The functionality described in this section is not implemented yet and
will require a bit more thought about the details._

**TODO** As written, timeseries points like `TokenData` can be queried like
any other entity. It would be nice to restrict how these data points can be
queried, maybe even forbid it, as that would give us more latitude in how we
store that data.

We create a toplevel query field for each aggregation. That query field
accepts the following arguments:

1 change: 1 addition & 0 deletions graph/Cargo.toml
@@ -88,6 +88,7 @@ web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-pat
"arbitrary_precision",
] }
serde_plain = "1.0.2"
sqlparser = "0.43.1"

[dev-dependencies]
clap = { version = "3.2.25", features = ["derive", "env"] }
1 change: 1 addition & 0 deletions graph/src/lib.rs
@@ -51,6 +51,7 @@ pub use petgraph;
pub use prometheus;
pub use semver;
pub use slog;
pub use sqlparser;
pub use stable_hash;
pub use stable_hash_legacy;
pub use tokio;
5 changes: 1 addition & 4 deletions graph/src/schema/entity_type.rs
@@ -11,10 +11,7 @@ use crate::{
util::intern::Atom,
};

use super::{
input_schema::{ObjectType, POI_OBJECT},
EntityKey, Field, InputSchema, InterfaceType,
};
use super::{EntityKey, Field, InputSchema, InterfaceType, ObjectType, POI_OBJECT};

/// A reference to a type in the input schema. It should mostly be the
/// reference to a concrete entity type, either one declared with `@entity`
126 changes: 59 additions & 67 deletions graph/src/schema/input_schema.rs → graph/src/schema/input/mod.rs
@@ -23,8 +23,10 @@ use crate::prelude::{s, DeploymentHash};
use crate::schema::api::api_schema;
use crate::util::intern::{Atom, AtomPool};

use super::fulltext::FulltextDefinition;
use super::{ApiSchema, AsEntityTypeName, EntityType, Schema};
use crate::schema::fulltext::FulltextDefinition;
use crate::schema::{ApiSchema, AsEntityTypeName, EntityType, Schema};

pub mod sqlexpr;

/// The name of the PoI entity type
pub(crate) const POI_OBJECT: &str = "Poi$";
@@ -753,42 +755,26 @@ impl AggregationMapping {
}
}

#[derive(Clone, PartialEq, Debug)]
pub struct Arg {
pub name: Word,
pub value_type: ValueType,
}

impl Arg {
fn new(name: Word, src_type: &s::ObjectType) -> Self {
let value_type = src_type
.field(&name)
.unwrap()
.field_type
.value_type()
.unwrap();
Self { name, value_type }
}
}

/// The `@aggregate` annotation in an aggregation. The annotation controls
/// how values from the source table are aggregated
#[derive(PartialEq, Debug)]
pub struct Aggregate {
/// The name of the aggregate field in the aggregation
pub name: Word,
/// The function used to aggregate the values
pub func: AggregateFn,
pub arg: Arg,
/// The field to aggregate in the source table
pub arg: Word,
/// The type of the field `name` in the aggregation
pub field_type: s::Type,
/// The `ValueType` corresponding to `field_type`
pub value_type: ValueType,
/// Whether the aggregation is cumulative
pub cumulative: bool,
}

impl Aggregate {
fn new(
_schema: &Schema,
src_type: &s::ObjectType,
name: &str,
field_type: &s::Type,
dir: &s::Directive,
) -> Self {
fn new(_schema: &Schema, name: &str, field_type: &s::Type, dir: &s::Directive) -> Self {
let func = dir
.argument("fn")
.unwrap()
@@ -803,8 +789,7 @@ impl Aggregate {
let arg = dir
.argument("arg")
.map(|arg| Word::from(arg.as_str().unwrap()))
.map(|arg| Arg::new(arg, src_type))
.unwrap_or_else(|| Arg::new(ID.clone(), src_type));
.unwrap_or_else(|| ID.clone());
let cumulative = dir
.argument(kw::CUMULATIVE)
.map(|arg| match arg {
@@ -861,7 +846,6 @@ impl Aggregation {
.unwrap()
.as_str()
.unwrap();
let src_type = schema.document.get_object_type_definition(source).unwrap();
let source = pool.lookup(source).unwrap();
let fields: Box<[_]> = agg_type
.fields
@@ -873,9 +857,7 @@
.fields
.iter()
.filter_map(|field| field.find_directive(kw::AGGREGATE).map(|dir| (field, dir)))
.map(|(field, dir)| {
Aggregate::new(schema, src_type, &field.name, &field.field_type, dir)
})
.map(|(field, dir)| Aggregate::new(schema, &field.name, &field.field_type, dir))
.collect();

let obj_types = intervals
@@ -1675,7 +1657,7 @@ mod validations {
},
prelude::s,
schema::{
input_schema::{kw, AggregateFn, AggregationInterval},
input::{kw, sqlexpr, AggregateFn, AggregationInterval},
FulltextAlgorithm, FulltextLanguage, Schema as BaseSchema, SchemaValidationError,
SchemaValidationError as Err, Strings, SCHEMA_TYPE_NAME,
},
@@ -2493,46 +2475,56 @@
continue;
}
};
let arg_type = match source.field(arg) {
Some(arg_field) => match arg_field.field_type.value_type() {
Ok(arg_type) if arg_type.is_numeric() => arg_type,
Ok(_) | Err(_) => {
errors.push(Err::AggregationNonNumericArg(
agg_type.name.to_owned(),
field.name.to_owned(),
source.name.to_owned(),
arg.to_owned(),
));
continue;
}
},
None => {
errors.push(Err::AggregationUnknownArg(
let field_type = match field.field_type.value_type() {
Ok(field_type) => field_type,
Err(_) => {
errors.push(Err::NonNumericAggregate(
agg_type.name.to_owned(),
field.name.to_owned(),
arg.to_owned(),
));
continue;
}
};
let field_type = match field.field_type.value_type() {
Ok(field_type) => field_type,
Err(_) => {
errors.push(Err::NonNumericAggregate(
// It would be nicer to use a proper struct here
// and have that implement
// `sqlexpr::ExprVisitor` but we need access to
// a bunch of local variables that would make
// setting up that struct a bit awkward, so we
// use a closure instead
let check_ident = |ident: &str| -> Result<(), SchemaValidationError> {
let arg_type = match source.field(ident) {
Some(arg_field) => match arg_field.field_type.value_type() {
Ok(arg_type) if arg_type.is_numeric() => arg_type,
Ok(_) | Err(_) => {
return Err(Err::AggregationNonNumericArg(
agg_type.name.to_owned(),
field.name.to_owned(),
source.name.to_owned(),
arg.to_owned(),
));
}
},
None => {
return Err(Err::AggregationUnknownArg(
agg_type.name.to_owned(),
field.name.to_owned(),
arg.to_owned(),
));
}
};
if arg_type > field_type {
return Err(Err::AggregationNonMatchingArg(
agg_type.name.to_owned(),
field.name.to_owned(),
arg.to_owned(),
arg_type.to_str().to_owned(),
field_type.to_str().to_owned(),
));
continue;
}
Ok(())
};
if arg_type > field_type {
errors.push(Err::AggregationNonMatchingArg(
agg_type.name.to_owned(),
field.name.to_owned(),
arg.to_owned(),
arg_type.to_str().to_owned(),
field_type.to_str().to_owned(),
));
if let Err(mut errs) = sqlexpr::parse(arg, check_ident) {
errors.append(&mut errs);
}
}
None => {
@@ -3051,7 +3043,7 @@ type Gravatar @entity {
if errs.iter().any(|err| {
err.to_string().contains(&msg) || format!("{err:?}").contains(&msg)
}) {
println!("{file_name} failed as expected: {errs:?}",)
// println!("{file_name} failed as expected: {errs:?}",)
} else {
let msgs: Vec<_> = errs.iter().map(|err| err.to_string()).collect();
panic!(
@@ -3060,7 +3052,7 @@
}
}
(true, Ok(_)) => {
println!("{file_name} validated as expected")
// println!("{file_name} validated as expected")
}
}
}
@@ -3074,7 +3066,7 @@ mod tests {
data::store::ID,
prelude::DeploymentHash,
schema::{
input_schema::{POI_DIGEST, POI_OBJECT},
input::{POI_DIGEST, POI_OBJECT},
EntityType,
},
};