Skip to content

Commit

Permalink
WIP: fix transaction bug
Browse files Browse the repository at this point in the history
  • Loading branch information
jshcrowthe committed Jun 16, 2017
1 parent 6935085 commit bd03a98
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 50 deletions.
2 changes: 2 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { Reference } from "./database/api/Reference";
import { enableLogging } from "./database/core/util/util";
import { RepoManager } from "./database/core/RepoManager";
import * as INTERNAL from './database/api/internal';
import * as TEST_ACCESS from './database/api/test_access';

export function registerDatabase(instance) {
// Register the Database Service with the 'firebase' namespace.
Expand All @@ -36,6 +37,7 @@ export function registerDatabase(instance) {
enableLogging,
INTERNAL,
ServerValue: Database.ServerValue,
TEST_ACCESS
}
);
}
Expand Down
45 changes: 18 additions & 27 deletions src/database/core/CompoundWrite.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ImmutableTree } from "./util/ImmutableTree";
import { Path } from "./util/Path";
import { forEach } from "../../utils/obj";
import { NamedNode } from "./snap/Node";
import { Node, NamedNode } from "./snap/Node";
import { PRIORITY_INDEX } from "./snap/indexes/PriorityIndex";
import { assert } from "../../utils/assert";

Expand All @@ -15,32 +15,25 @@ import { assert } from "../../utils/assert";
* @param {!ImmutableTree.<!Node>} writeTree
*/
export class CompoundWrite {
writeTree_;
constructor(writeTree) {
/**
* @type {!ImmutableTree.<!Node>}
* @private
*/
this.writeTree_ = writeTree;
};
constructor(private writeTree_: ImmutableTree) {};
/**
* @type {!CompoundWrite}
*/
static Empty = new CompoundWrite(
/** @type {!ImmutableTree.<!Node>} */ (new ImmutableTree(null))
);
static Empty = new CompoundWrite(new ImmutableTree(null));

/**
* @param {!Path} path
* @param {!Node} node
* @return {!CompoundWrite}
*/
addWrite(path, node) {
addWrite(path: Path, node: Node): CompoundWrite {
if (path.isEmpty()) {
return new CompoundWrite(new ImmutableTree(node));
} else {
var rootmost = this.writeTree_.findRootMostValueAndPath(path);
if (rootmost != null) {
var rootMostPath = rootmost.path, value = rootmost.value;
var rootMostPath = rootmost.path
var value = rootmost.value;
var relativePath = Path.relativePath(rootMostPath, path);
value = value.updateChild(relativePath, node);
return new CompoundWrite(this.writeTree_.set(rootMostPath, value));
Expand All @@ -57,8 +50,8 @@ export class CompoundWrite {
* @param {!Object.<string, !Node>} updates
* @return {!CompoundWrite}
*/
addWrites(path, updates) {
var newWrite = <any>this;
addWrites(path: Path, updates: { [name: string]: Node }): CompoundWrite {
var newWrite = <CompoundWrite>this;
forEach(updates, function(childKey, node) {
newWrite = newWrite.addWrite(path.child(childKey), node);
});
Expand All @@ -72,7 +65,7 @@ export class CompoundWrite {
* @param {!Path} path The path at which a write and all deeper writes should be removed
* @return {!CompoundWrite} The new CompoundWrite with the removed path
*/
removeWrite(path) {
removeWrite(path: Path): CompoundWrite {
if (path.isEmpty()) {
return CompoundWrite.Empty;
} else {
Expand All @@ -88,7 +81,7 @@ export class CompoundWrite {
* @param {!Path} path The path to check for
* @return {boolean} Whether there is a complete write at that path
*/
hasCompleteWrite(path) {
hasCompleteWrite(path: Path): boolean {
return this.getCompleteNode(path) != null;
};

Expand All @@ -99,11 +92,10 @@ export class CompoundWrite {
* @param {!Path} path The path to get a complete write
* @return {?Node} The node if complete at that path, or null otherwise.
*/
getCompleteNode(path) {
getCompleteNode(path: Path): Node {
var rootmost = this.writeTree_.findRootMostValueAndPath(path);
if (rootmost != null) {
return this.writeTree_.get(rootmost.path).getChild(
Path.relativePath(rootmost.path, path));
return this.writeTree_.get(rootmost.path).getChild(Path.relativePath(rootmost.path, path));
} else {
return null;
}
Expand All @@ -114,7 +106,7 @@ export class CompoundWrite {
*
* @return {!Array.<NamedNode>} A list of all complete children.
*/
getCompleteChildren() {
getCompleteChildren(): Array<NamedNode> {
var children = [];
var node = this.writeTree_.value;
if (node != null) {
Expand All @@ -139,7 +131,7 @@ export class CompoundWrite {
* @param {!Path} path
* @return {!CompoundWrite}
*/
childCompoundWrite(path) {
childCompoundWrite(path: Path) {
if (path.isEmpty()) {
return this;
} else {
Expand All @@ -166,7 +158,7 @@ export class CompoundWrite {
* @param {!Node} node The node to apply this CompoundWrite to
* @return {!Node} The node with all writes applied
*/
apply(node) {
apply(node: Node) {
return CompoundWrite.applySubtreeWrite_(Path.Empty, this.writeTree_, node);
};

Expand All @@ -177,7 +169,7 @@ export class CompoundWrite {
* @return {!Node}
* @private
*/
static applySubtreeWrite_ = function(relativePath, writeTree, node) {
static applySubtreeWrite_ = function(relativePath: Path, writeTree: ImmutableTree, node: Node) {
if (writeTree.value != null) {
// Since there a write is always a leaf, we're done here
return node.updateChild(relativePath, writeTree.value);
Expand All @@ -195,8 +187,7 @@ export class CompoundWrite {
});
// If there was a priority write, we only apply it if the node is not empty
if (!node.getChild(relativePath).isEmpty() && priorityWrite !== null) {
node = node.updateChild(relativePath.child('.priority'),
/** @type {!Node} */ (priorityWrite));
node = node.updateChild(relativePath.child('.priority'), priorityWrite);
}
return node;
}
Expand Down
62 changes: 40 additions & 22 deletions src/database/core/Repo_transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DataSnapshot } from "../api/DataSnapshot";
import { Path } from "./util/Path";
import { Tree } from "./util/Tree";
import { PRIORITY_INDEX } from "./snap/indexes/PriorityIndex";
import { Node } from "./snap/Node";
import {
LUIDGenerator,
warn,
Expand Down Expand Up @@ -94,6 +95,22 @@ declare module './Repo' {
}
}

type Transaction = {
path: Path,
update: Function,
onComplete: Function,
status: number,
order: number,
applyLocally: boolean,
retryCount: number,
unwatcher: Function,
abortReason: any,
currentWriteId: any,
currentInputSnapshot: any,
currentOutputSnapshotRaw: any,
currentOutputSnapshotResolved: any
}

/**
* Creates a new transaction, adds it to the transactions we're tracking, and sends it to the server if possible.
*
Expand All @@ -102,7 +119,10 @@ declare module './Repo' {
* @param {?function(?Error, boolean, ?DataSnapshot)} onComplete Completion callback.
* @param {boolean} applyLocally Whether or not to make intermediate results visible
*/
(Repo.prototype as any).startTransaction = function(path, transactionUpdate, onComplete, applyLocally) {
(Repo.prototype as any).startTransaction = function(path: Path,
transactionUpdate: () => any,
onComplete: (Error, boolean, DataSnapshot) => any,
applyLocally: boolean) {
this.log_('transaction on ' + path);

// Add a watch to make sure we get server updates.
Expand All @@ -112,7 +132,7 @@ declare module './Repo' {
var unwatcher = function() { watchRef.off('value', valueCallback); };

// Initialize transaction.
var transaction = /** @type {Transaction} */ ({
var transaction: Transaction = {
path: path,
update: transactionUpdate,
onComplete: onComplete,
Expand Down Expand Up @@ -142,7 +162,7 @@ declare module './Repo' {
currentOutputSnapshotRaw: null,

currentOutputSnapshotResolved: null
});
};


// Run transaction initially.
Expand All @@ -156,9 +176,8 @@ declare module './Repo' {
transaction.currentOutputSnapshotResolved = null;
if (transaction.onComplete) {
// We just set the input snapshot, so this cast should be safe
var snapshot = new DataSnapshot(/** @type {!Node} */ (transaction.currentInputSnapshot),
new Reference(this, transaction.path), PRIORITY_INDEX);
transaction.onComplete(/*error=*/null, /*committed=*/false, snapshot);
var snapshot = new DataSnapshot(transaction.currentInputSnapshot, new Reference(this, transaction.path), PRIORITY_INDEX);
transaction.onComplete(null, false, snapshot);
}
} else {
validateFirebaseData('transaction failed: Data returned ', newVal, transaction.path);
Expand Down Expand Up @@ -192,8 +211,7 @@ declare module './Repo' {
transaction.currentOutputSnapshotResolved = newNode;
transaction.currentWriteId = this.getNextWriteId_();

var events = this.serverSyncTree_.applyUserOverwrite(path, newNode, transaction.currentWriteId,
transaction.applyLocally);
var events = this.serverSyncTree_.applyUserOverwrite(path, newNode, transaction.currentWriteId, transaction.applyLocally);
this.eventQueue_.raiseEventsForChangedPath(path, events);

this.sendReadyTransactions_();
Expand All @@ -206,7 +224,7 @@ declare module './Repo' {
* @return {Node}
* @private
*/
(Repo.prototype as any).getLatestState_ = function(path, excludeSets) {
(Repo.prototype as any).getLatestState_ = function(path: Path, excludeSets: [number]): Node {
return this.serverSyncTree_.calcCompleteEventCache(path, excludeSets) || ChildrenNode.EMPTY_NODE;
};

Expand All @@ -221,11 +239,11 @@ declare module './Repo' {
* @param {Tree.<Array.<Transaction>>=} opt_node transactionQueueTree node to start at.
* @private
*/
(Repo.prototype as any).sendReadyTransactions_ = function(opt_node) {
var node = /** @type {!Tree.<Array.<!Transaction>>} */ (opt_node || this.transactionQueueTree_);
(Repo.prototype as any).sendReadyTransactions_ = function(node?) {
var node = /** @type {!Tree.<Array.<!Transaction>>} */ (node || this.transactionQueueTree_);

// Before recursing, make sure any completed transactions are removed.
if (!opt_node) {
if (!node) {
this.pruneCompletedTransactionsBelowNode_(node);
}

Expand Down Expand Up @@ -257,7 +275,7 @@ declare module './Repo' {
* @param {!Array.<Transaction>} queue Queue of transactions under the specified location.
* @private
*/
(Repo.prototype as any).sendTransactionQueue_ = function(path, queue) {
(Repo.prototype as any).sendTransactionQueue_ = function(path: Path, queue: Array<Transaction>) {
// Mark transactions as sent and increment retry count!
var setsToIgnore = queue.map(function(txn) { return txn.currentWriteId; });
var latestState = this.getLatestState_(path, setsToIgnore);
Expand Down Expand Up @@ -345,7 +363,7 @@ declare module './Repo' {
* @return {!Path} The rootmost path that was affected by rerunning transactions.
* @private
*/
(Repo.prototype as any).rerunTransactions_ = function(changedPath) {
(Repo.prototype as any).rerunTransactions_ = function(changedPath: Path) {
var rootMostTransactionNode = this.getAncestorTransactionNode_(changedPath);
var path = rootMostTransactionNode.path();

Expand All @@ -363,7 +381,7 @@ declare module './Repo' {
* @param {!Path} path The path the queue is for.
* @private
*/
(Repo.prototype as any).rerunTransactionQueue_ = function(queue, path) {
(Repo.prototype as any).rerunTransactionQueue_ = function(queue: Array<Transaction>, path: Path) {
if (queue.length === 0) {
return; // Nothing to do!
}
Expand Down Expand Up @@ -413,7 +431,7 @@ declare module './Repo' {
transaction.currentOutputSnapshotResolved = newNodeResolved;
transaction.currentWriteId = this.getNextWriteId_();
// Mutates setsToIgnore in place
setsToIgnore.splice(setsToIgnore.indexOf(oldWriteId));
setsToIgnore.splice(setsToIgnore.indexOf(oldWriteId), 1);
events = events.concat(
this.serverSyncTree_.applyUserOverwrite(transaction.path, newNodeResolved, transaction.currentWriteId,
transaction.applyLocally)
Expand Down Expand Up @@ -473,7 +491,7 @@ declare module './Repo' {
* @return {!Tree.<Array.<!Transaction>>} The rootmost node with a transaction.
* @private
*/
(Repo.prototype as any).getAncestorTransactionNode_ = function(path) {
(Repo.prototype as any).getAncestorTransactionNode_ = function(path: Path): Tree {
var front;

// Start at the root and walk deeper into the tree towards path until we find a node with pending transactions.
Expand All @@ -494,7 +512,7 @@ declare module './Repo' {
* @return {Array.<Transaction>} The generated queue.
* @private
*/
(Repo.prototype as any).buildTransactionQueue_ = function(transactionNode) {
(Repo.prototype as any).buildTransactionQueue_ = function(transactionNode: Tree): Array<Transaction> {
// Walk any child transaction queues and aggregate them into a single queue.
var transactionQueue = [];
this.aggregateTransactionQueuesForNode_(transactionNode, transactionQueue);
Expand All @@ -510,7 +528,7 @@ declare module './Repo' {
* @param {Array.<Transaction>} queue
* @private
*/
(Repo.prototype as any).aggregateTransactionQueuesForNode_ = function(node, queue) {
(Repo.prototype as any).aggregateTransactionQueuesForNode_ = function(node: Tree, queue: Array<Transaction>) {
var nodeQueue = node.getValue();
if (nodeQueue !== null) {
for (var i = 0; i < nodeQueue.length; i++) {
Expand All @@ -531,7 +549,7 @@ declare module './Repo' {
* @param {!Tree.<Array.<!Transaction>>} node
* @private
*/
(Repo.prototype as any).pruneCompletedTransactionsBelowNode_ = function(node) {
(Repo.prototype as any).pruneCompletedTransactionsBelowNode_ = function(node: Tree) {
var queue = node.getValue();
if (queue) {
var to = 0;
Expand Down Expand Up @@ -560,7 +578,7 @@ declare module './Repo' {
* @return {!Path}
* @private
*/
(Repo.prototype as any).abortTransactions_ = function(path) {
(Repo.prototype as any).abortTransactions_ = function(path: Path) {
var affectedPath = this.getAncestorTransactionNode_(path).path();

var transactionNode = this.transactionQueueTree_.subTree(path);
Expand All @@ -586,7 +604,7 @@ declare module './Repo' {
* @param {!Tree.<Array.<Transaction>>} node Node to abort transactions for.
* @private
*/
(Repo.prototype as any).abortTransactionsOnNode_ = function(node) {
(Repo.prototype as any).abortTransactionsOnNode_ = function(node: Tree) {
var queue = node.getValue();
if (queue !== null) {

Expand Down
2 changes: 1 addition & 1 deletion src/database/core/snap/nodeFromJSON.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function nodeFromJSON(json, priority?) {
return ChildrenNode.EMPTY_NODE;
}

priority = priority || null;
priority = priority !== undefined ? priority : null;
if (typeof json === 'object' && '.priority' in json) {
priority = json['.priority'];
}
Expand Down

0 comments on commit bd03a98

Please sign in to comment.