Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jun 24, 2015
1 parent 3676a82 commit 20821ec
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,20 @@ case class TakeOrderedAndProject(

private val projection = projectList.map(newProjection(_, child.output))

private def collectData(): Iterator[InternalRow] = {
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord).toIterator
private def collectData(): Array[InternalRow] = {
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
projection.map(data.map(_)).getOrElse(data)
}

override def executeCollect(): Array[Row] = {
val converter = CatalystTypeConverters.createToScalaConverter(schema)
collectData().map(converter(_).asInstanceOf[Row]).toArray
collectData().map(converter(_).asInstanceOf[Row])
}

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
protected override def doExecute(): RDD[InternalRow] =
sparkContext.makeRDD(collectData().toArray[InternalRow], 1)
sparkContext.makeRDD(collectData(), 1)

override def outputOrdering: Seq[SortOrder] = sortOrder
}
Expand Down

0 comments on commit 20821ec

Please sign in to comment.