Skip to content

Commit

Permalink
Implement LRU Reference Delegate for Memory Persistence (#1237)
Browse files Browse the repository at this point in the history
* Implement lru reference delegate for memory persistence
  • Loading branch information
Greg Soltis authored Sep 21, 2018
1 parent 80733ab commit 7f03511
Show file tree
Hide file tree
Showing 11 changed files with 300 additions and 34 deletions.
1 change: 1 addition & 0 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ export class IndexedDbMutationQueue implements MutationQueue {
this.removeCachedMutationKeys(batch.batchId);
if (this.garbageCollector !== null) {
for (const key of removedDocuments) {
// TODO(gsoltis): tell reference delegate that mutation was ack'd
this.garbageCollector.addPotentialGarbageKey(key);
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/src/local/memory_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ export class MemoryMutationQueue implements MutationQueue {
if (this.garbageCollector !== null) {
this.garbageCollector.addPotentialGarbageKey(key);
}
// TODO(gsoltis): tell reference delegate that mutation was ack'd

const ref = new DocReference(key, batch.batchId);
references = references.delete(ref);
Expand Down
181 changes: 173 additions & 8 deletions packages/firestore/src/local/memory_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@
*/

import { User } from '../auth/user';
import { DocumentKey } from '../model/document_key';
import { debug } from '../util/log';
import { ObjectMap } from '../util/obj_map';
import { encode } from './encoded_resource_path';
import {
ActiveTargets,
LruDelegate,
LruGarbageCollector
} from './lru_garbage_collector';

import { MemoryMutationQueue } from './memory_mutation_queue';
import { MemoryQueryCache } from './memory_query_cache';
Expand All @@ -24,14 +32,16 @@ import { MutationQueue } from './mutation_queue';
import {
Persistence,
PersistenceTransaction,
PrimaryStateListener
PrimaryStateListener,
ReferenceDelegate
} from './persistence';
import { PersistencePromise } from './persistence_promise';
import { QueryCache } from './query_cache';
import { RemoteDocumentCache } from './remote_document_cache';
import { QueryData } from './query_data';
import { ReferenceSet } from './reference_set';
import { ClientId } from './shared_client_state';
import { ListenSequenceNumber } from '../core/types';
import { ListenSequence } from '../core/listen_sequence';
import * as obj from '../util/obj';

const LOG_TAG = 'MemoryPersistence';

Expand All @@ -49,23 +59,39 @@ export class MemoryPersistence implements Persistence {
*/
private mutationQueues: { [user: string]: MutationQueue } = {};
private remoteDocumentCache = new MemoryRemoteDocumentCache();
private queryCache = new MemoryQueryCache();
private readonly queryCache: MemoryQueryCache; // = new MemoryQueryCache();
private readonly listenSequence = new ListenSequence(0);

private _started = false;

// TODO(gsoltis): remove option to be null once eager delegate is implemented.
private _referenceDelegate: ReferenceDelegate | null;

static createLruPersistence(clientId: ClientId): MemoryPersistence {
const persistence = new MemoryPersistence(clientId);
persistence._referenceDelegate = new MemoryLruDelegate(persistence);
return persistence;
}

constructor(private readonly clientId: ClientId) {
this._started = true;
this.queryCache = new MemoryQueryCache(this);
}

async shutdown(deleteData?: boolean): Promise<void> {
shutdown(deleteData?: boolean): Promise<void> {
// No durable state to ensure is closed on shutdown.
this._started = false;
return Promise.resolve();
}

get started(): boolean {
return this._started;
}

get referenceDelegate(): ReferenceDelegate {
return this._referenceDelegate!;
}

async getActiveClients(): Promise<ClientId[]> {
return [this.clientId];
}
Expand All @@ -90,11 +116,11 @@ export class MemoryPersistence implements Persistence {
return queue;
}

getQueryCache(): QueryCache {
getQueryCache(): MemoryQueryCache {
return this.queryCache;
}

getRemoteDocumentCache(): RemoteDocumentCache {
getRemoteDocumentCache(): MemoryRemoteDocumentCache {
return this.remoteDocumentCache;
}

Expand All @@ -107,9 +133,20 @@ export class MemoryPersistence implements Persistence {
): Promise<T> {
debug(LOG_TAG, 'Starting transaction:', action);
return transactionOperation(
new MemoryTransaction(ListenSequence.INVALID)
new MemoryTransaction(this.listenSequence.next())
).toPromise();
}

mutationQueuesContainKey(
transaction: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<boolean> {
return PersistencePromise.or(
obj
.values(this.mutationQueues)
.map(queue => () => queue.containsKey(transaction, key))
);
}
}

/**
Expand All @@ -119,3 +156,131 @@ export class MemoryPersistence implements Persistence {
export class MemoryTransaction implements PersistenceTransaction {
constructor(readonly currentSequenceNumber: ListenSequenceNumber) {}
}

export class MemoryLruDelegate implements ReferenceDelegate, LruDelegate {
private additionalReferences: ReferenceSet | null;
private orphanedSequenceNumbers: ObjectMap<
DocumentKey,
ListenSequenceNumber
> = new ObjectMap(k => encode(k.path));

readonly garbageCollector: LruGarbageCollector;

constructor(private readonly persistence: MemoryPersistence) {
this.garbageCollector = new LruGarbageCollector(this);
}

forEachTarget(
txn: PersistenceTransaction,
f: (q: QueryData) => void
): PersistencePromise<void> {
return this.persistence.getQueryCache().forEachTarget(txn, f);
}

getTargetCount(txn: PersistenceTransaction): PersistencePromise<number> {
return this.persistence.getQueryCache().getTargetCount(txn);
}

forEachOrphanedDocumentSequenceNumber(
txn: PersistenceTransaction,
f: (sequenceNumber: ListenSequenceNumber) => void
): PersistencePromise<void> {
this.orphanedSequenceNumbers.forEach((_, sequenceNumber) =>
f(sequenceNumber)
);
return PersistencePromise.resolve();
}

setInMemoryPins(inMemoryPins: ReferenceSet): void {
this.additionalReferences = inMemoryPins;
}

removeTargets(
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber,
activeTargetIds: ActiveTargets
): PersistencePromise<number> {
return this.persistence
.getQueryCache()
.removeTargets(txn, upperBound, activeTargetIds);
}

removeOrphanedDocuments(
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber
): PersistencePromise<number> {
let count = 0;
const cache = this.persistence.getRemoteDocumentCache();
const p = cache.forEachDocumentKey(txn, key => {
return this.isPinned(txn, key, upperBound).next(isPinned => {
if (isPinned) {
return PersistencePromise.resolve();
} else {
count++;
return cache.removeEntry(txn, key);
}
});
});
return p.next(() => count);
}

removeMutationReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
this.orphanedSequenceNumbers.set(key, txn.currentSequenceNumber);
return PersistencePromise.resolve();
}

removeTarget(
txn: PersistenceTransaction,
queryData: QueryData
): PersistencePromise<void> {
const updated = queryData.copy({
sequenceNumber: txn.currentSequenceNumber
});
return this.persistence.getQueryCache().updateQueryData(txn, updated);
}

addReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
this.orphanedSequenceNumbers.set(key, txn.currentSequenceNumber);
return PersistencePromise.resolve();
}

removeReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
this.orphanedSequenceNumbers.set(key, txn.currentSequenceNumber);
return PersistencePromise.resolve();
}

updateLimboDocument(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
this.orphanedSequenceNumbers.set(key, txn.currentSequenceNumber);
return PersistencePromise.resolve();
}

private isPinned(
txn: PersistenceTransaction,
key: DocumentKey,
upperBound: ListenSequenceNumber
): PersistencePromise<boolean> {
return PersistencePromise.or([
() => this.persistence.mutationQueuesContainKey(txn, key),
() => this.additionalReferences!.containsKey(txn, key),
() => this.persistence.getQueryCache().containsKey(txn, key),
() => {
const orphanedAt = this.orphanedSequenceNumbers.get(key);
return PersistencePromise.resolve(
orphanedAt !== undefined && orphanedAt > upperBound
);
}
]);
}
}
56 changes: 54 additions & 2 deletions packages/firestore/src/local/memory_query_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import { DocumentKey } from '../model/document_key';
import { ObjectMap } from '../util/obj_map';

import { GarbageCollector } from './garbage_collector';
import { ActiveTargets } from './lru_garbage_collector';
import { MemoryPersistence } from './memory_persistence';
import { PersistenceTransaction } from './persistence';
import { PersistencePromise } from './persistence_promise';
import { QueryCache } from './query_cache';
Expand Down Expand Up @@ -52,6 +54,20 @@ export class MemoryQueryCache implements QueryCache {

private targetIdGenerator = TargetIdGenerator.forQueryCache();

constructor(private readonly persistence: MemoryPersistence) {}

getTargetCount(txn: PersistenceTransaction): PersistencePromise<number> {
return PersistencePromise.resolve(this.targetCount);
}

forEachTarget(
txn: PersistenceTransaction,
f: (q: QueryData) => void
): PersistencePromise<void> {
this.queries.forEach((_, queryData) => f(queryData));
return PersistencePromise.resolve();
}

getLastRemoteSnapshotVersion(
transaction: PersistenceTransaction
): PersistencePromise<SnapshotVersion> {
Expand Down Expand Up @@ -134,6 +150,28 @@ export class MemoryQueryCache implements QueryCache {
return PersistencePromise.resolve();
}

removeTargets(
transaction: PersistenceTransaction,
upperBound: ListenSequenceNumber,
activeTargetIds: ActiveTargets
): PersistencePromise<number> {
let count = 0;
const removals: Array<PersistencePromise<void>> = [];
this.queries.forEach((key, queryData) => {
if (
queryData.sequenceNumber <= upperBound &&
!activeTargetIds[queryData.targetId]
) {
this.queries.delete(key);
removals.push(
this.removeMatchingKeysForTargetId(transaction, queryData.targetId)
);
count++;
}
});
return PersistencePromise.waitFor(removals).next(() => count);
}

getQueryCount(
transaction: PersistenceTransaction
): PersistencePromise<number> {
Expand Down Expand Up @@ -163,7 +201,14 @@ export class MemoryQueryCache implements QueryCache {
targetId: TargetId
): PersistencePromise<void> {
this.references.addReferences(keys, targetId);
return PersistencePromise.resolve();
const referenceDelegate = this.persistence.referenceDelegate;
const promises: Array<PersistencePromise<void>> = [];
if (referenceDelegate) {
keys.forEach(key => {
promises.push(referenceDelegate.addReference(txn, key));
});
}
return PersistencePromise.waitFor(promises);
}

removeMatchingKeys(
Expand All @@ -172,7 +217,14 @@ export class MemoryQueryCache implements QueryCache {
targetId: TargetId
): PersistencePromise<void> {
this.references.removeReferences(keys, targetId);
return PersistencePromise.resolve();
const referenceDelegate = this.persistence.referenceDelegate;
const promises: Array<PersistencePromise<void>> = [];
if (referenceDelegate) {
keys.forEach(key => {
promises.push(referenceDelegate.removeReference(txn, key));
});
}
return PersistencePromise.waitFor(promises);
}

removeMatchingKeysForTargetId(
Expand Down
11 changes: 11 additions & 0 deletions packages/firestore/src/local/memory_remote_document_cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
return PersistencePromise.resolve(results);
}

forEachDocumentKey(
transaction: PersistenceTransaction,
f: (key: DocumentKey) => PersistencePromise<void>
): PersistencePromise<void> {
const promises: Array<PersistencePromise<void>> = [];
this.docs.forEach(key => {
promises.push(f(key));
});
return PersistencePromise.waitFor(promises);
}

getNewDocumentChanges(
transaction: PersistenceTransaction
): PersistencePromise<MaybeDocumentMap> {
Expand Down
2 changes: 2 additions & 0 deletions packages/firestore/src/local/persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ export interface Persistence {
*/
readonly started: boolean;

readonly referenceDelegate: ReferenceDelegate;

/**
* Releases any resources held during eager shutdown.
*
Expand Down
Loading

0 comments on commit 7f03511

Please sign in to comment.