- Node.js: at least v12.18.2, tested on v14.5.0
- Typescript
- Docker
Because we offload a lot of work to the database, we use Scylla instead of regular Cassandra database. A local Scylla database can be started using docker, see their Docker Hub page and their Best Practices for Running Scylla on Docker page.
Note 1: If, after running docker logs some-scylla | tail
, you errors like:
Could not setup Async I/O: Resource temporarily unavailable. The most common cause is not enough request capacity in /proc/sys/fs/aio-max-nr. Try increasing that number or reducing the amount of logical CPUs available for your application
You'll have to limit the amount of logical cores using --smp
, e.g.:
docker run --name some-scylla --hostname some-scylla -d scylladb/scylla --smp 4
Note 2: By default, this container will consume all available memory. You can restrict its memory consumption during creation using --memory
, e.g.:
docker run --name some-scylla --hostname some-scylla -d scylladb/scylla --memory 8G
Or if the container's already running:
docker update update some-scylla --memory-swap 8G --memory 8G
Start a cqlsh
session:
docker exec -it some-scylla cqlsh
And execute these statements:
DROP KEYSPACE IF EXISTS proto;
CREATE KEYSPACE proto WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
CREATE TABLE proto.streams (
streamID text,
name text,
timeProperty frozen<list<text>>,
properties text,
status text,
progress text,
PRIMARY KEY (streamID),
);
CREATE TABLE proto.streams_by_name (
streamID text,
name text,
timeProperty frozen<list<text>>,
properties text,
status text,
progress text,
PRIMARY KEY (name),
);
CREATE TABLE proto.fragmentations_by_stream (
streamID text,
name text,
shaclPath frozen<list<text>>,
propertyLabel text,
kind text,
params text,
status text,
PRIMARY KEY (streamID, name)
);
CREATE TABLE proto.roots_by_fragmentation (
streamID text,
fragmentName text,
bucket text,
datatype text,
count counter,
PRIMARY KEY (streamID, fragmentName, bucket, datatype)
);
CREATE TABLE proto.relations_by_fragmentation (
streamID text,
fragmentName text,
bucket text,
nextBucket text,
datatype text,
count counter,
PRIMARY KEY (streamID, fragmentName, bucket, nextBucket, datatype)
);
CREATE TABLE proto.events_by_bucket (
streamID text,
fragmentName text,
bucketValue text,
eventID text,
eventData text,
eventTime timestamp,
PRIMARY KEY ((bucketValue, fragmentName, streamID), eventTime, eventID)
) WITH CLUSTERING ORDER BY (eventTime ASC, eventID ASC);
CREATE TABLE proto.events_by_stream (
streamID text,
eventTime timestamp,
eventID text,
eventData text,
PRIMARY KEY (streamID, eventTime, eventID)
) WITH CLUSTERING ORDER BY (eventTime ASC, eventID ASC);
CREATE TABLE proto.event_levels_by_stream (
streamID text,
eventID text,
eventTime timestamp,
eventLevel text,
PRIMARY KEY ((streamID, eventLevel), eventTime, eventID)
) WITH CLUSTERING ORDER BY (eventTime ASC, eventID ASC);
CREATE TABLE proto.event_levels_by_fragmentation (
streamID text,
fragmentName text,
eventID text,
eventTime timestamp,
eventLevel text,
PRIMARY KEY ((streamID, fragmentName, eventLevel), eventTime, eventID)
) WITH CLUSTERING ORDER BY (eventTime ASC, eventID ASC);
CREATE TABLE proto.state (
key text,
value text,
PRIMARY KEY (key)
);
- Clone this repository wherever you want,
- Run
npm install
to install its dependencies, - Check the Cassandra (i.e., Scylla) credentials in
src/config.ts
- Run
tsc
to compile the Typescript to Javascript - Run
lib/app.js
to start the server
Request:
curl -X GET \
http://localhost:3000/streams \
-H 'cache-control: no-cache'
Response:
[
{
"sourceURI": "https://streams.datapiloten.be/observations",
"name": "observations",
"properties": [
{
"text": "madeBySensor",
"value": [
"http://www.w3.org/ns/sosa/madeBySensor"
]
},
{
"text": "resultTime",
"value": [
"http://www.w3.org/ns/sosa/resultTime"
]
},
{
"text": "hasFeatureOfInterest - asWKT",
"value": [
"http://www.w3.org/ns/sosa/hasFeatureOfInterest",
"http://www.opengis.net/ont/geosparql#asWKT"
]
},
{
"text": "observedProperty",
"value": [
"http://www.w3.org/ns/sosa/observedProperty"
]
},
{
"text": "hasResult - numericValue",
"value": [
"http://www.w3.org/ns/sosa/hasResult",
"http://qudt.org/1.1/schema/qudt#numericValue"
]
},
{
"text": "hasResult - unit",
"value": [
"http://www.w3.org/ns/sosa/hasResult",
"http://qudt.org/1.1/schema/qudt#unit"
]
}
],
"status": "LOADING"
}
]
Request:
curl -X POST \
http://localhost:3000/streams \
-H 'content-type: application/x-www-form-urlencoded' \
-d 'url=https%3A%2F%2Fstreams.datapiloten.be%2Fobservations&name=observations'
Response:
{
"status": "success",
"url": "/streams/observations"
}
Request:
curl -X GET \
http://localhost:3000/streams/observations
Response:
{
"sourceURI": "https://streams.datapiloten.be/observations",
"name": "observations",
"properties": [
{
"text": "madeBySensor",
"value": [
"http://www.w3.org/ns/sosa/madeBySensor"
]
},
{
"text": "resultTime",
"value": [
"http://www.w3.org/ns/sosa/resultTime"
]
},
{
"text": "hasFeatureOfInterest - asWKT",
"value": [
"http://www.w3.org/ns/sosa/hasFeatureOfInterest",
"http://www.opengis.net/ont/geosparql#asWKT"
]
},
{
"text": "observedProperty",
"value": [
"http://www.w3.org/ns/sosa/observedProperty"
]
},
{
"text": "hasResult - numericValue",
"value": [
"http://www.w3.org/ns/sosa/hasResult",
"http://qudt.org/1.1/schema/qudt#numericValue"
]
},
{
"text": "hasResult - unit",
"value": [
"http://www.w3.org/ns/sosa/hasResult",
"http://qudt.org/1.1/schema/qudt#unit"
]
}
],
"status": "LOADING",
"fragmentations": []
}
Request:
curl -X GET \
http://localhost:3000/streams/observations/fragmentations/
Response:
[
{
"kind": "XYZ_TILE",
"streamID": "https://streams.datapiloten.be/observations",
"shaclPath": [
"http://www.opengis.net/ont/geosparql#asWKT"
],
"name": "tiles",
"params": {
"minZoom": "13",
"maxZoom": "15"
},
"status": "LOADING"
}
]
Request:
curl -X POST \
http://localhost:3000/streams/observations/fragmentations \
-H 'content-type: application/x-www-form-urlencoded' \
-d 'name=tiles&property=http%3A%2F%2Fwww.opengis.net%2Font%2Fgeosparql%23asWKT&strategy=XYZ_TILE&minZoom=13&maxZoom=15'
Response:
{
"status": "success",
"url": "/streams/observations/fragmentations/tiles"
}
Request:
curl -X GET \
http://localhost:3000/streams/observations/fragmentations/tiles
Response:
{
"kind": "XYZ_TILE",
"streamID": "https://streams.datapiloten.be/observations",
"shaclPath": [
"http://www.opengis.net/ont/geosparql#asWKT"
],
"name": "tiles",
"params": {
"minZoom": "13",
"maxZoom": "15"
},
"status": "LOADING"
}
Request:
curl -X POST \
http://localhost:3000/streams/observations/fragmentations/tiles/enable \
-H 'content-type: application/x-www-form-urlencoded' \
-d enabled=false
Response:
{
"status": "success"
}