Skip to content

Commit

Permalink
Implement IndexedDB LRU Reference Delegate, add LRU tests (#1224)
Browse files Browse the repository at this point in the history
Implement IndexedDB LRU Reference Delegate, add LRU tests
  • Loading branch information
Greg Soltis authored Sep 19, 2018
1 parent 03a78c7 commit eaeeb2a
Show file tree
Hide file tree
Showing 8 changed files with 1,451 additions and 32 deletions.
55 changes: 42 additions & 13 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,19 +525,7 @@ export class IndexedDbMutationQueue implements MutationQueue {
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<boolean> {
const indexKey = DbDocumentMutation.prefixForPath(this.userId, key.path);
const encodedPath = indexKey[1];
const startRange = IDBKeyRange.lowerBound(indexKey);
let containsKey = false;
return documentMutationsStore(txn)
.iterate({ range: startRange, keysOnly: true }, (key, value, control) => {
const [userID, keyPath, /*batchID*/ _] = key;
if (userID === this.userId && keyPath === encodedPath) {
containsKey = true;
}
control.done();
})
.next(() => containsKey);
return mutationQueueContainsKey(txn, this.userId, key);
}

// PORTING NOTE: Multi-tab only (state is held in memory in other clients).
Expand All @@ -560,6 +548,47 @@ export class IndexedDbMutationQueue implements MutationQueue {
}
}

/**
* @return true if the mutation queue for the given user contains a pending mutation for the given key.
*/
function mutationQueueContainsKey(
txn: PersistenceTransaction,
userId: string,
key: DocumentKey
): PersistencePromise<boolean> {
const indexKey = DbDocumentMutation.prefixForPath(userId, key.path);
const encodedPath = indexKey[1];
const startRange = IDBKeyRange.lowerBound(indexKey);
let containsKey = false;
return documentMutationsStore(txn)
.iterate({ range: startRange, keysOnly: true }, (key, value, control) => {
const [userID, keyPath, /*batchID*/ _] = key;
if (userID === userId && keyPath === encodedPath) {
containsKey = true;
}
control.done();
})
.next(() => containsKey);
}

/** Returns true if any mutation queue contains the given document. */
export function mutationQueuesContainKey(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<boolean> {
let found = false;
return mutationQueuesStore(txn)
.iterateSerial(userId => {
return mutationQueueContainsKey(txn, userId, docKey).next(containsKey => {
if (containsKey) {
found = true;
}
return PersistencePromise.resolve(!containsKey);
});
})
.next(() => found);
}

/**
* Delete a mutation batch and the associated document mutations.
* @return A PersistencePromise of the document mutations that were removed.
Expand Down
250 changes: 243 additions & 7 deletions packages/firestore/src/local/indexeddb_persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import { assert, fail } from '../util/assert';
import { Code, FirestoreError } from '../util/error';
import * as log from '../util/log';

import { IndexedDbMutationQueue } from './indexeddb_mutation_queue';
import {
IndexedDbMutationQueue,
mutationQueuesContainKey
} from './indexeddb_mutation_queue';
import {
IndexedDbQueryCache,
getHighestListenSequenceNumber
getHighestListenSequenceNumber,
documentTargetStore
} from './indexeddb_query_cache';
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
import {
Expand All @@ -35,24 +39,35 @@ import {
DbPrimaryClientKey,
SCHEMA_VERSION,
DbTargetGlobal,
SchemaConverter
SchemaConverter,
DbTargetDocument
} from './indexeddb_schema';
import { LocalSerializer } from './local_serializer';
import { MutationQueue } from './mutation_queue';
import {
Persistence,
PersistenceTransaction,
PrimaryStateListener
PrimaryStateListener,
ReferenceDelegate
} from './persistence';
import { PersistencePromise } from './persistence_promise';
import { QueryCache } from './query_cache';
import { RemoteDocumentCache } from './remote_document_cache';
import { SimpleDb, SimpleDbStore, SimpleDbTransaction } from './simple_db';
import { Platform } from '../platform/platform';
import { AsyncQueue, TimerId } from '../util/async_queue';
import { ClientId } from './shared_client_state';
import { CancelablePromise } from '../util/promise';
import { ListenSequence, SequenceNumberSyncer } from '../core/listen_sequence';
import { ReferenceSet } from './reference_set';
import { QueryData } from './query_data';
import { DocumentKey } from '../model/document_key';
import { decode, encode, EncodedResourcePath } from './encoded_resource_path';
import {
ActiveTargets,
LruDelegate,
LruGarbageCollector
} from './lru_garbage_collector';
import { ListenSequenceNumber, TargetId } from '../core/types';

const LOG_TAG = 'IndexedDbPersistence';

Expand Down Expand Up @@ -247,6 +262,7 @@ export class IndexedDbPersistence implements Persistence {
private remoteDocumentCache: IndexedDbRemoteDocumentCache;
private readonly webStorage: Storage;
private listenSequence: ListenSequence;
readonly referenceDelegate: IndexedDbLruDelegate;

// Note that `multiClientParams` must be present to enable multi-client support while multi-tab
// is still experimental. When multi-client is switched to always on, `multiClientParams` will
Expand All @@ -265,11 +281,15 @@ export class IndexedDbPersistence implements Persistence {
UNSUPPORTED_PLATFORM_ERROR_MSG
);
}
this.referenceDelegate = new IndexedDbLruDelegate(this);
this.dbName = persistenceKey + IndexedDbPersistence.MAIN_DATABASE;
this.serializer = new LocalSerializer(serializer);
this.document = platform.document;
this.allowTabSynchronization = multiClientParams !== undefined;
this.queryCache = new IndexedDbQueryCache(this.serializer);
this.queryCache = new IndexedDbQueryCache(
this.referenceDelegate,
this.serializer
);
this.remoteDocumentCache = new IndexedDbRemoteDocumentCache(
this.serializer,
/*keepDocumentChangeLog=*/ this.allowTabSynchronization
Expand Down Expand Up @@ -694,7 +714,7 @@ export class IndexedDbPersistence implements Persistence {
return IndexedDbMutationQueue.forUser(user, this.serializer);
}

getQueryCache(): QueryCache {
getQueryCache(): IndexedDbQueryCache {
assert(
this.started,
'Cannot initialize QueryCache before persistence is started.'
Expand Down Expand Up @@ -1026,3 +1046,219 @@ function clientMetadataStore(
DbClientMetadata.store
);
}

/** Provides LRU functionality for IndexedDB persistence. */
export class IndexedDbLruDelegate implements ReferenceDelegate, LruDelegate {
private inMemoryPins: ReferenceSet | null;

readonly garbageCollector: LruGarbageCollector;

constructor(private readonly db: IndexedDbPersistence) {
this.garbageCollector = new LruGarbageCollector(this);
}

getTargetCount(txn: PersistenceTransaction): PersistencePromise<number> {
return this.db.getQueryCache().getQueryCount(txn);
}

forEachTarget(
txn: PersistenceTransaction,
f: (q: QueryData) => void
): PersistencePromise<void> {
return this.db.getQueryCache().forEachTarget(txn, f);
}

forEachOrphanedDocumentSequenceNumber(
txn: PersistenceTransaction,
f: (sequenceNumber: ListenSequenceNumber) => void
): PersistencePromise<void> {
return this.forEachOrphanedDocument(txn, (docKey, sequenceNumber) =>
f(sequenceNumber)
);
}

setInMemoryPins(inMemoryPins: ReferenceSet): void {
this.inMemoryPins = inMemoryPins;
}

addReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

removeReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

removeTargets(
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber,
activeTargetIds: ActiveTargets
): PersistencePromise<number> {
return this.db
.getQueryCache()
.removeTargets(txn, upperBound, activeTargetIds);
}

removeMutationReference(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

/**
* Returns true if anything would prevent this document from being garbage
* collected, given that the document in question is not present in any
* targets and has a sequence number less than or equal to the upper bound for
* the collection run.
*/
private isPinned(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<boolean> {
return this.inMemoryPins!.containsKey(txn, docKey).next(isPinned => {
if (isPinned) {
return PersistencePromise.resolve(true);
} else {
return mutationQueuesContainKey(txn, docKey);
}
});
}

removeOrphanedDocuments(
txn: PersistenceTransaction,
upperBound: ListenSequenceNumber
): PersistencePromise<number> {
let count = 0;
const promises: Array<PersistencePromise<void>> = [];
const iteration = this.forEachOrphanedDocument(
txn,
(docKey, sequenceNumber) => {
if (sequenceNumber <= upperBound) {
const p = this.isPinned(txn, docKey).next(isPinned => {
if (!isPinned) {
count++;
return this.removeOrphanedDocument(txn, docKey);
}
});
promises.push(p);
}
}
);
// Wait for iteration first to make sure we have a chance to add all of the
// removal promises to the array.
return iteration
.next(() => PersistencePromise.waitFor(promises))
.next(() => count);
}

/**
* Clears a document from the cache. The document is assumed to be orphaned, so target-document
* associations are not queried. We remove it from the remote document cache, as well as remove
* its sentinel row.
*/
private removeOrphanedDocument(
txn: PersistenceTransaction,
docKey: DocumentKey
): PersistencePromise<void> {
return PersistencePromise.waitFor([
documentTargetStore(txn).delete(sentinelKey(docKey)),
this.db.getRemoteDocumentCache().removeEntry(txn, docKey)
]);
}

removeTarget(
txn: PersistenceTransaction,
queryData: QueryData
): PersistencePromise<void> {
const updated = queryData.copy({
sequenceNumber: txn.currentSequenceNumber
});
return this.db.getQueryCache().updateQueryData(txn, updated);
}

updateLimboDocument(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return writeSentinelKey(txn, key);
}

/**
* Call provided function for each document in the cache that is 'orphaned'. Orphaned
* means not a part of any target, so the only entry in the target-document index for
* that document will be the sentinel row (targetId 0), which will also have the sequence
* number for the last time the document was accessed.
*/
private forEachOrphanedDocument(
txn: PersistenceTransaction,
f: (docKey: DocumentKey, sequenceNumber: ListenSequenceNumber) => void
): PersistencePromise<void> {
const store = documentTargetStore(txn);
let nextToReport: ListenSequenceNumber = ListenSequence.INVALID;
let nextPath: EncodedResourcePath;
return store
.iterate(
{
index: DbTargetDocument.documentTargetsIndex
},
([targetId, docKey], { path, sequenceNumber }) => {
if (targetId === 0) {
// if nextToReport is valid, report it, this is a new key so the
// last one must not be a member of any targets.
if (nextToReport !== ListenSequence.INVALID) {
f(new DocumentKey(decode(nextPath)), nextToReport);
}
// set nextToReport to be this sequence number. It's the next one we
// might report, if we don't find any targets for this document.
// Note that the sequence number must be defined when the targetId
// is 0.
nextToReport = sequenceNumber!;
nextPath = path;
} else {
// set nextToReport to be invalid, we know we don't need to report
// this one since we found a target for it.
nextToReport = ListenSequence.INVALID;
}
}
)
.next(() => {
// Since we report sequence numbers after getting to the next key, we
// need to check if the last key we iterated over was an orphaned
// document and report it.
if (nextToReport !== ListenSequence.INVALID) {
f(new DocumentKey(decode(nextPath)), nextToReport);
}
});
}
}

function sentinelKey(key: DocumentKey): [TargetId, EncodedResourcePath] {
return [0, encode(key.path)];
}

/**
* @return A value suitable for writing a sentinel row in the target-document
* store.
*/
function sentinelRow(
key: DocumentKey,
sequenceNumber: ListenSequenceNumber
): DbTargetDocument {
return new DbTargetDocument(0, encode(key.path), sequenceNumber);
}

function writeSentinelKey(
txn: PersistenceTransaction,
key: DocumentKey
): PersistencePromise<void> {
return documentTargetStore(txn).put(
sentinelRow(key, txn.currentSequenceNumber)
);
}
Loading

0 comments on commit eaeeb2a

Please sign in to comment.