Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix blank node correlation (closes #795) #803

Merged
merged 10 commits into from
Mar 29, 2021
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { BlankNodeBindingsScoped } from '@comunica/data-factory';
jacoscaz marked this conversation as resolved.
Show resolved Hide resolved
import type { Bindings, BindingsStream } from '@comunica/types';
import type { AsyncIterator } from 'asynciterator';
import { ArrayIterator, MultiTransformIterator } from 'asynciterator';
Expand Down
1 change: 1 addition & 0 deletions packages/actor-query-operation-construct/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"index.js"
],
"dependencies": {
"@comunica/data-factory": "1.17.0",
"@comunica/types": "^1.19.2",
"@types/rdf-js": "*",
"asynciterator": "^3.0.3",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { Bindings } from '@comunica/bus-query-operation';
import { BlankNodeBindingsScoped } from '@comunica/data-factory';
import { ArrayIterator } from 'asynciterator';
import { DataFactory } from 'rdf-data-factory';

import { BindingsToQuadsIterator } from '../lib/BindingsToQuadsIterator';

const DF = new DataFactory();

const arrayifyStream = require('arrayify-stream');
Expand Down Expand Up @@ -278,6 +280,11 @@ describe('BindingsToQuadsIterator', () => {
expect(BindingsToQuadsIterator.localizeBlankNode(1, DF.blankNode('abc')))
.toEqual(DF.blankNode('abc1'));
});

it('should localize a blank node scoped to a single set of bindings', () => {
expect(BindingsToQuadsIterator.localizeBlankNode(0, new BlankNodeBindingsScoped('abc')))
.toEqual(DF.blankNode('abc0'));
});
});

describe('#localizeQuad', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class ActorQueryOperationExtend extends ActorQueryOperationTypedMediated<
);

const extendKey = termToString(variable);
const config = { ...ActorQueryOperation.getExpressionContext(context, this.mediatorQueryOperation) };
const config = { ...ActorQueryOperation.getAsyncExpressionContext(context, this.mediatorQueryOperation) };
jacoscaz marked this conversation as resolved.
Show resolved Hide resolved
const evaluator = new AsyncEvaluator(expression, config);

