From 6ac029e5b52271b91154ea5b908fab4dfe4a6598 Mon Sep 17 00:00:00 2001 From: Ben McCann Date: Mon, 30 Sep 2013 01:42:56 -0700 Subject: [PATCH 1/4] Add last timestamp to admin page --- .../rest/action/RestMongoDBRiverAction.java | 25 +++-- .../river/mongodb/MongoDBRiverDefinition.java | 13 ++- .../river/mongodb/MongoDBRiverModule.java | 6 +- src/site/index.html | 94 +++++++++---------- .../mongodb/MongoDBRiverDefinitionTest.java | 2 +- .../river/mongodb/SlurperTest.java | 2 +- 6 files changed, 75 insertions(+), 67 deletions(-) diff --git a/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java b/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java index bda60191..d5fa21c9 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java +++ b/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java @@ -20,19 +20,24 @@ import org.elasticsearch.rest.XContentRestResponse; import org.elasticsearch.rest.XContentThrowableRestResponse; import org.elasticsearch.rest.action.support.RestXContentBuilder; +import org.elasticsearch.river.RiverIndexName; +import org.elasticsearch.river.RiverSettings; import org.elasticsearch.river.mongodb.MongoDBRiver; +import org.elasticsearch.river.mongodb.MongoDBRiverDefinition; import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; import org.elasticsearch.search.SearchHit; public class RestMongoDBRiverAction extends BaseRestHandler { - private static final String BASE_URL = "/_river/" + MongoDBRiver.TYPE; + private final String riverIndexName; @Inject - public RestMongoDBRiverAction(Settings settings, Client client, RestController controller) { + public RestMongoDBRiverAction(Settings settings, Client client, RestController controller, @RiverIndexName String riverIndexName) { super(settings, client); - controller.registerHandler(RestRequest.Method.GET, BASE_URL + "/{action}", this); - controller.registerHandler(RestRequest.Method.POST, BASE_URL + "/{river}/{action}", this); + this.riverIndexName = riverIndexName; + String baseUrl = "/" + riverIndexName + "/" + MongoDBRiver.TYPE; + controller.registerHandler(RestRequest.Method.GET, baseUrl + "/{action}", this); + controller.registerHandler(RestRequest.Method.POST, baseUrl + "/{river}/{action}", this); } @Override @@ -120,16 +125,22 @@ private void errorResponse(RestRequest request, RestChannel channel, Throwable e } private List> getRivers() { - SearchResponse searchResponse = client.prepareSearch("_river").setQuery(new FieldQueryBuilder("type", "mongodb")).execute() - .actionGet(); + SearchResponse searchResponse = client.prepareSearch("_river") + .setQuery(new FieldQueryBuilder("type", "mongodb")) + .execute().actionGet(); long totalHits = searchResponse.getHits().totalHits(); logger.trace("totalHits: {}", totalHits); List> rivers = new ArrayList>(); for (SearchHit hit : searchResponse.getHits().hits()) { Map source = new HashMap(); - source.put("name", hit.getType()); + String riverName = hit.getType(); + RiverSettings riverSettings = new RiverSettings(null, hit.getSource()); + MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName, riverIndexName, riverSettings, null); + + source.put("name", riverName); source.put("enabled", MongoDBRiverHelper.isRiverEnabled(client, hit.getType())); source.put("settings", hit.getSource()); + source.put("lastTimestamp", MongoDBRiver.getLastTimestamp(client, definition)); logger.trace("source: {}", hit.getSourceAsString()); rivers.add(source); } diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java index 92691998..a5779f5f 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.support.XContentMapValues; -import org.elasticsearch.river.RiverName; import org.elasticsearch.river.RiverSettings; import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ScriptService; @@ -319,7 +318,7 @@ public MongoDBRiverDefinition build() { } @SuppressWarnings("unchecked") - public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverName, String riverIndexName, RiverSettings settings, + public synchronized static MongoDBRiverDefinition parseSettings(String riverName, String riverIndexName, RiverSettings settings, ScriptService scriptService) { Preconditions.checkNotNull(riverName, "No riverName specified"); @@ -327,7 +326,7 @@ public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverN Preconditions.checkNotNull(settings, "No settings specified"); Builder builder = new Builder(); - builder.riverName(riverName.name()); + builder.riverName(riverName); builder.riverIndexName(riverIndexName); List mongoServers = new ArrayList(); @@ -513,8 +512,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverN // mongoDbPassword = mdp; } - builder.mongoDb(XContentMapValues.nodeStringValue(mongoSettings.get(DB_FIELD), riverName.name())); - builder.mongoCollection(XContentMapValues.nodeStringValue(mongoSettings.get(COLLECTION_FIELD), riverName.name())); + builder.mongoDb(XContentMapValues.nodeStringValue(mongoSettings.get(DB_FIELD), riverName)); + builder.mongoCollection(XContentMapValues.nodeStringValue(mongoSettings.get(COLLECTION_FIELD), riverName)); builder.mongoGridFS(XContentMapValues.nodeBooleanValue(mongoSettings.get(GRIDFS_FIELD), false)); if (mongoSettings.containsKey(FILTER_FIELD)) { builder.mongoFilter(XContentMapValues.nodeStringValue(mongoSettings.get(FILTER_FIELD), "")); @@ -541,8 +540,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverN } catch (UnknownHostException e) { e.printStackTrace(); } - builder.mongoDb(riverName.name()); - builder.mongoCollection(riverName.name()); + builder.mongoDb(riverName); + builder.mongoCollection(riverName); } if (settings.settings().containsKey(INDEX_OBJECT)) { diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverModule.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverModule.java index 6f622662..ac00edd7 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverModule.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverModule.java @@ -40,9 +40,9 @@ protected void configure() { } @Provides - protected MongoDBRiverDefinition provideDefinition(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, - ScriptService scriptService) { - return MongoDBRiverDefinition.parseSettings(riverName, riverIndexName, settings, scriptService); + protected MongoDBRiverDefinition provideDefinition( + RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, ScriptService scriptService) { + return MongoDBRiverDefinition.parseSettings(riverName.name(), riverIndexName, settings, scriptService); } } diff --git a/src/site/index.html b/src/site/index.html index d1cc704f..1d428630 100644 --- a/src/site/index.html +++ b/src/site/index.html @@ -1,52 +1,50 @@ - - -MongoDB River Plugin for ElasticSearch - - - - - - - - -
-
-
-

MongoDB River Administration

- -
-
-
-
-

- {{ river.name }} Enabled Disabled -

-
-
-
Settings:

{{ toString(river.settings) }}
- Stop Start -
-
-
-
-
-
-
-
- - + --> + + + + + + diff --git a/src/test/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinitionTest.java b/src/test/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinitionTest.java index e8676370..a91d0836 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinitionTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinitionTest.java @@ -20,7 +20,7 @@ public void testLoadMongoDBRiverDefinition() { RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap( Streams.copyToByteArray(in), false).v2()); ScriptService scriptService = null; - MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName, "my-river-index", riverSettings, + MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), "my-river-index", riverSettings, scriptService); Assert.assertNotNull(definition); Assert.assertEquals("mycollection", definition.getIncludeCollection()); diff --git a/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java b/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java index 43980217..5fdc8fdf 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java @@ -107,7 +107,7 @@ private TestSlurper createSlurper() throws Exception { RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap( getSettingsString().getBytes(), false).v2()); RiverName riverName = new RiverName("mongodb", river); - MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName, index, riverSettings, null); + MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), index, riverSettings, null); return new TestSlurper(mongo.getServerAddressList(), definition, new SharedContext(null, true), getNode().client()); } From 4c9ce969c4ca59c33b3e69739adbfd740262b710 Mon Sep 17 00:00:00 2001 From: Ben McCann Date: Mon, 30 Sep 2013 10:35:30 -0700 Subject: [PATCH 2/4] Add the index count to the admin UI --- .../rest/action/RestMongoDBRiverAction.java | 3 ++- .../elasticsearch/river/mongodb/MongoDBRiver.java | 13 +++++++++++-- src/site/index.html | 1 + 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java b/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java index d5fa21c9..3d66b34a 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java +++ b/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java @@ -140,7 +140,8 @@ private List> getRivers() { source.put("name", riverName); source.put("enabled", MongoDBRiverHelper.isRiverEnabled(client, hit.getType())); source.put("settings", hit.getSource()); - source.put("lastTimestamp", MongoDBRiver.getLastTimestamp(client, definition)); + source.put("lastTimestamp", MongoDBRiver.getLastTimestamp(client, definition)); + source.put("indexCount", MongoDBRiver.getIndexCount(client, definition)); logger.trace("source: {}", hit.getSourceAsString()); rivers.add(source); } diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java index d2773dce..f7284419 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java @@ -32,6 +32,7 @@ import org.bson.types.BSONTimestamp; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -351,8 +352,8 @@ private XContentBuilder getGridFSMapping() throws IOException { public static BSONTimestamp getLastTimestamp(Client client, MongoDBRiverDefinition definition) { GetResponse lastTimestampResponse = client - .prepareGet(definition.getRiverIndexName(), definition.getRiverName(), definition.getMongoOplogNamespace()).execute() - .actionGet(); + .prepareGet(definition.getRiverIndexName(), definition.getRiverName(), definition.getMongoOplogNamespace()) + .execute().actionGet(); // API changes since 0.90.0 lastTimestampResponse.exists() replaced by // lastTimestampResponse.isExists() @@ -396,6 +397,14 @@ static void updateLastTimestamp(final MongoDBRiverDefinition definition, final B } } + public static long getIndexCount(Client client, MongoDBRiverDefinition definition) { + CountResponse countResponse = client + .prepareCount(definition.getIndexName()) + .execute().actionGet(); + return countResponse.getCount(); + } + + protected static class QueueEntry { private final DBObject data; diff --git a/src/site/index.html b/src/site/index.html index 1d428630..c8aa39a9 100644 --- a/src/site/index.html +++ b/src/site/index.html @@ -29,6 +29,7 @@

Last Replicated - {{ river.lastTimestamp }}
+ Documents Indexed - {{ river.indexCount }}

Settings:

{{ toString(river.settings) }}
Stop From 39e3173296b4dd0fc35241c1b4b7099a5d2775ec Mon Sep 17 00:00:00 2001 From: Ben McCann Date: Mon, 30 Sep 2013 12:41:58 -0700 Subject: [PATCH 3/4] Make status an enum instead of a boolean --- .../rest/action/RestMongoDBRiverAction.java | 7 +-- .../elasticsearch/river/mongodb/Indexer.java | 2 +- .../river/mongodb/MongoDBRiver.java | 14 +++--- .../river/mongodb/SharedContext.java | 14 +++--- .../elasticsearch/river/mongodb/Slurper.java | 5 +- .../elasticsearch/river/mongodb/Status.java | 46 +++---------------- .../river/mongodb/StatusChecker.java | 42 +++++++++++++++++ .../mongodb/util/MongoDBRiverHelper.java | 26 +++++------ src/site/index.html | 11 +++-- src/site/scripts/app.js | 27 +++++------ .../river/mongodb/SlurperTest.java | 2 +- 11 files changed, 103 insertions(+), 93 deletions(-) create mode 100644 src/main/java/org/elasticsearch/river/mongodb/StatusChecker.java diff --git a/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java b/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java index 3d66b34a..872e982c 100644 --- a/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java +++ b/src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.river.RiverSettings; import org.elasticsearch.river.mongodb.MongoDBRiver; import org.elasticsearch.river.mongodb.MongoDBRiverDefinition; +import org.elasticsearch.river.mongodb.Status; import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; import org.elasticsearch.search.SearchHit; @@ -66,7 +67,7 @@ private void start(RestRequest request, RestChannel channel) { respondError(request, channel, "Parameter 'river' is required", RestStatus.BAD_REQUEST); return; } - MongoDBRiverHelper.setRiverEnabled(client, river, true); + MongoDBRiverHelper.setRiverStatus(client, river, Status.RUNNING); respondSuccess(request, channel, RestStatus.OK); } @@ -76,7 +77,7 @@ private void stop(RestRequest request, RestChannel channel) { respondError(request, channel, "Parameter 'river' is required", RestStatus.BAD_REQUEST); return; } - MongoDBRiverHelper.setRiverEnabled(client, river, false); + MongoDBRiverHelper.setRiverStatus(client, river, Status.STOPPED); respondSuccess(request, channel, RestStatus.OK); } @@ -138,7 +139,7 @@ private List> getRivers() { MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName, riverIndexName, riverSettings, null); source.put("name", riverName); - source.put("enabled", MongoDBRiverHelper.isRiverEnabled(client, hit.getType())); + source.put("status", MongoDBRiverHelper.getRiverStatus(client, hit.getType())); source.put("settings", hit.getSource()); source.put("lastTimestamp", MongoDBRiver.getLastTimestamp(client, definition)); source.put("indexCount", MongoDBRiver.getIndexCount(client, definition)); diff --git a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java index 026ac49c..e8c57ed1 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Indexer.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Indexer.java @@ -60,7 +60,7 @@ public Indexer(MongoDBRiverDefinition definition, SharedContext context, Client @Override public void run() { - while (context.isActive()) { + while (context.getStatus() == Status.RUNNING) { sw = new StopWatch().start(); deletedDocuments = 0; insertedDocuments = 0; diff --git a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java index f7284419..a968d513 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java +++ b/src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java @@ -75,8 +75,8 @@ public class MongoDBRiver extends AbstractRiverComponent implements River { public final static String TYPE = "mongodb"; public final static String NAME = "mongodb-river"; - public final static String STATUS = "_mongodbstatus"; - public final static String ENABLED = "enabled"; + public final static String STATUS_ID = "_riverstatus"; + public final static String STATUS_FIELD = "status"; public final static String DESCRIPTION = "MongoDB River Plugin"; public final static String LAST_TIMESTAMP_FIELD = "_last_ts"; public final static String MONGODB_LOCAL_DATABASE = "local"; @@ -131,21 +131,21 @@ public MongoDBRiver(RiverName riverName, RiverSettings settings, @RiverIndexName BlockingQueue stream = definition.getThrottleSize() == -1 ? new LinkedTransferQueue() : new ArrayBlockingQueue(definition.getThrottleSize()); - this.context = new SharedContext(stream, false); + this.context = new SharedContext(stream, Status.STOPPED); this.statusThread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_status").newThread( - new Status(this, definition, context)); + new StatusChecker(this, definition, context)); this.statusThread.start(); } @Override public void start() { - if (!MongoDBRiverHelper.isRiverEnabled(client, riverName.getName())) { + if (MongoDBRiverHelper.getRiverStatus(client, riverName.getName()) == Status.STOPPED) { logger.debug("Cannot start river {}. It is currently disabled", riverName.getName()); startInvoked = true; return; } - this.context.setActive(true); + this.context.setStatus(Status.RUNNING); for (ServerAddress server : definition.getMongoServers()) { logger.info("Using mongodb server(s): host [{}], port [{}]", server.getHost(), server.getPort()); } @@ -331,7 +331,7 @@ public void close() { } catch (Throwable t) { logger.error("Fail to close river {}", t, riverName.getName()); } finally { - this.context.setActive(false); + this.context.setStatus(Status.STOPPED); } } diff --git a/src/main/java/org/elasticsearch/river/mongodb/SharedContext.java b/src/main/java/org/elasticsearch/river/mongodb/SharedContext.java index 9ecff346..51747a61 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/SharedContext.java +++ b/src/main/java/org/elasticsearch/river/mongodb/SharedContext.java @@ -10,11 +10,11 @@ public class SharedContext { private BlockingQueue stream; - private boolean active; + private Status status; - public SharedContext(BlockingQueue stream, boolean active) { + public SharedContext(BlockingQueue stream, Status status) { this.stream = stream; - this.active = active; + this.status = status; } public BlockingQueue getStream() { @@ -25,12 +25,12 @@ public void setStream(BlockingQueue stream) { this.stream = stream; } - public boolean isActive() { - return active; + public Status getStatus() { + return status; } - public void setActive(boolean active) { - this.active = active; + public void setStatus(Status status) { + this.status = status; } } diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java index 51914340..9c16b400 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java @@ -64,7 +64,7 @@ public Slurper(List mongoServers, MongoDBRiverDefinition definiti @Override public void run() { - while (context.isActive()) { + while (context.getStatus() == Status.RUNNING) { try { if (!assignCollections()) { break; // failed to assign oplogCollection or @@ -73,6 +73,9 @@ public void run() { BSONTimestamp startTimestamp = null; if (!riverHasIndexedSomething()) { + //if (MongoDBRiver.getIndexCount(client, definition) > 0) { + // break; + //} startTimestamp = doInitialImport(); } diff --git a/src/main/java/org/elasticsearch/river/mongodb/Status.java b/src/main/java/org/elasticsearch/river/mongodb/Status.java index 56c9c024..2d585f3e 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Status.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Status.java @@ -1,43 +1,9 @@ package org.elasticsearch.river.mongodb; -import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; +public enum Status { -class Status implements Runnable { - - private final MongoDBRiver mongoDBRiver; - private final MongoDBRiverDefinition definition; - private final SharedContext context; - - public Status(MongoDBRiver mongoDBRiver, MongoDBRiverDefinition definition, SharedContext context) { - this.mongoDBRiver = mongoDBRiver; - this.definition = definition; - this.context = context; - } - - @Override - public void run() { - while (true) { - try { - if (this.mongoDBRiver.startInvoked) { - boolean enabled = MongoDBRiverHelper.isRiverEnabled(this.mongoDBRiver.client, this.definition.getRiverName()); - - if (this.context.isActive() && !enabled) { - MongoDBRiver.logger.info("About to stop river: {}", this.definition.getRiverName()); - this.mongoDBRiver.close(); - } - - if (!this.context.isActive() && enabled) { - MongoDBRiver.logger.trace("About to start river: {}", this.definition.getRiverName()); - this.mongoDBRiver.start(); - } - } - Thread.sleep(1000L); - } catch (InterruptedException e) { - MongoDBRiver.logger.info("Status thread interrupted", e, (Object) null); - Thread.currentThread().interrupt(); - break; - } - - } - } -} \ No newline at end of file + RUNNING, + STOPPED, + INITIAL_IMPORT_FAILED; + +} diff --git a/src/main/java/org/elasticsearch/river/mongodb/StatusChecker.java b/src/main/java/org/elasticsearch/river/mongodb/StatusChecker.java new file mode 100644 index 00000000..bde090f6 --- /dev/null +++ b/src/main/java/org/elasticsearch/river/mongodb/StatusChecker.java @@ -0,0 +1,42 @@ +package org.elasticsearch.river.mongodb; + +import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; + +class StatusChecker implements Runnable { + + private final MongoDBRiver mongoDBRiver; + private final MongoDBRiverDefinition definition; + private final SharedContext context; + + public StatusChecker(MongoDBRiver mongoDBRiver, MongoDBRiverDefinition definition, SharedContext context) { + this.mongoDBRiver = mongoDBRiver; + this.definition = definition; + this.context = context; + } + + @Override + public void run() { + while (true) { + try { + if (this.mongoDBRiver.startInvoked) { + Status status = MongoDBRiverHelper.getRiverStatus(this.mongoDBRiver.client, this.definition.getRiverName()); + if (status != this.context.getStatus()) { + if (status == Status.RUNNING) { + MongoDBRiver.logger.info("About to stop river: {}", this.definition.getRiverName()); + this.mongoDBRiver.close(); + } else if (status == Status.STOPPED) { + MongoDBRiver.logger.trace("About to start river: {}", this.definition.getRiverName()); + this.mongoDBRiver.start(); + } + } + } + Thread.sleep(1000L); + } catch (InterruptedException e) { + MongoDBRiver.logger.info("Status thread interrupted", e, (Object) null); + Thread.currentThread().interrupt(); + break; + } + + } + } +} diff --git a/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java b/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java index e8cfe09b..db1a0cab 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java +++ b/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java @@ -11,32 +11,28 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.river.mongodb.MongoDBRiver; +import org.elasticsearch.river.mongodb.Status; public abstract class MongoDBRiverHelper { private static final ESLogger logger = Loggers.getLogger(MongoDBRiverHelper.class); - public static boolean isRiverEnabled(Client client, String riverName) { - boolean enabled = true; - GetResponse getResponse = client.prepareGet("_river", riverName, MongoDBRiver.STATUS).execute().actionGet(); - - if (!getResponse.isExists()) { - setRiverEnabled(client, riverName, true); + public static Status getRiverStatus(Client client, String riverName) { + GetResponse statusResponse = client.prepareGet("_river", riverName, MongoDBRiver.STATUS_ID).execute().actionGet(); + if (!statusResponse.isExists()) { + setRiverStatus(client, riverName, Status.RUNNING); + return Status.RUNNING; } else { - Object obj = XContentMapValues.extractValue(MongoDBRiver.TYPE + "." + MongoDBRiver.ENABLED, getResponse.getSourceAsMap()); - if (obj != null) { - enabled = Boolean.parseBoolean(obj.toString()); - } + Object obj = XContentMapValues.extractValue(MongoDBRiver.TYPE + "." + MongoDBRiver.STATUS_FIELD, statusResponse.getSourceAsMap()); + return Status.valueOf(obj.toString()); } - // logger.trace("River {} enabled? {}", riverName, enabled); - return enabled; } - public static void setRiverEnabled(Client client, String riverName, boolean enabled) { + public static void setRiverStatus(Client client, String riverName, Status status) { XContentBuilder xb; try { - xb = jsonBuilder().startObject().startObject(MongoDBRiver.TYPE).field(MongoDBRiver.ENABLED, enabled).endObject().endObject(); - client.prepareIndex("_river", riverName, MongoDBRiver.STATUS).setSource(xb).execute().actionGet(); + xb = jsonBuilder().startObject().startObject(MongoDBRiver.TYPE).field(MongoDBRiver.STATUS_FIELD, status).endObject().endObject(); + client.prepareIndex("_river", riverName, MongoDBRiver.STATUS_ID).setSource(xb).execute().actionGet(); } catch (IOException ioEx) { logger.error("setRiverEnabled failed for river {}", ioEx, riverName); } diff --git a/src/site/index.html b/src/site/index.html index c8aa39a9..7ff134c7 100644 --- a/src/site/index.html +++ b/src/site/index.html @@ -21,10 +21,11 @@

MongoDB River Administration

-

+

{{ river.name }} - Enabled - Disabled + {{river.status}} + {{river.status}} + {{river.status}}

@@ -32,8 +33,8 @@

Documents Indexed - {{ river.indexCount }}

Settings:

{{ toString(river.settings) }}
- Stop - Start + Stop + Start

diff --git a/src/site/scripts/app.js b/src/site/scripts/app.js index 322abf45..14482b9b 100644 --- a/src/site/scripts/app.js +++ b/src/site/scripts/app.js @@ -3,43 +3,44 @@ var mongoDBRiverApp = angular.module('mongoDBRiverApp', ['ngResource', 'ui.bootstrap']); mongoDBRiverApp.controller('MainCtrl', function ($log, $scope, $resource) { - var riverResource = $resource('/_river/:type/:river/:action' , {type:'@type', river:'@river'}, { + var riverResource = $resource('/_river/:type/:river/:action' , {type:'@type', river:'@river'}, + { list: {method:'GET', params: {action: 'list'}, isArray: true}, start: {method:'POST', params: {action: 'start'}}, stop: {method:'POST', params: {action: 'stop'}} - }); + } + ); $scope.rivers = []; $scope.type = null; + $scope.list = function(type){ $log.log('list river type: ' + type); - $scope.type = type; - var rivers = riverResource.list({'type': type}, function() { + $scope.type = type || 'mongodb'; + var rivers = riverResource.list({'type': $scope.type}, function() { $log.log('rivers count: ' + rivers.length); $scope.rivers = rivers; }); }; + $scope.start = function(name){ $log.log('start: ' + name); riverResource.start({'type': $scope.type, 'river': name}, function(river, response) { - setRiverEnabled(name, true); + $scope.list(); }); }; + $scope.stop = function(name){ $log.log('stop: ' + name); riverResource.stop({'type': $scope.type, 'river': name}, function() { - setRiverEnabled(name, false); + $scope.list(); }); }; + $scope.toString = function(object){ var value = JSON.stringify(angular.copy(object), undefined, 2); return value; }; - function setRiverEnabled(name, enabled) { - var river = _.find($scope.rivers, {'name': name}); - if (river !== undefined) { - river.enabled = enabled; - } - } - $scope.list('mongodb'); + + $scope.list(); }); diff --git a/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java b/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java index 5fdc8fdf..7d9ec8aa 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java @@ -109,7 +109,7 @@ private TestSlurper createSlurper() throws Exception { RiverName riverName = new RiverName("mongodb", river); MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), index, riverSettings, null); - return new TestSlurper(mongo.getServerAddressList(), definition, new SharedContext(null, true), getNode().client()); + return new TestSlurper(mongo.getServerAddressList(), definition, new SharedContext(null, Status.RUNNING), getNode().client()); } private static class TestSlurper extends Slurper { From 256140a8cf60219f5ebae2380cb8a5a82ded5fb5 Mon Sep 17 00:00:00 2001 From: Ben McCann Date: Mon, 30 Sep 2013 20:05:57 -0700 Subject: [PATCH 4/4] Add a check to see if the index already exists before we begin the initial import --- .../elasticsearch/river/mongodb/Slurper.java | 18 ++- .../mongodb/util/MongoDBRiverHelper.java | 4 +- .../river/mongodb/SlurperTest.java | 134 ------------------ .../simple/RiverMongoIndexExistsTest.java | 114 +++++++++++++++ .../simple/RiverMongoInitialImportTest.java | 86 ++++++----- 5 files changed, 180 insertions(+), 176 deletions(-) delete mode 100644 src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java create mode 100644 src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoIndexExistsTest.java diff --git a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java index 9c16b400..10e7d24c 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/Slurper.java +++ b/src/main/java/org/elasticsearch/river/mongodb/Slurper.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.river.mongodb.util.MongoDBHelper; +import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; import com.mongodb.BasicDBObject; import com.mongodb.Bytes; @@ -72,10 +73,11 @@ public void run() { } BSONTimestamp startTimestamp = null; - if (!riverHasIndexedSomething()) { - //if (MongoDBRiver.getIndexCount(client, definition) > 0) { - // break; - //} + if (!riverHasIndexedFromOplog()) { + if (!isIndexEmpty()) { + MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED); + break; + } startTimestamp = doInitialImport(); } @@ -116,15 +118,19 @@ public void run() { } } - protected boolean riverHasIndexedSomething() { + protected boolean riverHasIndexedFromOplog() { return MongoDBRiver.getLastTimestamp(client, definition) != null; } + protected boolean isIndexEmpty() { + return MongoDBRiver.getIndexCount(client, definition) == 0; + } + /** * Does an initial sync the same way MongoDB does. * https://groups.google.com/ * forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns - * + * * @return the last oplog timestamp before the import began * @throws InterruptedException * if the blocking queue stream is interrupted while waiting diff --git a/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java b/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java index db1a0cab..c65f3a15 100644 --- a/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java +++ b/src/main/java/org/elasticsearch/river/mongodb/util/MongoDBRiverHelper.java @@ -29,12 +29,14 @@ public static Status getRiverStatus(Client client, String riverName) { } public static void setRiverStatus(Client client, String riverName, Status status) { + logger.debug("setRiverStatus called with {}", status); XContentBuilder xb; try { xb = jsonBuilder().startObject().startObject(MongoDBRiver.TYPE).field(MongoDBRiver.STATUS_FIELD, status).endObject().endObject(); client.prepareIndex("_river", riverName, MongoDBRiver.STATUS_ID).setSource(xb).execute().actionGet(); } catch (IOException ioEx) { - logger.error("setRiverEnabled failed for river {}", ioEx, riverName); + logger.error("setRiverStatus failed for river {}", ioEx, riverName); } } + } diff --git a/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java b/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java deleted file mode 100644 index 7d9ec8aa..00000000 --- a/src/test/java/org/elasticsearch/river/mongodb/SlurperTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to Elastic Search and Shay Banon under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Elastic Search licenses this - * file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.river.mongodb; - -import java.util.List; - -import org.bson.types.BSONTimestamp; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.river.RiverName; -import org.elasticsearch.river.RiverSettings; -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.google.common.collect.ImmutableMap; -import com.mongodb.BasicDBObject; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBObject; -import com.mongodb.ServerAddress; -import com.mongodb.WriteConcern; -import com.mongodb.WriteResult; - -@Test -public class SlurperTest extends RiverMongoDBTestAbstract { - - private DB mongoDB; - private DBCollection mongoCollection; - - protected SlurperTest() { - super("testmongodb-" + System.currentTimeMillis(), "testriver-" + System.currentTimeMillis(), "person-" - + System.currentTimeMillis(), "personindex-" + System.currentTimeMillis()); - } - - private void createDatabase() { - logger.debug("createDatabase {}", getDatabase()); - try { - mongoDB = getMongo().getDB(getDatabase()); - mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); - logger.info("Start createCollection"); - mongoCollection = mongoDB.createCollection(getCollection(), null); - Assert.assertNotNull(mongoCollection); - } catch (Throwable t) { - logger.error("createDatabase failed.", t); - } - } - - private void createIndex() throws Exception { - getNode().client().prepareIndex("_river", river, "_meta").setSource(getSettingsString()).execute().actionGet(); - } - - private void cleanUp() { - logger.info("Drop database " + mongoDB.getName()); - mongoDB.dropDatabase(); - } - - @Test - public void testInitialImport() throws Throwable { - logger.debug("Start InitialImport"); - try { - createDatabase(); - createIndex(); - - DBObject dbObject1 = new BasicDBObject(ImmutableMap.of("name", "Richard")); - WriteResult result1 = mongoCollection.insert(dbObject1); - logger.info("WriteResult: {}", result1.toString()); - Thread.sleep(wait); - - TestSlurper slurper = createSlurper(); - Assert.assertTrue(slurper.assignCollections()); - Assert.assertFalse(slurper.riverHasIndexedSomething()); - - new Thread(slurper).start(); - Thread.sleep(wait); - - Assert.assertTrue(slurper.doInitialImportHasBeenCalled()); - } finally { - cleanUp(); - } - } - - private String getSettingsString() throws Exception { - return getJsonSettings(TEST_MONGODB_RIVER_SIMPLE_JSON, String.valueOf(getMongoPort1()), String.valueOf(getMongoPort2()), - String.valueOf(getMongoPort3()), database, collection, index); - } - - private TestSlurper createSlurper() throws Exception { - super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); - - RiverSettings riverSettings = new RiverSettings(ImmutableSettings.settingsBuilder().build(), XContentHelper.convertToMap( - getSettingsString().getBytes(), false).v2()); - RiverName riverName = new RiverName("mongodb", river); - MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName.name(), index, riverSettings, null); - - return new TestSlurper(mongo.getServerAddressList(), definition, new SharedContext(null, Status.RUNNING), getNode().client()); - } - - private static class TestSlurper extends Slurper { - - public TestSlurper(List mongoServers, MongoDBRiverDefinition definition, SharedContext context, Client client) { - super(mongoServers, definition, context, client); - } - - private boolean doInitialImportCalled = false; - - public BSONTimestamp doInitialImport() { - doInitialImportCalled = true; - return null; - } - - public boolean doInitialImportHasBeenCalled() { - return doInitialImportCalled; - } - - } - -} diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoIndexExistsTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoIndexExistsTest.java new file mode 100644 index 00000000..b7000618 --- /dev/null +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoIndexExistsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.river.mongodb.simple; + +import static org.elasticsearch.client.Requests.countRequest; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract; +import org.elasticsearch.river.mongodb.Status; +import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.WriteConcern; +import com.mongodb.WriteResult; + +@Test +public class RiverMongoIndexExistsTest extends RiverMongoDBTestAbstract { + + private DB mongoDB; + private DBCollection mongoCollection; + + protected RiverMongoIndexExistsTest() { + super("testmongodb-" + System.currentTimeMillis(), + "testriver-" + System.currentTimeMillis(), + "person-" + System.currentTimeMillis(), + "personindex-" + System.currentTimeMillis()); + } + + @Test + public void dontDoInitialImportIfCollectionExists() throws Throwable { + logger.debug("Start InitialImport"); + try { + createDatabase(); + + DBObject dbObject1 = new BasicDBObject(ImmutableMap.of("name", "Richard")); + WriteResult result1 = mongoCollection.insert(dbObject1); + logger.info("WriteResult: {}", result1.toString()); + Thread.sleep(wait); + + createRiver(); + Thread.sleep(wait); + + ActionFuture response = getNode().client().admin().indices() + .exists(new IndicesExistsRequest(getIndex())); + assertThat(response.actionGet().isExists(), equalTo(true)); + refreshIndex(); + assertThat(getNode().client().count(countRequest(getIndex())).actionGet().getCount(), + equalTo(1l)); + + deleteRiver(); + createRiver(); + + Thread.sleep(wait); + Assert.assertEquals(Status.INITIAL_IMPORT_FAILED, + MongoDBRiverHelper.getRiverStatus(getNode().client(), river)); + } catch (Throwable t) { + logger.error("InitialImport failed.", t); + t.printStackTrace(); + throw t; + } finally { + cleanUp(); + } + } + + private void createDatabase() { + logger.debug("createDatabase {}", getDatabase()); + try { + mongoDB = getMongo().getDB(getDatabase()); + mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); + logger.info("Start createCollection"); + mongoCollection = mongoDB.createCollection(getCollection(), null); + Assert.assertNotNull(mongoCollection); + } catch (Throwable t) { + logger.error("createDatabase failed.", t); + } + } + + private void createRiver() throws Exception { + super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); + } + + private void cleanUp() { + super.deleteRiver(); + logger.info("Drop database " + mongoDB.getName()); + mongoDB.dropDatabase(); + } + +} diff --git a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java index 20920a3d..6a977e99 100644 --- a/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java +++ b/src/test/java/org/elasticsearch/river/mongodb/simple/RiverMongoInitialImportTest.java @@ -25,8 +25,12 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; -import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.river.mongodb.MongoDBRiver; import org.elasticsearch.river.mongodb.RiverMongoDBTestAbstract; +import org.elasticsearch.river.mongodb.Status; +import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper; import org.testng.Assert; import org.testng.annotations.Test; @@ -45,35 +49,14 @@ public class RiverMongoInitialImportTest extends RiverMongoDBTestAbstract { private DBCollection mongoCollection; protected RiverMongoInitialImportTest() { - super("testmongodb-" + System.currentTimeMillis(), "testriver-" + System.currentTimeMillis(), "person-" - + System.currentTimeMillis(), "personindex-" + System.currentTimeMillis()); - } - - private void createDatabase() { - logger.debug("createDatabase {}", getDatabase()); - try { - mongoDB = getMongo().getDB(getDatabase()); - mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); - logger.info("Start createCollection"); - mongoCollection = mongoDB.createCollection(getCollection(), null); - Assert.assertNotNull(mongoCollection); - } catch (Throwable t) { - logger.error("createDatabase failed.", t); - } - } - - private void createRiver() throws Exception { - super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); - } - - private void cleanUp() { - super.deleteRiver(); - logger.info("Drop database " + mongoDB.getName()); - mongoDB.dropDatabase(); + super("testmongodb-" + System.currentTimeMillis(), + "testriver-" + System.currentTimeMillis(), + "person-" + System.currentTimeMillis(), + "personindex-" + System.currentTimeMillis()); } @Test - public void InitialImport() throws Throwable { + public void initialImport() throws Throwable { logger.debug("Start InitialImport"); try { createDatabase(); @@ -83,32 +66,42 @@ public void InitialImport() throws Throwable { logger.info("WriteResult: {}", result1.toString()); Thread.sleep(wait); + // Make sure we're starting out with the river not setup + GetResponse statusResponse = getNode().client().prepareGet("_river", river, MongoDBRiver.STATUS_ID).execute().actionGet(); + Assert.assertFalse(statusResponse.isExists(), + "Expected no river but found one " + XContentMapValues.extractValue(MongoDBRiver.TYPE + "." + MongoDBRiver.STATUS_FIELD, statusResponse.getSourceAsMap())); + + // Setup the river createRiver(); Thread.sleep(wait); + // Check that it did an initial import successfully ActionFuture response = getNode().client().admin().indices() .exists(new IndicesExistsRequest(getIndex())); assertThat(response.actionGet().isExists(), equalTo(true)); - refreshIndex(); - CountResponse countResponse = getNode().client().count(countRequest(getIndex())).actionGet(); - assertThat(countResponse.getCount(), equalTo(1l)); + Assert.assertEquals(Status.RUNNING, + MongoDBRiverHelper.getRiverStatus(getNode().client(), river)); + assertThat(getNode().client().count(countRequest(getIndex())).actionGet().getCount(), + equalTo(1l)); + // Check that it syncs the oplog DBObject dbObject2 = new BasicDBObject(ImmutableMap.of("name", "Ben")); WriteResult result2 = mongoCollection.insert(dbObject2); logger.info("WriteResult: {}", result2.toString()); Thread.sleep(wait); refreshIndex(); - CountResponse countResponse2 = getNode().client().count(countRequest(getIndex())).actionGet(); - assertThat(countResponse2.getCount(), equalTo(2l)); + Assert.assertEquals(Status.RUNNING, + MongoDBRiverHelper.getRiverStatus(getNode().client(), river)); + assertThat(getNode().client().count(countRequest(getIndex())).actionGet().getCount(), + equalTo(2l)); mongoCollection.remove(dbObject1, WriteConcern.REPLICAS_SAFE); Thread.sleep(wait); refreshIndex(); - countResponse = getNode().client().count(countRequest(getIndex())).actionGet(); - logger.debug("Count after delete request: {}", countResponse.getCount()); - assertThat(countResponse.getCount(), equalTo(1L)); + assertThat(getNode().client().count(countRequest(getIndex())).actionGet().getCount(), + equalTo(1L)); } catch (Throwable t) { logger.error("InitialImport failed.", t); @@ -119,4 +112,27 @@ public void InitialImport() throws Throwable { } } + private void createDatabase() { + logger.debug("createDatabase {}", getDatabase()); + try { + mongoDB = getMongo().getDB(getDatabase()); + mongoDB.setWriteConcern(WriteConcern.REPLICAS_SAFE); + logger.info("Start createCollection"); + mongoCollection = mongoDB.createCollection(getCollection(), null); + Assert.assertNotNull(mongoCollection); + } catch (Throwable t) { + logger.error("createDatabase failed.", t); + } + } + + private void createRiver() throws Exception { + super.createRiver(TEST_MONGODB_RIVER_SIMPLE_JSON); + } + + private void cleanUp() { + super.deleteRiver(); + logger.info("Drop database " + mongoDB.getName()); + mongoDB.dropDatabase(); + } + }