Skip to content

Commit

Permalink
update indexing in the helper to use multiple persists and final merg…
Browse files Browse the repository at this point in the history
merge to

catch further issues in aggregator implementations
  • Loading branch information
himanshug committed Sep 9, 2015
1 parent 07266d6 commit 4491103
Showing 1 changed file with 50 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.CharSource;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -58,9 +57,11 @@
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
Expand All @@ -71,7 +72,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -237,19 +238,57 @@ public void createIndex(
int maxRowCount
) throws Exception
{
try(IncrementalIndex index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount)) {
while (rows.hasNext()) {
IncrementalIndex index = null;
List<File> toMerge = new ArrayList<>();

try {
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount);
while (rows.hasNext()) {
Object row = rows.next();
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
//InputRowsParser<String>
index.add(((StringInputRowParser) parser).parse((String) row));
} else {
index.add(parser.parse(row));
try {
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
//InputRowsParser<String>
index.add(((StringInputRowParser) parser).parse((String) row));
} else {
index.add(parser.parse(row));
}
}
catch (IndexSizeExceededException ex) {
File tmp = Files.createTempDir();
toMerge.add(tmp);
IndexMerger.persist(index, tmp, null, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount);
}
}

if (toMerge.size() > 0) {
File tmp = Files.createTempDir();
toMerge.add(tmp);
IndexMerger.persist(index, tmp, null, new IndexSpec());

List<QueryableIndex> indexes = new ArrayList<>(toMerge.size());
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
}
IndexMerger.mergeQueryableIndex(indexes, metrics, outDir, new IndexSpec());

for (QueryableIndex qi : indexes) {
qi.close();
}
} else {
IndexMerger.persist(index, outDir, null, new IndexSpec());
}
}
finally {
if (index != null) {
index.close();
}

for (File file : toMerge) {
FileUtils.deleteDirectory(file);
}
IndexMerger.persist(index, outDir, null, new IndexSpec());
}
}

Expand Down

0 comments on commit 4491103

Please sign in to comment.