// Transform the stream by extending each Bindings with the expression result
Expand Down
2 changes: 1 addition & 1 deletion packages/actor-query-operation-extend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"@comunica/types": "^1.19.2",
"rdf-string": "^1.5.0",
"sparqlalgebrajs": "^2.4.0",
"sparqlee": "^1.6.0"
"sparqlee": "^1.6.2"
},
"peerDependencies": {
"@comunica/bus-query-operation": "^1.4.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ export class ActorQueryOperationFilterSparqlee extends ActorQueryOperationTypedM

public async testOperation(pattern: Algebra.Filter, context: ActionContext): Promise<IActorTest> {
// Will throw error for unsupported operators
const _ = new AsyncEvaluator(pattern.expression,
ActorQueryOperation.getExpressionContext(context, this.mediatorQueryOperation));
const config = { ...ActorQueryOperation.getAsyncExpressionContext(context, this.mediatorQueryOperation) };
const _ = new AsyncEvaluator(pattern.expression, config);
return true;
}

Expand All @@ -30,7 +30,7 @@ export class ActorQueryOperationFilterSparqlee extends ActorQueryOperationTypedM
ActorQueryOperation.validateQueryOutput(output, 'bindings');
const { variables, metadata } = output;

const config = ActorQueryOperation.getExpressionContext(context, this.mediatorQueryOperation);
const config = { ...ActorQueryOperation.getAsyncExpressionContext(context, this.mediatorQueryOperation) };
const evaluator = new AsyncEvaluator(pattern.expression, config);

const transform = async(item: Bindings, next: any, push: (bindings: Bindings) => void): Promise<void> => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"dependencies": {
"@comunica/types": "^1.19.2",
"sparqlalgebrajs": "^2.4.0",
"sparqlee": "^1.6.0"
"sparqlee": "^1.6.2"
},
"peerDependencies": {
"@comunica/bus-query-operation": "^1.4.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/actor-query-operation-group/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"asynciterator": "^3.0.3",
"rdf-string": "^1.5.0",
"sparqlalgebrajs": "^2.4.0",
"sparqlee": "^1.6.0"
"sparqlee": "^1.6.2"
},
"peerDependencies": {
"@comunica/bus-query-operation": "^1.6.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class ActorQueryOperationLeftJoinNestedLoop extends ActorQueryOperationTy
const right = ActorQueryOperation.getSafeBindings(rightRaw);

// TODO: refactor custom handling of pattern.expression. Should be pushed on the bus instead as a filter operation.
const config = { ...ActorQueryOperation.getExpressionContext(context) };
const config = { ...ActorQueryOperation.getAsyncExpressionContext(context) };
const evaluator = pattern.expression ?
new AsyncEvaluator(pattern.expression, config) :
null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"@comunica/types": "^1.19.2",
"asynciterator": "^3.0.3",
"sparqlalgebrajs": "^2.4.0",
"sparqlee": "^1.6.0"
"sparqlee": "^1.6.2"
},
"peerDependencies": {
"@comunica/bus-query-operation": "^1.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export class ActorQueryOperationOrderBySparqlee extends ActorQueryOperationTyped
const output = ActorQueryOperation.getSafeBindings(outputRaw);

const options = { window: this.window };
const sparqleeConfig = { ...ActorQueryOperation.getExpressionContext(context) };
const sparqleeConfig = { ...ActorQueryOperation.getAsyncExpressionContext(context) };
let { bindingsStream } = output;

// Sorting backwards since the first one is the most important therefore should be ordered last.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"asynciterator": "^3.0.3",
"rdf-string": "^1.5.0",
"sparqlalgebrajs": "^2.4.0",
"sparqlee": "^1.6.0"
"sparqlee": "^1.6.2"
},
"devDependencies": {
"@comunica/actor-query-operation-filter-direct": "^1.19.2",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { IActorQueryOperationTypedMediatedArgs } from '@comunica/bus-query-operation';
import { ActorQueryOperation, ActorQueryOperationTypedMediated } from '@comunica/bus-query-operation';
import type { ActionContext, IActorTest } from '@comunica/core';
import { BlankNodeScoped } from '@comunica/data-factory';
import { BlankNodeBindingsScoped } from '@comunica/data-factory';
import type { Bindings, BindingsStream, IActorQueryOperationOutputBindings } from '@comunica/types';
import { DataFactory } from 'rdf-data-factory';
import type * as RDF from 'rdf-js';
import { termToString } from 'rdf-string';
import type { Algebra } from 'sparqlalgebrajs';
const DF = new DataFactory();
Expand Down Expand Up @@ -57,12 +58,15 @@ export class ActorQueryOperationProject extends ActorQueryOperationTypedMediated
bindingsStream = bindingsStream.transform({
map(bindings: Bindings) {
blankNodeCounter++;
const scopedBlankNodesCache = new Map<string, RDF.BlankNode>();
return <Bindings> bindings.map(term => {
if (term && term.termType === 'BlankNode') {
if (term instanceof BlankNodeScoped) {
return new BlankNodeScoped(`${term.value}${blankNodeCounter}`, term.skolemized);
jacoscaz marked this conversation as resolved.
Show resolved Hide resolved
if (term instanceof BlankNodeBindingsScoped) {
let scopedBlankNode = scopedBlankNodesCache.get(term.value);
if (!scopedBlankNode) {
scopedBlankNode = DF.blankNode(`${term.value}${blankNodeCounter}`);
scopedBlankNodesCache.set(term.value, scopedBlankNode);
}
return DF.blankNode(`${term.value}${blankNodeCounter}`);
return scopedBlankNode;
}
return term;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ActorQueryOperation, Bindings } from '@comunica/bus-query-operation';
import { Bus } from '@comunica/core';
import { BlankNodeScoped } from '@comunica/data-factory';
import { BlankNodeBindingsScoped, BlankNodeScoped } from '@comunica/data-factory';
import type { IActorQueryOperationOutputBindings } from '@comunica/types';
import { ArrayIterator, SingletonIterator } from 'asynciterator';
import { DataFactory } from 'rdf-data-factory';
Expand Down Expand Up @@ -110,9 +110,9 @@ describe('ActorQueryOperationProject', () => {
expect(output.type).toEqual('bindings');
expect(output.canContainUndefs).toEqual(true);
expect(await arrayifyStream(output.bindingsStream)).toEqual([
Bindings({ '?a': DF.blankNode('a1'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a2'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a3'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a'), '?b': DF.literal('b') }),
]);
});
});
Expand All @@ -137,9 +137,65 @@ describe('ActorQueryOperationProject', () => {
expect(output.type).toEqual('bindings');
expect(output.canContainUndefs).toEqual(true);
expect(await arrayifyStream(output.bindingsStream)).toEqual([
Bindings({ '?a': new BlankNodeScoped('a1', DF.namedNode('A')), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeScoped('a2', DF.namedNode('B')), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeScoped('a3', DF.namedNode('C')), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeScoped('a', DF.namedNode('A')), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeScoped('a', DF.namedNode('B')), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeScoped('a', DF.namedNode('C')), '?b': DF.literal('b') }),
]);
});
});

it('should run on a stream with binding-scoped blank nodes across bindings', () => {
mediatorQueryOperation.mediate = (arg: any) => Promise.resolve({
bindingsStream: new ArrayIterator([
Bindings({ '?a': new BlankNodeBindingsScoped('a'), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeBindingsScoped('a'), '?b': DF.literal('b') }),
Bindings({ '?a': new BlankNodeBindingsScoped('a'), '?b': DF.literal('b') }),
]),
metadata: () => 'M',
operated: arg,
type: 'bindings',
variables: [ '?a' ],
canContainUndefs: true,
});
const op = { operation: { type: 'project', input: 'in', variables: [ DF.variable('a') ]}};
return actor.run(op).then(async(output: IActorQueryOperationOutputBindings) => {
expect((<any> output).metadata()).toEqual('M');
expect(output.variables).toEqual([ '?a' ]);
expect(output.type).toEqual('bindings');
expect(output.canContainUndefs).toEqual(true);
expect(await arrayifyStream(output.bindingsStream)).toEqual([
Bindings({ '?a': DF.blankNode('a1'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a2'), '?b': DF.literal('b') }),
Bindings({ '?a': DF.blankNode('a3'), '?b': DF.literal('b') }),
]);
});
});

it('should run on a stream with binding-scoped blank nodes within a single bindings object', () => {
mediatorQueryOperation.mediate = (arg: any) => Promise.resolve({
bindingsStream: new ArrayIterator([
Bindings({
'?a': new BlankNodeBindingsScoped('a'),
'?b': new BlankNodeBindingsScoped('a'),
}),
]),
metadata: () => 'M',
operated: arg,
type: 'bindings',
variables: [ '?a' ],
canContainUndefs: true,
});
const op = { operation: { type: 'project', input: 'in', variables: [ DF.variable('a') ]}};
return actor.run(op).then(async(output: IActorQueryOperationOutputBindings) => {
expect((<any> output).metadata()).toEqual('M');
expect(output.variables).toEqual([ '?a' ]);
expect(output.type).toEqual('bindings');
expect(output.canContainUndefs).toEqual(true);
expect(await arrayifyStream(output.bindingsStream)).toEqual([
Bindings({
'?a': DF.blankNode('a1'),
'?b': DF.blankNode('a1'),
}),
]);
});
});
Expand Down
70 changes: 53 additions & 17 deletions packages/bus-query-operation/lib/ActorQueryOperation.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { KeysInitSparql, KeysQueryOperation } from '@comunica/context-entries';
import type { ActionContext, IActorArgs, IActorTest, Mediator } from '@comunica/core';
import { Actor } from '@comunica/core';
import { BlankNodeBindingsScoped } from '@comunica/data-factory';
import type {
IActionQueryOperation,
IActorQueryOperationOutput,
Expand All @@ -11,6 +12,7 @@ import type {
Bindings,
PatternBindings,
} from '@comunica/types';
import type * as RDF from 'rdf-js';
import type { Algebra } from 'sparqlalgebrajs';
import { materializeOperation } from './Bindings';

Expand Down Expand Up @@ -95,6 +97,14 @@ export const KEY_CONTEXT_BASEIRI = KeysInitSparql.baseIRI;
*/
export const KEY_CONTEXT_QUERY_TIMESTAMP = KeysInitSparql.queryTimestamp;

/**
* A counter that keeps track blank node generated through BNODE() SPARQL
* expressions.
*
* @type {number}
*/
let bnodeCounter = 0;

/**
* A comunica actor for query-operation events.
*
Expand Down Expand Up @@ -167,29 +177,47 @@ export abstract class ActorQueryOperation extends Actor<IActionQueryOperation, I
}
}

protected static getBaseExpressionContext(context: ActionContext): IBaseExpressionContext {
if (context) {
const now: Date = context.get(KeysInitSparql.queryTimestamp);
const baseIRI: string = context.get(KeysInitSparql.baseIRI);
return { now, baseIRI };
}
return {};
}

/**
* Create an options object that can be used to construct a sparqlee evaluator.
* Create an options object that can be used to construct a sparqlee synchronous evaluator.
* @param context An action context.
* @param mediatorQueryOperation An optional query query operation mediator.
* If defined, the existence resolver will be defined as `exists`.
*/
public static getExpressionContext(context: ActionContext, mediatorQueryOperation?: Mediator<
Actor<IActionQueryOperation, IActorTest, IActorQueryOperationOutput>,
IActionQueryOperation, IActorTest, IActorQueryOperationOutput>): IExpressionContext {
if (context) {
const now: Date = context.get(KeysInitSparql.queryTimestamp);
const baseIRI: string = context.get(KeysInitSparql.baseIRI);
return {
now,
baseIRI,
...mediatorQueryOperation ?
{
exists: ActorQueryOperation.createExistenceResolver(context, mediatorQueryOperation),
} :
{},
};
return {
...this.getBaseExpressionContext(context),
bnode: (input?: string) => new BlankNodeBindingsScoped(input || `BNODE_${bnodeCounter++}`),
};
}

/**
* Create an options object that can be used to construct a sparqlee asynchronous evaluator.
* @param context An action context.
* @param mediatorQueryOperation An optional query query operation mediator.
* If defined, the existence resolver will be defined as `exists`.
*/
public static getAsyncExpressionContext(context: ActionContext, mediatorQueryOperation?: Mediator<
Actor<IActionQueryOperation, IActorTest, IActorQueryOperationOutput>,
IActionQueryOperation, IActorTest, IActorQueryOperationOutput>): IAsyncExpressionContext {
const expressionContext: IAsyncExpressionContext = {
...this.getBaseExpressionContext(context),
bnode: (input?: string) => Promise.resolve(new BlankNodeBindingsScoped(input || `BNODE_${bnodeCounter++}`)),
};
if (context && mediatorQueryOperation) {
expressionContext.exists = ActorQueryOperation.createExistenceResolver(context, mediatorQueryOperation);
}
return {};
return expressionContext;
}

/**
Expand Down Expand Up @@ -238,9 +266,17 @@ export function getMetadata(actionOutput: IActorQueryOperationOutputStream): Pro
return actionOutput.metadata();
}

export interface IExpressionContext {
interface IBaseExpressionContext {
now?: Date;
baseIRI?: string;
// Exists?: (expr: Algebra.ExistenceExpression, bindings: Bindings) => Promise<boolean>;
// bnode?: (input?: string) => Promise<RDF.BlankNode>;
}

// TODO: rename to ISyncExpressionContext in next major version
export interface IExpressionContext extends IBaseExpressionContext {
bnode: (input?: string | undefined) => RDF.BlankNode;
}

export interface IAsyncExpressionContext extends IBaseExpressionContext {
bnode: (input?: string | undefined) => Promise<RDF.BlankNode>;
exists?: (expr: Algebra.ExistenceExpression, bindings: Bindings) => Promise<boolean>;
}
1 change: 1 addition & 0 deletions packages/bus-query-operation/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
],
"dependencies": {
"@comunica/context-entries": "^1.0.0",
"@comunica/data-factory": "^1.17.0",
"@comunica/types": "^1.19.2",
"@types/rdf-js": "*",
"asynciterator": "^3.0.3",
Expand Down
Loading