Skip to content

Commit

Permalink
Adding batchSize to partition, making log more descriptive
Browse files Browse the repository at this point in the history
  • Loading branch information
Sriram Keerthi Madhava Kunjathur authored and Sriram Keerthi Madhava Kunjathur committed Mar 4, 2016
1 parent 6eaa571 commit f610b28
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public MongoRDD(SparkContext sc, String mongoClientUriString, String database, S
*/
@Override
public Iterator<Document> compute(Partition partition, TaskContext taskContext) {
LOG.info("Returning partition {}", partition);
LOG.info("Iterating partition {}", partition);
return new MongoMapIterator(new MongoClient(new MongoClientURI(mongoClientUriString)).getDatabase(database).getCollection(collection)
.find(query).skip(((MongoMapPartition) partition).from).limit(this.batchSize).iterator());
}
Expand All @@ -74,7 +74,7 @@ public Iterator<Document> compute(Partition partition, TaskContext taskContext)
public Partition[] getPartitions() {
Partition[] partitionArray = new Partition[partitions];
for (int i = 0; i < partitions; i++) {
partitionArray[i] = new MongoMapPartition(i, i * batchSize);
partitionArray[i] = new MongoMapPartition(i, i * batchSize, batchSize);
}
return partitionArray;
}
Expand Down Expand Up @@ -125,16 +125,19 @@ private class MongoMapPartition implements Partition {
private static final long serialVersionUID = 1L;
private int index;
private int from;
private int batchSize;

/**
* Constructor for a new Partition
*
* @param index Index of this partition
* @param from MongoDB index to start the query from, corresponds to the skip() used while calling find()
* @param index Index of this partition
* @param from MongoDB index to start the query from, corresponds to the skip() used while calling find()
* @param batchSize MongoDB batchSize to limit the query to, corresponds to the limit() used while calling find()
*/
public MongoMapPartition(int index, int from) {
public MongoMapPartition(int index, int from, int batchSize) {
this.index = index;
this.from = from;
this.batchSize = batchSize;
}

/**
Expand Down Expand Up @@ -169,7 +172,7 @@ public int hashCode() {

@Override
public String toString() {
return "MongoMapPartition[index=" + index + ", from=" + from + "]";
return "MongoMapPartition[index=" + index + ", from=" + from + ", batchSize=" + batchSize + "]";
}
}
}

0 comments on commit f610b28

Please sign in to comment.