Skip to content

Commit

Permalink
Try fixing SPARK-2650 by adjusting initial buffer size and reducing memory allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Aug 4, 2014
1 parent e053c55 commit 001f2e5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)

private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024

private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
if (orig.remaining >= size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import org.apache.spark.sql.Row
* }}}
*/
private[sql] trait NullableColumnBuilder extends ColumnBuilder {
private var nulls: ByteBuffer = _
protected var nulls: ByteBuffer = _
protected var nullCount: Int = _
private var pos: Int = _
private var nullCount: Int = _

abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
nulls = ByteBuffer.allocate(1024)
Expand Down Expand Up @@ -78,4 +78,9 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
buffer.rewind()
buffer
}

/**
 * Flips the `nulls` byte buffer for reading (limit set to the number of bytes written
 * so far, position reset to 0) and then returns the buffer produced by the parent
 * builder's `build()`.
 *
 * NOTE(review): presumably the returned buffer contains only the non-null column data,
 * with null bookkeeping left in `nulls` for the caller to prepend — confirm against the
 * super `build()` implementation, which is not visible in this view.
 */
def buildNonNulls(): ByteBuffer = {
nulls.limit(nulls.position()).rewind()
super.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]

this: NativeColumnBuilder[T] with WithCompressionSchemes =>

import CompressionScheme._

var compressionEncoders: Seq[Encoder[T]] = _

abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
Expand Down Expand Up @@ -81,28 +79,32 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
}
}

abstract override def build() = {
val rawBuffer = super.build()
override def build() = {
val nonNullBuffer = buildNonNulls()
val typeId = nonNullBuffer.getInt()
val encoder: Encoder[T] = {
val candidate = compressionEncoders.minBy(_.compressionRatio)
if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
}

val headerSize = columnHeaderSize(rawBuffer)
// Header = column type ID + null count + null positions
val headerSize = 4 + 4 + nulls.limit()
val compressedSize = if (encoder.compressedSize == 0) {
rawBuffer.limit - headerSize
nonNullBuffer.remaining()
} else {
encoder.compressedSize
}

// Reserves 4 bytes for compression scheme ID
val compressedBuffer = ByteBuffer
// Reserves 4 bytes for compression scheme ID
.allocate(headerSize + 4 + compressedSize)
.order(ByteOrder.nativeOrder)

copyColumnHeader(rawBuffer, compressedBuffer)
// Write the header
.putInt(typeId)
.putInt(nullCount)
.put(nulls)

logInfo(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}")
encoder.compress(rawBuffer, compressedBuffer, columnType)
encoder.compress(nonNullBuffer, compressedBuffer, columnType)
}
}

0 comments on commit 001f2e5

Please sign in to comment.