diff --git a/package.json b/package.json index 63630b8e..d44bf2a6 100644 --- a/package.json +++ b/package.json @@ -78,7 +78,7 @@ "npm-run-all": "^4.1.2", "nyc": "^11.3.0", "raw-loader": "^0.5.1", - "rxjs": "^5.5.5", + "rxjs": "^5.5.6", "shelljs": "^0.8.0", "shx": "^0.2.2", "sinon": "^4.1.3", diff --git a/src/operators.ts b/src/operators.ts index 5be6c3d1..0ab94fcb 100644 --- a/src/operators.ts +++ b/src/operators.ts @@ -4,19 +4,3 @@ import 'rxjs/add/observable/from' import 'rxjs/add/observable/fromPromise' import 'rxjs/add/observable/of' import 'rxjs/add/observable/throw' - -// rxjs Operators -import 'rxjs/add/operator/catch' -import 'rxjs/add/operator/combineAll' -import 'rxjs/add/operator/concatMap' -import 'rxjs/add/operator/concatMapTo' -import 'rxjs/add/operator/debounceTime' -import 'rxjs/add/operator/do' -import 'rxjs/add/operator/map' -import 'rxjs/add/operator/mergeMap' -import 'rxjs/add/operator/publishReplay' -import 'rxjs/add/operator/skip' -import 'rxjs/add/operator/skipWhile' -import 'rxjs/add/operator/switchMap' -import 'rxjs/add/operator/reduce' -import 'rxjs/add/operator/take' diff --git a/src/proxy/index.ts b/src/proxy/index.ts index f993914a..671f641e 100644 --- a/src/proxy/index.ts +++ b/src/proxy/index.ts @@ -1,10 +1,3 @@ -import 'rxjs/add/operator/combineAll' -import 'rxjs/add/operator/do' -import 'rxjs/add/operator/map' -import 'rxjs/add/operator/publishReplay' -import 'rxjs/add/operator/switchMap' -import 'rxjs/add/operator/take' - export { QueryToken, SelectorMeta } from '../storage/modules/QueryToken' export { ProxySelector } from '../storage/modules/ProxySelector' export * from '../interface' diff --git a/src/storage/Database.ts b/src/storage/Database.ts index a6a4f1a5..72873459 100644 --- a/src/storage/Database.ts +++ b/src/storage/Database.ts @@ -2,6 +2,9 @@ import { Observable } from 'rxjs/Observable' import { ErrorObservable } from 'rxjs/observable/ErrorObservable' import { Subscription } from 'rxjs/Subscription' import { ConnectableObservable } from 'rxjs/observable/ConnectableObservable' +import { concatMap } from 'rxjs/operators/concatMap' +import { map } from 'rxjs/operators/map' +import { tap } from 'rxjs/operators/tap' import * as lf from 'lovefield' import * as Exception from '../exception' import * as typeDefinition from './helper/definition' @@ -91,27 +94,28 @@ export class Database { } dump() { - return this.database$.concatMap(db => db.export()) + const dump = (db: lf.Database) => db.export() + return this.database$.pipe(concatMap(dump)) } load(data: any) { assert(!this.connected, Exception.DatabaseIsNotEmpty()) - return this.database$ - .concatMap(db => { + const load = (db: lf.Database) => { + forEach(data.tables, (entities: any[], name: string) => { + const schema = this.findSchema(name) + entities.forEach((entity: any) => + this.storedIds.add(fieldIdentifier(name, entity[schema.pk]))) + }) + return db.import(data).catch(() => { forEach(data.tables, (entities: any[], name: string) => { const schema = this.findSchema(name) entities.forEach((entity: any) => - this.storedIds.add(fieldIdentifier(name, entity[schema.pk]))) - }) - return db.import(data).catch(() => { - forEach(data.tables, (entities: any[], name: string) => { - const schema = this.findSchema(name) - entities.forEach((entity: any) => - this.storedIds.delete(fieldIdentifier(name, entity[schema.pk]))) - }) + this.storedIds.delete(fieldIdentifier(name, entity[schema.pk]))) }) }) + } + return this.database$.pipe(concatMap(load)) } insert(tableName: string, raw: T[]): Observable @@ -121,52 +125,54 @@ export class Database { insert(tableName: string, raw: T | T[]): Observable insert(tableName: string, raw: T | T[]): Observable { - return this.database$ - .concatMap(db => { - const schema = this.findSchema(tableName) - const pk = schema.pk - const columnMapper = schema.mapper - const [ table ] = Database.getTables(db, tableName) - const muts: Mutation[] = [] - const entities = clone(raw) - - const iterator = Array.isArray(entities) ? entities : [entities] + const insert = (db: lf.Database) => { + const schema = this.findSchema(tableName) + const pk = schema.pk + const columnMapper = schema.mapper + const [ table ] = Database.getTables(db, tableName) + const muts: Mutation[] = [] + const entities = clone(raw) - iterator.forEach((entity: any) => { - const mut = new Mutation(db, table) - const hiddenPayload = Object.create(null) + const iterator = Array.isArray(entities) ? entities : [entities] - columnMapper.forEach((mapper, key) => { - // cannot create a hidden column for primary key - if (!hasOwn(entity, key) || key === pk) { - return - } + iterator.forEach((entity: any) => { + const mut = new Mutation(db, table) + const hiddenPayload = Object.create(null) - const val = entity[key] - hiddenPayload[key] = mapper(val) - hiddenPayload[hiddenColName(key)] = val - }) + columnMapper.forEach((mapper, key) => { + // cannot create a hidden column for primary key + if (!hasOwn(entity, key) || key === pk) { + return + } - mut.patch({ ...entity, ...hiddenPayload }) - mut.withId(pk, entity[pk]) - muts.push(mut) + const val = entity[key] + hiddenPayload[key] = mapper(val) + hiddenPayload[hiddenColName(key)] = val }) - const { contextIds, queries } = Mutation.aggregate(db, muts, []) - contextIds.forEach(id => this.storedIds.add(id)) - const onError = { error: () => contextIds.forEach(id => this.storedIds.delete(id)) } + mut.patch({ ...entity, ...hiddenPayload }) + mut.withId(pk, entity[pk]) + muts.push(mut) + }) - if (this.inTransaction) { - this.attachTx(onError) - return this.executor(db, queries) - } + const { contextIds, queries } = Mutation.aggregate(db, muts, []) + contextIds.forEach(id => this.storedIds.add(id)) + const onError = { error: () => contextIds.forEach(id => this.storedIds.delete(id)) } - return this.executor(db, queries).do(onError) - }) + if (this.inTransaction) { + this.attachTx(onError) + return this.executor(db, queries) + } + + return this.executor(db, queries).pipe(tap(onError)) + } + return this.database$.pipe(concatMap(insert)) } get(tableName: string, query: Query = {}, mode: JoinMode = JoinMode.imlicit): QueryToken { - const selector$ = this.buildSelector(tableName, query, mode) + const selector$ = this.database$.pipe( + map(db => this.buildSelector(db, tableName, query, mode)) + ) return new QueryToken(selector$) } @@ -181,41 +187,41 @@ export class Database { return Observable.throw(err) } - return this.database$ - .concatMap(db => { - const entity = clone(raw) - const [ table ] = Database.getTables(db, tableName) - const columnMapper = schema!.mapper - const hiddenPayload = Object.create(null) - - columnMapper.forEach((mapper, key) => { - // cannot create a hidden column for primary key - if (!hasOwn(entity, key) || key === schema!.pk) { - return - } + const update = (db: lf.Database) => { + const entity = clone(raw) + const [ table ] = Database.getTables(db, tableName) + const columnMapper = schema!.mapper + const hiddenPayload = Object.create(null) - const val = (entity as any)[key] - hiddenPayload[key] = mapper(val) - hiddenPayload[hiddenColName(key)] = val - }) + columnMapper.forEach((mapper, key) => { + // cannot create a hidden column for primary key + if (!hasOwn(entity, key) || key === schema!.pk) { + return + } - const mut = { ...(entity as any), ...hiddenPayload } - const predicate = createPredicate(table, clause) - const query = predicatableQuery(db, table, predicate!, StatementType.Update) - - forEach(mut, (val, key) => { - const column = table[key] - if (key === schema!.pk) { - warn(`Primary key is not modifiable.`) - } else if (!column) { - warn(`Column: ${key} is not existent on table:${tableName}`) - } else { - query.set(column, val) - } - }) + const val = (entity as any)[key] + hiddenPayload[key] = mapper(val) + hiddenPayload[hiddenColName(key)] = val + }) - return this.executor(db, [query]) + const mut = { ...(entity as any), ...hiddenPayload } + const predicate = createPredicate(table, clause) + const query = predicatableQuery(db, table, predicate!, StatementType.Update) + + forEach(mut, (val, key) => { + const column = table[key] + if (key === schema!.pk) { + warn(`Primary key is not modifiable.`) + } else if (!column) { + warn(`Column: ${key} is not existent on table:${tableName}`) + } else { + query.set(column, val) + } }) + + return this.executor(db, [query]) + } + return this.database$.pipe(concatMap(update)) } delete(tableName: string, clause: Predicate = {}): Observable { @@ -224,35 +230,38 @@ export class Database { return Observable.throw(err) } - return this.database$ - .concatMap(db => { - const [ table ] = Database.getTables(db, tableName) - const column = table[pk!] - const provider = new PredicateProvider(table, clause) - const prefetch = - predicatableQuery(db, table, provider.getPredicate(), StatementType.Select, column) + const deletion = (db: lf.Database): Observable => { + const [ table ] = Database.getTables(db, tableName) + const column = table[pk!] + const provider = new PredicateProvider(table, clause) + const prefetch = + predicatableQuery(db, table, provider.getPredicate(), StatementType.Select, column) + const deleteByScopedIds = (scopedIds: Object[]) => { + const query = predicatableQuery(db, table, provider.getPredicate(), StatementType.Delete) + + scopedIds.forEach((entity) => + this.storedIds.delete(fieldIdentifier(tableName, entity[pk!]))) + + const onError = { + error: () => { + scopedIds.forEach((entity: object) => + this.storedIds.add(fieldIdentifier(tableName, entity[pk!]))) + } + } - return Observable.fromPromise(prefetch.exec()) - .concatMap((scopedIds) => { - const query = predicatableQuery(db, table, provider.getPredicate(), StatementType.Delete) + if (this.inTransaction) { + this.attachTx(onError) + return this.executor(db, [query]) + } - scopedIds.forEach((entity: object) => - this.storedIds.delete(fieldIdentifier(tableName, entity[pk!]))) - const onError = { - error: () => { - scopedIds.forEach((entity: object) => - this.storedIds.add(fieldIdentifier(tableName, entity[pk!]))) - } - } + return this.executor(db, [query]).pipe(tap(onError)) + } - if (this.inTransaction) { - this.attachTx(onError) - return this.executor(db, [query]) - } + return Observable.fromPromise(prefetch.exec()) + .pipe(concatMap(deleteByScopedIds)) + } - return this.executor(db, [query]).do(onError) - }) - }) + return this.database$.pipe(concatMap(deletion)) } upsert(tableName: string, raw: T): Observable @@ -262,7 +271,7 @@ export class Database { upsert(tableName: string, raw: T | T[]): Observable upsert(tableName: string, raw: T | T[]): Observable { - return this.database$.concatMap(db => { + const upsert = (db: lf.Database) => { const sharing = new Map() const insert: Mutation[] = [] const update: Mutation[] = [] @@ -278,11 +287,12 @@ export class Database { return this.executor(db, queries) } - return this.executor(db, queries).do(onError) + return this.executor(db, queries).pipe(tap(onError)) } else { return Observable.of({ result: false, insert: 0, update: 0, delete: 0, select: 0 }) } - }) + } + return this.database$.pipe(concatMap(upsert)) } remove(tableName: string, clause: Clause = {}): Observable { @@ -292,7 +302,7 @@ export class Database { } const disposeHandler = schema!.dispose - return this.database$.concatMap((db) => { + const remove = (db: lf.Database) => { const [ table ] = Database.getTables(db, tableName) const predicate = createPredicate(table, clause.where) @@ -300,40 +310,44 @@ export class Database { const removedIds: any = [] queries.push(predicatableQuery(db, table, predicate!, StatementType.Delete)) - const prefetch = predicatableQuery(db, table, predicate!, StatementType.Select) - return Observable.fromPromise(prefetch.exec()) - .concatMap((rootEntities) => { - rootEntities.forEach(entity => { - removedIds.push(fieldIdentifier(tableName, entity[schema!.pk])) - }) + const removeByRootEntities = (rootEntities: Object[]) => { + rootEntities.forEach(entity => { + removedIds.push(fieldIdentifier(tableName, entity[schema!.pk])) + }) + + const onError = { + error: () => removedIds.forEach((id: string) => this.storedIds.add(id)) + } - const onError = { - error: () => removedIds.forEach((id: string) => this.storedIds.add(id)) + if (disposeHandler) { + const scope = this.createScopedHandler(db, queries, removedIds) + return disposeHandler(rootEntities, scope).pipe( + tap(() => removedIds.forEach((id: string) => this.storedIds.delete(id))), + concatMap(() => { + if (this.inTransaction) { + this.attachTx(onError) + return this.executor(db, queries) + } + return this.executor(db, queries).pipe(tap(onError)) + }) + ) + } else { + removedIds.forEach((id: string) => this.storedIds.delete(id)) + if (this.inTransaction) { + this.attachTx(onError) + return this.executor(db, queries) } + return this.executor(db, queries).pipe(tap(onError)) + } + } - if (disposeHandler) { - const scope = this.createScopedHandler(db, queries, removedIds) - return disposeHandler(rootEntities, scope) - .do(() => removedIds.forEach((id: string) => this.storedIds.delete(id))) - .concatMap(() => { - if (this.inTransaction) { - this.attachTx(onError) - return this.executor(db, queries) - } - return this.executor(db, queries).do(onError) - }) - } else { - removedIds.forEach((id: string) => this.storedIds.delete(id)) - - if (this.inTransaction) { - this.attachTx(onError) - return this.executor(db, queries) - } + const prefetch = predicatableQuery(db, table, predicate!, StatementType.Select) + return Observable.fromPromise(prefetch.exec()).pipe( + concatMap(removeByRootEntities) + ) + } - return this.executor(db, queries).do(onError) - } - }) - }) + return this.database$.pipe(concatMap(remove)) } dispose(): ErrorObservable | Observable { @@ -341,19 +355,20 @@ export class Database { return Observable.throw(Exception.NotConnected()) } - const cleanup = this.database$.map(db => - db.getSchema().tables().map(t => db.delete().from(t))) - - return this.database$.concatMap(db => { - return cleanup.concatMap(queries => this.executor(db, queries)) - .do(() => { + const cleanUp = (db: lf.Database) => { + const deletions = db.getSchema().tables().map(t => db.delete().from(t)) + return this.executor(db, deletions).pipe( + tap(() => { db.close() this.schemas.clear() this.storedIds.clear() this.schemaBuilder = null this.subscription.unsubscribe() }) - }) + ) + } + + return this.database$.pipe(concatMap(cleanUp)) } attachTx(_: TransactionEffects) { @@ -363,20 +378,21 @@ export class Database { executor(db: lf.Database, queries: lf.query.Builder[]) { const tx = db.createTransaction() - return Observable.fromPromise(tx.exec(queries)) - .do(transactionErrorHandler) - .map((ret) => { + return Observable.fromPromise(tx.exec(queries)).pipe( + tap(transactionErrorHandler), + map((ret) => { return { result: true, ...mergeTransactionResult(queries, ret) } }) + ) } transaction(): Observable> { type ProxyProperty = Pick - return this.database$.map(db => { + return this.database$.pipe(map(db => { const tx = db.createTransaction() const transactionQueries: lf.query.Builder[] = [] const effects: TransactionEffects[] = [] @@ -407,14 +423,14 @@ export class Database { const customTx = { commit: () => { return effects.reduce((acc, curr) => { - return acc.do(curr) - }, Observable.from(tx.exec(transactionQueries))) - .map((r) => { + return acc.pipe(tap(curr)) + }, Observable.from(tx.exec(transactionQueries))).pipe( + map((r) => { return { result: true, ...mergeTransactionResult(transactionQueries, r) } - }) + })) }, abort: () => { effects.length = 0 @@ -428,7 +444,7 @@ export class Database { ] return ret - }) + })) } private buildTables() { @@ -518,45 +534,44 @@ export class Database { } private buildSelector( + db: lf.Database, tableName: string, clause: Query, mode: JoinMode ) { - return this.database$.map((db) => { - const schema = this.findSchema(tableName) - const pk = schema.pk - const containFields = !!clause.fields - - const containKey = containFields ? contains(pk, clause.fields!) : true - const fields: Set = containFields ? new Set(clause.fields) : new Set(schema.columns.keys()) - const { table, columns, joinInfo, definition } = - this.traverseQueryFields(db, tableName, fields, containKey, !containFields, [], {}, mode) - const query = - predicatableQuery(db, table!, null, StatementType.Select, ...columns) - - joinInfo.forEach((info: JoinInfo) => - query.leftOuterJoin(info.table, info.predicate)) - - const orderDesc = (clause.orderBy || []).map(desc => { - return { - column: table![desc.fieldName], - orderBy: !desc.orderBy ? null : lf.Order[desc.orderBy] - } - }) - - const matcher = { - pk: { - name: pk, - queried: containKey - }, - definition, - mainTable: table! + const schema = this.findSchema(tableName) + const pk = schema.pk + const containFields = !!clause.fields + + const containKey = containFields ? contains(pk, clause.fields!) : true + const fields: Set = containFields ? new Set(clause.fields) : new Set(schema.columns.keys()) + const { table, columns, joinInfo, definition } = + this.traverseQueryFields(db, tableName, fields, containKey, !containFields, [], {}, mode) + const query = + predicatableQuery(db, table!, null, StatementType.Select, ...columns) + + joinInfo.forEach((info: JoinInfo) => + query.leftOuterJoin(info.table, info.predicate)) + + const orderDesc = (clause.orderBy || []).map(desc => { + return { + column: table![desc.fieldName], + orderBy: !desc.orderBy ? null : lf.Order[desc.orderBy] } - const { limit, skip } = clause - const provider = new PredicateProvider(table!, clause.where) - - return new Selector(db, query, matcher, provider, limit, skip, orderDesc) }) + + const matcher = { + pk: { + name: pk, + queried: containKey + }, + definition, + mainTable: table! + } + const { limit, skip } = clause + const provider = new PredicateProvider(table!, clause.where) + + return new Selector(db, query, matcher, provider, limit, skip, orderDesc) } private createColumn( diff --git a/src/storage/modules/ProxySelector.ts b/src/storage/modules/ProxySelector.ts index 95bb3e2f..ca180509 100644 --- a/src/storage/modules/ProxySelector.ts +++ b/src/storage/modules/ProxySelector.ts @@ -1,4 +1,6 @@ import { Observable } from 'rxjs/Observable' +import { OperatorFunction } from 'rxjs/interfaces' +import { map } from 'rxjs/operators/map' import { Query } from '../../interface' import { mapFn } from './mapFn' @@ -13,13 +15,9 @@ export class ProxySelector { public query: Query, public tableName: string ) { - this.request$ = request$.map((r: T | T[]) => { - if (Array.isArray(r)) { - return r - } else { - return [ r ] - } - }) + this.request$ = request$.pipe( + map(r => Array.isArray(r) ? r : [ r ]) + ) } values() { @@ -30,7 +28,7 @@ export class ProxySelector { return this.mapFn(this.request$) } - map(fn: (stream$: Observable) => Observable) { + map(fn: OperatorFunction) { this.mapFn = fn return this as any as ProxySelector } diff --git a/src/storage/modules/QueryToken.ts b/src/storage/modules/QueryToken.ts index 3db1c8ae..e588d169 100644 --- a/src/storage/modules/QueryToken.ts +++ b/src/storage/modules/QueryToken.ts @@ -1,4 +1,13 @@ import { Observable } from 'rxjs/Observable' +import { OperatorFunction } from 'rxjs/interfaces' +import { combineAll } from 'rxjs/operators/combineAll' +import { map } from 'rxjs/operators/map' +import { publishReplay } from 'rxjs/operators/publishReplay' +import { refCount } from 'rxjs/operators/refCount' +import { skipWhile } from 'rxjs/operators/skipWhile' +import { switchMap } from 'rxjs/operators/switchMap' +import { take } from 'rxjs/operators/take' +import { tap } from 'rxjs/operators/tap' import { Selector } from './Selector' import { ProxySelector } from './ProxySelector' import { assert } from '../../utils/assert' @@ -6,19 +15,25 @@ import { TokenConsumed } from '../../exception/token' export type SelectorMeta = Selector | ProxySelector +const skipWhileProxySelector = + skipWhile(v => v instanceof ProxySelector) as (x: Observable>) => Observable> + export class QueryToken { selector$: Observable> private consumed = false constructor(selector$: Observable>) { - this.selector$ = selector$.publishReplay(1) - .refCount() + this.selector$ = selector$.pipe( + publishReplay(1), + refCount() + ) } - map(fn: (stream$: Observable) => Observable) { - this.selector$ = this.selector$ - .do(selector => (selector as any).map(fn) ) + map(fn: OperatorFunction) { + this.selector$ = this.selector$.pipe( + tap(selector => (selector as any).map(fn)) + ) return this as any as QueryToken } @@ -26,44 +41,48 @@ export class QueryToken { assert(!this.consumed, TokenConsumed()) this.consumed = true - return (this.selector$ as Observable>) - .switchMap(s => s.values()) - .take(1) + return this.selector$.pipe( + switchMap(s => s.values()), + take(1) + ) } changes(): Observable { assert(!this.consumed, TokenConsumed()) this.consumed = true - return (this.selector$ as Observable>) - .switchMap(s => s.changes()) + return this.selector$.pipe( + switchMap(s => s.changes()) + ) } concat(...tokens: QueryToken[]) { tokens.unshift(this) - const newSelector$ = Observable.from(tokens) - .map(token => token.selector$.skipWhile(v => v instanceof ProxySelector)) - .combineAll() - .map((r: Selector[]) => { + const newSelector$ = Observable.from(tokens).pipe( + map(token => token.selector$.pipe(skipWhileProxySelector)), + combineAll>, Selector[]>(), + map((r) => { const first = r.shift() return first!.concat(...r) }) + ) return new QueryToken(newSelector$) } combine(...tokens: QueryToken[]) { tokens.unshift(this) - const newSelector$ = Observable.from(tokens) - .map(token => token.selector$.skipWhile(v => v instanceof ProxySelector)) - .combineAll() - .map((r: Selector[]) => { + const newSelector$ = Observable.from(tokens).pipe( + map(token => token.selector$.pipe(skipWhileProxySelector)), + combineAll>, Selector[]>(), + map((r) => { const first = r.shift() - return first!.combine(...r) + return first!.concat(...r) }) + ) return new QueryToken(newSelector$) } toString() { - return this.selector$.map(r => r.toString()) + return this.selector$.pipe(map(r => r.toString())) } } diff --git a/src/storage/modules/Selector.ts b/src/storage/modules/Selector.ts index f4c0570b..00bdb4f1 100644 --- a/src/storage/modules/Selector.ts +++ b/src/storage/modules/Selector.ts @@ -1,5 +1,14 @@ import { Observer } from 'rxjs/Observer' import { Observable } from 'rxjs/Observable' +import { OperatorFunction } from 'rxjs/interfaces' +import { combineAll } from 'rxjs/operators/combineAll' +import { debounceTime } from 'rxjs/operators/debounceTime' +import { map } from 'rxjs/operators/map' +import { mergeMap } from 'rxjs/operators/mergeMap' +import { publishReplay } from 'rxjs/operators/publishReplay' +import { reduce } from 'rxjs/operators/reduce' +import { refCount } from 'rxjs/operators/refCount' +import { switchMap } from 'rxjs/operators/switchMap' import { async } from 'rxjs/scheduler/async' import * as lf from 'lovefield' import * as Exception from '../../exception' @@ -38,19 +47,21 @@ export class Selector { const fakeQuery = { toSql: identity } // 初始化一个空的 QuerySelector,然后在初始化以后替换它上面的属性和方法 const dist = new Selector(originalToken.db, fakeQuery as any, { } as any) - dist.change$ = Observable.from(metaDatas) - .map(metas => metas.mapFn(metas.change$)) - .combineAll() - .map((r: U[][]) => r.reduce((acc, val) => acc.concat(val))) - .debounceTime(0, async) - .publishReplay(1) - .refCount() + dist.change$ = Observable.from(metaDatas).pipe( + map(metas => metas.mapFn(metas.change$)), + combineAll, U[][]>(), + map(r => r.reduce((acc, val) => acc.concat(val))), + debounceTime(0, async), + publishReplay(1), + refCount() + ) dist.values = () => { assert(!dist.consumed, Exception.TokenConsumed()) dist.consumed = true - return Observable.from(metaDatas) - .flatMap(metaData => metaData.values()) - .reduce((acc: U[], val: U[]) => acc.concat(val)) + return Observable.from(metaDatas).pipe( + mergeMap(metaData => metaData.values()), + reduce((acc, val) => acc.concat(val)) + ) } dist.toString = () => { const querys = metaDatas.map(m => m.toString()) @@ -100,10 +111,11 @@ export class Selector { }) as Observable const changesOnQuery = limit || skip - ? this.buildPrefetchingObserve() - .switchMap((pks) => + ? this.buildPrefetchingObserve().pipe( + switchMap((pks) => observeOn(this.getQuery(this.inPKs(pks))) ) + ) : observeOn(this.getQuery()) return lfIssueFix(changesOnQuery) @@ -222,7 +234,7 @@ export class Selector { return this.mapFn(this.change$) } - map(fn: (stream$: Observable) => Observable) { + map(fn: OperatorFunction) { this.mapFn = fn return this as any as Selector } diff --git a/src/utils/valid.ts b/src/utils/valid.ts index 62826020..e7e0bfc7 100644 --- a/src/utils/valid.ts +++ b/src/utils/valid.ts @@ -1,4 +1,5 @@ import { Observable } from 'rxjs/Observable' +import { skip } from 'rxjs/operators/skip' import { ErrorObservable } from 'rxjs/observable/ErrorObservable' // think it as asynchronous assert @@ -7,5 +8,5 @@ export function valid(condition: any, error: Error): ErrorObservable | Observ return Observable.throw(error) } - return Observable.empty().skip(1) + return Observable.empty().pipe(skip(1)) } diff --git a/yarn.lock b/yarn.lock index 13d2ec46..ab864a59 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3145,7 +3145,7 @@ macaddress@^0.2.8: version "0.2.8" resolved "https://registry.npmjs.org/macaddress/-/macaddress-0.2.8.tgz#5904dc537c39ec6dbefeae902327135fa8511f12" -madge@3.0.0: +madge@^3.0.0: version "3.0.0" resolved "https://registry.npmjs.org/madge/-/madge-3.0.0.tgz#438e107588babfcc42323bc3fe72f7d88d356140" dependencies: @@ -4749,9 +4749,9 @@ ripemd160@^2.0.0, ripemd160@^2.0.1: hash-base "^2.0.0" inherits "^2.0.1" -rxjs@^5.5.5: - version "5.5.5" - resolved "https://registry.npmjs.org/rxjs/-/rxjs-5.5.5.tgz#e164f11d38eaf29f56f08c3447f74ff02dd84e97" +rxjs@^5.5.6: + version "5.5.6" + resolved "https://registry.npmjs.org/rxjs/-/rxjs-5.5.6.tgz#e31fb96d6fd2ff1fd84bcea8ae9c02d007179c02" dependencies: symbol-observable "1.0.1"