Skip to content

Commit

Permalink
many improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Stefan Bittmann committed Nov 20, 2021
1 parent bb09090 commit 915c91a
Show file tree
Hide file tree
Showing 26 changed files with 647 additions and 344 deletions.
280 changes: 128 additions & 152 deletions lib/CodeInterpreter.js

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions lib/Database.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Table from "./Table.js";
import Backup from "./Backup.js";
import Events from "./Events.js"
import { open } from 'lmdb-store'
import { open } from 'lmdb'
import fs from "fs/promises";
import { fileURLToPath } from 'url';
import path from "path"
Expand Down Expand Up @@ -48,7 +48,7 @@ export default class Database extends Events {
return this._db.get("meta");
}

#loadedTables;
#loadedTables = {};
table(name) {
if(!name) {
throw new Error("no tableName provided");
Expand Down Expand Up @@ -165,8 +165,6 @@ export default class Database extends Events {
for(let i = 0; i < this.#plugins.length; i++) {
await this.#plugins[i].start(this);
}

this.#loadedTables = {};
}

get backup() {
Expand Down
21 changes: 15 additions & 6 deletions lib/Query.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export default class Query extends Readable {
this.#stream.on("data", (data) => {
this.push(data);
})
this.#stream.on("error", (err) => {
this.destroy(err);
})
this.#stream.on("end", () => {
this.push(null)
})
Expand All @@ -55,22 +58,28 @@ export default class Query extends Readable {
return this.#stream.query;
}

