diff --git a/mongordd/src/main/java/com/caffinc/sparktools/mongordd/MongoRDD.java b/mongordd/src/main/java/com/caffinc/sparktools/mongordd/MongoRDD.java index 2aec141..d5354ce 100644 --- a/mongordd/src/main/java/com/caffinc/sparktools/mongordd/MongoRDD.java +++ b/mongordd/src/main/java/com/caffinc/sparktools/mongordd/MongoRDD.java @@ -60,7 +60,7 @@ public MongoRDD(SparkContext sc, String mongoClientUriString, String database, S */ @Override public Iterator compute(Partition partition, TaskContext taskContext) { - LOG.info("Returning partition {}", partition); + LOG.info("Iterating partition {}", partition); return new MongoMapIterator(new MongoClient(new MongoClientURI(mongoClientUriString)).getDatabase(database).getCollection(collection) .find(query).skip(((MongoMapPartition) partition).from).limit(this.batchSize).iterator()); } @@ -74,7 +74,7 @@ public Iterator compute(Partition partition, TaskContext taskContext) public Partition[] getPartitions() { Partition[] partitionArray = new Partition[partitions]; for (int i = 0; i < partitions; i++) { - partitionArray[i] = new MongoMapPartition(i, i * batchSize); + partitionArray[i] = new MongoMapPartition(i, i * batchSize, batchSize); } return partitionArray; } @@ -125,16 +125,19 @@ private class MongoMapPartition implements Partition { private static final long serialVersionUID = 1L; private int index; private int from; + private int batchSize; /** * Constructor for a new Partition * - * @param index Index of this partition - * @param from MongoDB index to start the query from, corresponds to the skip() used while calling find() + * @param index Index of this partition + * @param from MongoDB index to start the query from, corresponds to the skip() used while calling find() + * @param batchSize MongoDB batchSize to limit the query to, corresponds to the limit() used while calling find() */ - public MongoMapPartition(int index, int from) { + public MongoMapPartition(int index, int from, int batchSize) { this.index = index; this.from = from; + this.batchSize = batchSize; } /** @@ -169,7 +172,7 @@ public int hashCode() { @Override public String toString() { - return "MongoMapPartition[index=" + index + ", from=" + from + "]"; + return "MongoMapPartition[index=" + index + ", from=" + from + ", batchSize=" + batchSize + "]"; } } } \ No newline at end of file