Fix #190
richardwilly98 committed Dec 23, 2013
1 parent 1d328a5 commit 7a546ad
Showing 6 changed files with 277 additions and 12 deletions.
44 changes: 44 additions & 0 deletions manual-testing/issues/190/02-map-reduce.js
@@ -0,0 +1,44 @@
// Connect to the primary of the replica set.
var status = rs.status()
for (var i = 0; i < status.members.length; i++) {
    if (status.members[i].state == 1) {
        db = connect(status.members[i].name + "/local")
    }
}

function getRandomInt(min, max) {
    return Math.floor(Math.random() * (max - min + 1)) + min;
}

use mydb1900

// Seed 20 "A" orders and 20 "B" orders with random customer ids and amounts.
for (var i = 0; i < 20; i++) {
    var orderA = {
        "name": "orderA-" + i,
        "cust_id": getRandomInt(0, 20),
        "amount": getRandomInt(1, 500),
        "status": "A"
    }
    db.orders.save(orderA)
    var orderB = {
        "name": "orderB-" + i,
        "cust_id": getRandomInt(0, 20),
        "amount": getRandomInt(1, 500),
        "status": "B"
    }
    db.orders.save(orderB)
}

// Sum the "A" order amounts per customer into the order_totals collection.
db.orders.mapReduce(
    function () {
        emit(this.cust_id, this.amount);
    },
    function (key, values) {
        return Array.sum(values)
    },
    {
        "query": {"status": "A"},
        "out": "order_totals"
    }
)
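The script above seeds forty orders and runs a mapReduce into order_totals. Because the output is not inline, MongoDB materializes the result through a temporary tmp.mr.* collection and then renames it into place with an admin renameCollection command; those are exactly the oplog entries the changes below teach the river to skip and to re-import. For reference, a minimal driver-side sketch of the same job, assuming the 2.x Java driver the river builds against and a hypothetical local mongod:

    import com.mongodb.BasicDBObject;
    import com.mongodb.DB;
    import com.mongodb.DBCollection;
    import com.mongodb.DBObject;
    import com.mongodb.MapReduceCommand;
    import com.mongodb.MapReduceOutput;
    import com.mongodb.MongoClient;

    public class MapReduceDemo {
        public static void main(String[] args) throws Exception {
            MongoClient mongo = new MongoClient("localhost", 27017); // hypothetical host/port
            DB db = mongo.getDB("mydb1900");
            DBCollection orders = db.getCollection("orders");
            String map = "function() { emit(this.cust_id, this.amount); }";
            String reduce = "function(key, values) { return Array.sum(values); }";
            // Non-inline output: MongoDB writes to a tmp.mr.* collection first,
            // then renames it to order_totals (the oplog traffic behind #190).
            MapReduceCommand cmd = new MapReduceCommand(orders, map, reduce,
                    "order_totals", MapReduceCommand.OutputType.REPLACE,
                    new BasicDBObject("status", "A"));
            MapReduceOutput out = orders.mapReduce(cmd);
            for (DBObject total : out.results()) {
                System.out.println(total);
            }
            mongo.close();
        }
    }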
6 changes: 6 additions & 0 deletions manual-testing/issues/190/02_test-issue-190.bat
@@ -0,0 +1,6 @@
%MONGO_HOME%\bin\mongo < 02-map-reduce.js
pause
rem curl -XPOST "localhost:9200/authors/author/_search?pretty=true" -d @_03-find-book-parent-query.json
pause
rem curl -XPOST "localhost:9200/authors/book/_search?pretty=true" -d @_03-find-chapter-parent-query.json
pause
32 changes: 29 additions & 3 deletions src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java
@@ -36,6 +36,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@@ -53,7 +54,7 @@
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
import org.elasticsearch.script.ScriptService;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCursor;
@@ -91,6 +92,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String OPLOG_COLLECTION = "oplog.rs";
public final static String OPLOG_NAMESPACE = "ns";
public final static String OPLOG_NAMESPACE_COMMAND = "$cmd";
public final static String OPLOG_ADMIN_COMMAND = "admin." + OPLOG_NAMESPACE_COMMAND;
public final static String OPLOG_OBJECT = "o";
public final static String OPLOG_UPDATE = "o2";
public final static String OPLOG_OPERATION = "op";
@@ -100,6 +102,8 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
public final static String OPLOG_COMMAND_OPERATION = "c";
public final static String OPLOG_DROP_COMMAND_OPERATION = "drop";
public final static String OPLOG_DROP_DATABASE_COMMAND_OPERATION = "dropDatabase";
public final static String OPLOG_RENAME_COLLECTION_COMMAND_OPERATION = "renameCollection";
public final static String OPLOG_TO = "to";
public final static String OPLOG_TIMESTAMP = "ts";
public final static String OPLOG_FROM_MIGRATE = "fromMigrate";
public final static String GRIDFS_FILES_SUFFIX = ".files";
@@ -213,6 +217,7 @@ public void start() {
cursor.close();
}
} else {
logger.trace("Not mongos");
Thread tailerThread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_slurper").newThread(
new Slurper(definition.getMongoServers(), definition, context, client));
tailerThreads.add(tailerThread);
@@ -243,7 +248,26 @@ private boolean isMongos() {
if (adminDb == null) {
return false;
}
CommandResult cr = adminDb.command(new BasicDBObject("serverStatus", 1));
logger.trace("Found {} database", MONGODB_ADMIN_DATABASE);
// CommandResult cr = adminDb.command(new BasicDBObject("serverStatus",
// 1));
DBObject command = BasicDBObjectBuilder.start(
ImmutableMap.builder().put("serverStatus", 1).put("asserts", 0).put("backgroundFlushing", 0).put("connections", 0)
.put("cursors", 0).put("dur", 0).put("extra_info", 0).put("globalLock", 0).put("indexCounters", 0).put("locks", 0)
.put("metrics", 0).put("network", 0).put("opcounters", 0).put("opcountersRepl", 0).put("recordStats", 0)
.put("repl", 0).build()).get();
logger.trace("About to execute: {}", command);
CommandResult cr = adminDb.command(command);
logger.trace("Command executed return : {}", cr);
// BasicDBObjectBuilder.start(ImmutableMap.of("serverStatus", 1,
// "asserts", 0, "backgroundFlushing", 0, "metrics", 0, "connections",
// 0, "cursors", 0, "dur", 0, "extra_info", 0, "globalLock", 0,
// "indexCounters", 0, "locks", 0, "network", 0, "opcounters", 0,
// "opcountersRepl", 0, "recordStats", 0, "repl", 0)).get();
// replica1:PRIMARY> db.runCommand({serverStatus: 1, asserts: 0,
// backgroundFlushing: 0, metrics: 0, connections: 0, cursors: 0, dur:
// 0, extra_info: 0, globalLock: 0, indexCounters:0, locks: 0, network:
// 0, opcounters:0, opcountersRepl: 0, recordStats: 0, repl: 0})

logger.info("MongoDB version - {}", cr.get("version"));
if (logger.isTraceEnabled()) {
@@ -271,7 +295,7 @@ private DB getAdminDb() {
if (adminDb == null) {
adminDb = getMongoClient().getDB(MONGODB_ADMIN_DATABASE);
if (logger.isTraceEnabled()) {
logger.trace("MongoAdminUser: {} - isAuthenticated: {}", definition.getMongoAdminUser(), adminDb.isAuthenticated());
logger.trace("MongoAdminUser: {} - authenticated: {}", definition.getMongoAdminUser(), adminDb.isAuthenticated());
}
if (!definition.getMongoAdminUser().isEmpty() && !definition.getMongoAdminPassword().isEmpty() && !adminDb.isAuthenticated()) {
logger.info("Authenticate {} with {}", MONGODB_ADMIN_DATABASE, definition.getMongoAdminUser());
@@ -281,6 +305,8 @@ private DB getAdminDb() {
.toCharArray());
if (!cmd.ok()) {
logger.error("Autenticatication failed for {}: {}", MONGODB_ADMIN_DATABASE, cmd.getErrorMessage());
} else {
logger.trace("authenticateCommand: {} - isAuthenticated: {}", cmd, adminDb.isAuthenticated());
}
} catch (MongoException mEx) {
logger.warn("getAdminDb() failed", mEx);
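Note the new isMongos() probe: serverStatus is now issued with every optional section suppressed, so the command returns only the cheap top-level fields (such as the version logged above) instead of the full statistics document. A standalone sketch of the same trimmed call, assuming the 2.x Java driver and a hypothetical local server:

    import com.mongodb.BasicDBObjectBuilder;
    import com.mongodb.CommandResult;
    import com.mongodb.DB;
    import com.mongodb.DBObject;
    import com.mongodb.MongoClient;

    public class ServerStatusProbe {
        public static void main(String[] args) throws Exception {
            MongoClient mongo = new MongoClient("localhost", 27017); // hypothetical host/port
            DB admin = mongo.getDB("admin");
            // Zero out every optional serverStatus section so only the cheap
            // top-level fields (version, process, uptime, ...) come back.
            DBObject command = BasicDBObjectBuilder.start("serverStatus", 1)
                    .add("asserts", 0).add("backgroundFlushing", 0).add("connections", 0)
                    .add("cursors", 0).add("dur", 0).add("extra_info", 0)
                    .add("globalLock", 0).add("indexCounters", 0).add("locks", 0)
                    .add("metrics", 0).add("network", 0).add("opcounters", 0)
                    .add("opcountersRepl", 0).add("recordStats", 0).add("repl", 0)
                    .get();
            CommandResult cr = admin.command(command);
            System.out.println(cr.get("version"));
            mongo.close();
        }
    }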
src/main/java/org/elasticsearch/river/mongodb/Operation.java
@@ -2,7 +2,7 @@

public enum Operation {
INSERT(MongoDBRiver.OPLOG_INSERT_OPERATION), UPDATE(MongoDBRiver.OPLOG_UPDATE_OPERATION), DELETE(MongoDBRiver.OPLOG_DELETE_OPERATION), DROP_COLLECTION(
"dc"), DROP_DATABASE("dd"), UNKNOWN(null);
"dc"), DROP_DATABASE("dd"), COMMAND(MongoDBRiver.OPLOG_COMMAND_OPERATION), UNKNOWN(null);

private String value;

62 changes: 54 additions & 8 deletions src/main/java/org/elasticsearch/river/mongodb/Slurper.java
@@ -8,6 +8,7 @@
import org.bson.types.BSONTimestamp;
import org.bson.types.ObjectId;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.base.CharMatcher;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.logging.ESLogger;
@@ -86,7 +87,7 @@ public void run() {
break;
}
if (definition.isImportAllCollections()) {
for(String name : slurpedDb.getCollectionNames()) {
for (String name : slurpedDb.getCollectionNames()) {
DBCollection collection = slurpedDb.getCollection(name);
startTimestamp = doInitialImport(collection);
}
@@ -164,7 +165,8 @@ protected boolean isIndexEmpty() {
*/
protected BSONTimestamp doInitialImport(DBCollection collection) throws InterruptedException {
// TODO: ensure the index type is empty
// DBCollection slurpedCollection = slurpedDb.getCollection(definition.getMongoCollection());
// DBCollection slurpedCollection =
// slurpedDb.getCollection(definition.getMongoCollection());

logger.info("MongoDBRiver is beginning initial import of " + collection.getFullName());
BSONTimestamp startTimestamp = getCurrentOplogTimestamp();
@@ -174,17 +176,17 @@ protected BSONTimestamp doInitialImport(DBCollection collection) throws InterruptedException {
updateIndexRefresh(definition.getIndexName(), -1L);
}
if (!definition.isMongoGridFS()) {
logger.info("Collection {} - count: {}", definition.getMongoCollection(), collection.count());
logger.info("Collection {} - count: {}", collection.getName(), collection.count());
long count = 0;
cursor = collection.find(definition.getMongoCollectionFilter());
while (cursor.hasNext()) {
DBObject object = cursor.next();
count++;
if (cursor.hasNext()) {
addInsertToStream(null, applyFieldFilter(object));
addInsertToStream(null, applyFieldFilter(object), collection.getName());
} else {
logger.debug("Last entry for initial import - add timestamp: {}", startTimestamp);
addInsertToStream(startTimestamp, applyFieldFilter(object));
addInsertToStream(startTimestamp, applyFieldFilter(object), collection.getName());
}
}
logger.info("Number documents indexed: {}", count);
@@ -314,8 +316,8 @@ private void processOplogEntry(final DBObject entry, final BSONTimestamp startTimestamp)
DBObject object = (DBObject) entry.get(MongoDBRiver.OPLOG_OBJECT);

if (definition.isImportAllCollections()) {
if (!namespace.equals(cmdOplogNamespace)) {
collection = namespace.substring(definition.getMongoDb().length() + 1);
if (namespace.startsWith(definition.getMongoDb()) && !namespace.equals(cmdOplogNamespace)) {
collection = getCollectionFromNamespace(namespace);
}
} else {
collection = definition.getMongoCollection();
Expand All @@ -326,13 +328,24 @@ private void processOplogEntry(final DBObject entry, final BSONTimestamp startTi
operation = Operation.DROP_COLLECTION;
if (definition.isImportAllCollections()) {
collection = object.get(MongoDBRiver.OPLOG_DROP_COMMAND_OPERATION).toString();
if (collection.startsWith("tmp.mr.")) {
return;
}
}
}
if (object.containsField(MongoDBRiver.OPLOG_DROP_DATABASE_COMMAND_OPERATION)) {
operation = Operation.DROP_DATABASE;
}
}

logger.trace("namespace: {} - operation: {}", namespace, operation);
if (namespace.equals(MongoDBRiver.OPLOG_ADMIN_COMMAND)) {
if (operation == Operation.COMMAND) {
processAdminCommandOplogEntry(entry, startTimestamp);
return;
}
}

if (logger.isTraceEnabled()) {
logger.trace("MongoDB object deserialized: {}", object.toString());
}
@@ -376,6 +389,29 @@ private void processOplogEntry(final DBObject entry, final BSONTimestamp startTimestamp)
}
}

private void processAdminCommandOplogEntry(final DBObject entry, final BSONTimestamp startTimestamp) throws InterruptedException {
logger.debug("processAdminCommandOplogEntry - [{}]", entry);
DBObject object = (DBObject) entry.get(MongoDBRiver.OPLOG_OBJECT);
if (definition.isImportAllCollections()) {
if (object.containsField(MongoDBRiver.OPLOG_RENAME_COLLECTION_COMMAND_OPERATION) && object.containsField(MongoDBRiver.OPLOG_TO)) {
String to = object.get(MongoDBRiver.OPLOG_TO).toString();
if (to.startsWith(definition.getMongoDb())) {
String newCollection = getCollectionFromNamespace(to);
DBCollection coll = slurpedDb.getCollection(newCollection);
doInitialImport(coll);
}
}
}
}

private String getCollectionFromNamespace(String namespace) {
if (namespace.startsWith(definition.getMongoDb()) && CharMatcher.is('.').countIn(namespace) == 1) {
return namespace.substring(definition.getMongoDb().length() + 1);
}
logger.info("Cannot get collection from namespace [{}]", namespace);
return null;
}
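// Example behavior of the helper above, assuming definition.getMongoDb()
// returns "mydb1900" (namespaces are hypothetical):
//   getCollectionFromNamespace("mydb1900.orders")          -> "orders"
//   getCollectionFromNamespace("mydb1900.tmp.mr.orders_0") -> null (more than one '.')
//   getCollectionFromNamespace("otherdb.orders")           -> null (different database)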

private boolean isValidOplogEntry(final DBObject entry, final BSONTimestamp startTimestamp) {
String namespace = (String) entry.get(MongoDBRiver.OPLOG_NAMESPACE);
// Initial support for sharded collection -
@@ -402,7 +438,8 @@ private boolean isValidOplogEntry(final DBObject entry, final BSONTimestamp startTimestamp) {
validNamespace = gridfsOplogNamespace.equals(namespace);
} else {
if (definition.isImportAllCollections()) {
if (namespace.startsWith(definition.getMongoDb())) {
// Skip temp entry generated by map / reduce
if (namespace.startsWith(definition.getMongoDb()) && !namespace.startsWith(definition.getMongoDb() + ".tmp.mr")) {
validNamespace = true;
}
} else {
@@ -413,6 +450,10 @@ private boolean isValidOplogEntry(final DBObject entry, final BSONTimestamp startTimestamp) {
if (cmdOplogNamespace.equals(namespace)) {
validNamespace = true;
}

if (MongoDBRiver.OPLOG_ADMIN_COMMAND.equals(namespace)) {
validNamespace = true;
}
}
if (!validNamespace) {
return false;
@@ -533,6 +574,11 @@ private void addInsertToStream(final BSONTimestamp currentTimestamp, final DBObject data)
addToStream(Operation.INSERT, currentTimestamp, data, definition.getMongoCollection());
}

private void addInsertToStream(final BSONTimestamp currentTimestamp, final DBObject data, final String collection)
throws InterruptedException {
addToStream(Operation.INSERT, currentTimestamp, data, collection);
}

private void addToStream(final Operation operation, final BSONTimestamp currentTimestamp, final DBObject data, final String collection)
throws InterruptedException {
if (logger.isDebugEnabled()) {
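For reference, the admin oplog entry shape that processAdminCommandOplogEntry() consumes when map/reduce renames its temporary collection into the target database: a minimal sketch with hypothetical values, using only the field names defined by the MongoDBRiver constants above.

    import com.mongodb.BasicDBObjectBuilder;
    import com.mongodb.DBObject;

    public class RenameOplogEntryDemo {
        public static void main(String[] args) {
            // Inner command document ("o"): rename the map/reduce temporary
            // to the final output collection inside the river's database.
            DBObject command = BasicDBObjectBuilder.start()
                    .add("renameCollection", "mydb1900.tmp.mr.orders_42") // hypothetical source
                    .add("to", "mydb1900.order_totals")                   // hypothetical target
                    .get();
            DBObject entry = BasicDBObjectBuilder.start()
                    .add("ns", "admin.$cmd") // MongoDBRiver.OPLOG_ADMIN_COMMAND
                    .add("op", "c")          // MongoDBRiver.OPLOG_COMMAND_OPERATION
                    .add("o", command)       // MongoDBRiver.OPLOG_OBJECT
                    .get();
            // Entries like this make the river trigger doInitialImport() on
            // the renamed collection (order_totals).
            System.out.println(entry);
        }
    }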