async then(cb) {
async then(result, error) {
let recordSet = [];

recordSet.getQuery = () => {
return this.#stream.query;
}

for await (let row of this) {
recordSet.push(row);
}

try {
for await (let row of this) {
recordSet.push(row);
}
} catch (e) {
error(e)
return;
}

if(this.#reduced) {
cb(recordSet[0])
result(recordSet[0])
}


cb(recordSet);
result(recordSet);
}
}
15 changes: 10 additions & 5 deletions lib/Query/Filter.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ import VirtualMaschine from "../VirtualMaschine.js";
export default class Filter extends Transform {
#vm;

constructor(query, context) {
constructor(query, context, virtualDB) {
super({readableObjectMode: true, writableObjectMode: true});
this.#vm = new VirtualMaschine(query, context)
this.#vm = new VirtualMaschine(query, context, virtualDB)
}

_transform(data, _, cb) {
let result = this.#vm.run(data);
if(result) {
this.push(data)
try{
let result = this.#vm.run(data);
if(result) {
this.push(data)
}
}
catch(e) {
this.destroy(e)
}

cb();
Expand Down
15 changes: 10 additions & 5 deletions lib/Query/Map.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@ export default class Map extends Transform {
#vm;

#index = 0;
constructor(query, context) {
constructor(query, context, virtualDB) {
super({readableObjectMode: true, writableObjectMode: true});
this.#vm = new VirtualMaschine(query, context)
this.#vm = new VirtualMaschine(query, context, virtualDB)
}

_transform(data, _, cb) {
this.push(this.#vm.run(data, this.#index))
this.#index++;
cb();
try {
this.push(this.#vm.run(data, this.#index))
this.#index++;
cb();
}
catch(e) {
this.destroy(e)
}
}

_flush(cb) {
Expand Down
13 changes: 9 additions & 4 deletions lib/Query/Reduce.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ export default class Reduce extends Transform {
#vm
#val;

constructor(query, context, initVal) {
constructor(query, context, initVal, virtualDB) {
super({readableObjectMode: true, writableObjectMode: true});
this.#vm = new VirtualMaschine(query, context);
this.#vm = new VirtualMaschine(query, context, virtualDB);

this.#val = initVal;
}

_transform(data, _, cb) {
this.#val = this.#vm.run(this.#val, data)
cb();
try {
this.#val = this.#vm.run(this.#val, data)
cb();
} catch(e) {
this.destroy(e)
}

}

_flush(cb) {
Expand Down
20 changes: 12 additions & 8 deletions lib/Query/Sort.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ export default class Sort extends Transform {

#array = [];

constructor(query, context) {
constructor(query, context, virtualDB) {
super({readableObjectMode: true, writableObjectMode: true});
this.#vm = new VirtualMaschine(query, context);
this.#vm = new VirtualMaschine(query, context, virtualDB);
}

_transform(data, _, cb) {
Expand All @@ -17,12 +17,16 @@ export default class Sort extends Transform {
}

_flush(cb) {
let sortedArray = this.#array.sort((a, b) => this.#vm.run(a, b));
let length = sortedArray.length;
for (let i = 0; i < length; i++) {
this.push(sortedArray[i]);
try {
let sortedArray = this.#array.sort((a, b) => this.#vm.run(a, b));
let length = sortedArray.length;
for (let i = 0; i < length; i++) {
this.push(sortedArray[i]);
}
this.push(null);
cb();
} catch(e) {
this.destroy(e)
}
this.push(null);
cb();
}
}
22 changes: 11 additions & 11 deletions lib/Table.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { nanoid } from "nanoid/async";
import * as utils from "./utils.js"
import { tableKey, indexKey } from "./utils.js"
import Events from "./Events.js"
import Query from "./Query.js"

Expand Down Expand Up @@ -29,19 +29,19 @@ export default class Table extends Events {
return this.#meta;
}

#databaseKey(id) {
return utils.tableKey(this.#meta.name, id)
#tableKey(id) {
return tableKey(this.#meta.name, id)
}

#indexKey(indexName, value, id) {
return utils.indexKey(this.#meta.name, indexName, value, id)
return indexKey(this.#meta.name, indexName, value, id)
}

async get(id) {
let before = await this.emitBefore("get", {id})
let data = await this.#db.get(this.#databaseKey(id));
let data = await this.#db.get(this.#tableKey(id));
let after = await this.emitAfter("get", {id, data})
return data || null;
return data;
}

async ensureIndex(name) {
Expand Down Expand Up @@ -81,7 +81,7 @@ export default class Table extends Events {
dbname: this.#dbname,
cache: this.#cache,
});

let result = null;
for await (let row of query) {
query.destroy();
Expand Down Expand Up @@ -121,7 +121,7 @@ export default class Table extends Events {

if(data._id) {
hadOld = true;
old = this.#db.get(this.#databaseKey(data._id));
old = this.#db.get(this.#tableKey(data._id));
}

for(let prop in this.meta.indexes) {
Expand All @@ -143,7 +143,7 @@ export default class Table extends Events {
}
}

this.#db.put(this.#databaseKey(id), data)
this.#db.put(this.#tableKey(id), data)
return id;
})
let after = await this.emitAfter("save", {id, data: {_id, ...data}})
Expand All @@ -154,7 +154,7 @@ export default class Table extends Events {
let before = await this.emitBefore("remove", {id})
let old;
await this.#db.transactionAsync(() => {
old = this.#db.get(this.#databaseKey(id));
old = this.#db.get(this.#tableKey(id));
if(old) {
for(let prop in this.meta.indexes) {
let val = old[prop];
Expand All @@ -163,7 +163,7 @@ export default class Table extends Events {
}
}
}
this.#db.remove(this.#databaseKey(id));
this.#db.remove(this.#tableKey(id));
})
let after = await this.emitAfter("remove", { id, data: old })
}
Expand Down
17 changes: 0 additions & 17 deletions lib/TableIndex.js

This file was deleted.

82 changes: 49 additions & 33 deletions lib/Thread.js
Original file line number Diff line number Diff line change
@@ -1,54 +1,69 @@
import { expose } from "threads/worker"
import { Observable } from "observable-fns"
import CodeInterpreter from "./CodeInterpreter.js"
import { open } from 'lmdb-store'

import { open } from 'lmdb'
import path from "path"
import { Readable } from "stream"
import CodeInterpreter from "./CodeInterpreter.js"
import * as utils from "./utils.js"
import Filter from "./Query/Filter.js"
import Sort from "./Query/Sort.js"
import aMap from "./Query/Map.js"
import Reduce from "./Query/Reduce.js"
import VirtualDatabase from "./Virtual/VirtualDatabase.js"

let db;
let virtualDB;

expose({
query({query, context, meta, path, dbname, cache, actions = []}) {
startDb(path, dbname, cache);
return new Observable((observer) => {
let cp = new CodeInterpreter(query, context, db, meta);

let result = cp.interprete();
for(let i = 0; i < actions.length; i++) {
switch (actions[i].type) {
case utils.actionTypes.FILTER:
result = result.pipe(new Filter(actions[i].data.query, actions[i].data.context))
break;
case utils.actionTypes.SORT:
result = result.pipe(new Sort(actions[i].data.query, actions[i].data.context))
break;
case utils.actionTypes.MAP:
result = result.pipe(new aMap(actions[i].data.query, actions[i].data.context))
break;
case utils.actionTypes.REDUCE:
result = result.pipe(new Reduce(actions[i].data.query, actions[i].data.context, actions[i].data.initVal))
break;
try {
let cp = new CodeInterpreter(query, context, db, meta, virtualDB);
let cpInt = cp.interprete();
let result = Readable.from(cpInt);
result.query = cpInt.query;

for(let i = 0; i < actions.length; i++) {
switch (actions[i].type) {
case utils.actionTypes.FILTER:
result = result.pipe(new Filter(actions[i].data.query, actions[i].data.context, virtualDB))
break;
case utils.actionTypes.SORT:
result = result.pipe(new Sort(actions[i].data.query, actions[i].data.context, virtualDB))
break;
case utils.actionTypes.MAP:
result = result.pipe(new aMap(actions[i].data.query, actions[i].data.context, virtualDB))
break;
case utils.actionTypes.REDUCE:
result = result.pipe(new Reduce(actions[i].data.query, actions[i].data.context, actions[i].data.initVal, virtualDB))
break;
}
}
}
result.on("error", (e) => {
observer.next({error: { message: e.message }});
})
result.on("data", (data) => {
if(data === Object(data)) {
observer.next({data: {... data}, query: result.query});
return;
}
observer.next({data: data, query: result.query});

});
result.on("end", () => {
if(cpInt.error) {
observer.next({error: { message: cpInt.error.message }});
}
observer.complete();
})

result.on("data", (data) => {
if(data === Object(data)) {
observer.next({data: {... data}, query: result.query});
return;
return () => {
result.close();
}
observer.next({data: data, query: result.query});

});
result.on("end", () => {
observer.complete();
})

return () => {
result.close();
} catch (e) {
observer.next({error: { message: e.message }});
}
})
},
Expand Down Expand Up @@ -90,5 +105,6 @@ function startDb(dbpath, name, cache) {
path: path.join(dbpath, name),
cache: cache
});
virtualDB = new VirtualDatabase(db);
}
}
Loading

0 comments on commit 915c91a

Please sign in to comment.