Add chill-avro for delegating serialization of avro objects #171

Closed · johnynek opened this issue Feb 7, 2014 · 7 comments
@johnynek (Collaborator) commented Feb 7, 2014

No description provided.

@ash211 commented Feb 7, 2014

The motivation is that users of chill want Kryo serialization to delegate to Avro's own serialization. Because Avro objects aren't Java-serializable, users currently have to write a serializer themselves every time.

https://spark-project.atlassian.net/browse/SPARK-746

Beginnings of an implementation:

https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala

// Imports for context (assuming Kryo 2.x, fastutil, Avro, and Spark on the classpath):
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import it.unimi.dsi.fastutil.io.{FastByteArrayInputStream, FastByteArrayOutputStream}
import org.apache.avro.io.{BinaryDecoder, BinaryEncoder, DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}
import org.apache.spark.serializer.KryoRegistrator

// Reusable buffer + decoder pair; rebuilt with a larger buffer when a record outgrows it.
case class InputStreamWithDecoder(size: Int) {
  val buffer = new Array[Byte](size)
  val stream = new FastByteArrayInputStream(buffer)
  val decoder = DecoderFactory.get().directBinaryDecoder(stream, null.asInstanceOf[BinaryDecoder])
}

// NOTE: This class is not thread-safe; however, Spark guarantees that only a single thread will access it.
class AvroSerializer[T <: SpecificRecord : ClassManifest] extends Serializer[T] {
  val reader = new SpecificDatumReader[T](classManifest[T].erasure.asInstanceOf[Class[T]])
  val writer = new SpecificDatumWriter[T](classManifest[T].erasure.asInstanceOf[Class[T]])
  var in = InputStreamWithDecoder(1024)
  val outstream = new FastByteArrayOutputStream()
  val encoder = EncoderFactory.get().directBinaryEncoder(outstream, null.asInstanceOf[BinaryEncoder])

  setAcceptsNull(false)

  def write(kryo: Kryo, kryoOut: Output, record: T) = {
    outstream.reset()
    writer.write(record, encoder)
    kryoOut.writeInt(outstream.array.length, true)
    kryoOut.write(outstream.array)
  }

  def read(kryo: Kryo, kryoIn: Input, klazz: Class[T]): T = this.synchronized {
    val len = kryoIn.readInt(true)
    if (len > in.size) {
      in = InputStreamWithDecoder(len + 1024)
    }
    in.stream.reset()
    // Read Kryo bytes into input buffer
    kryoIn.readBytes(in.buffer, 0, len)
    // Read the Avro object from the buffer
    reader.read(null.asInstanceOf[T], in.decoder)
  }
}

// ADAMRecord, ADAMPileup, etc. are Avro-generated record classes from the ADAM project.
class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[ADAMRecord], new AvroSerializer[ADAMRecord]())
    kryo.register(classOf[ADAMPileup], new AvroSerializer[ADAMPileup]())
    kryo.register(classOf[ADAMGenotype], new AvroSerializer[ADAMGenotype]())
    kryo.register(classOf[ReferencePositionWithOrientation], new ReferencePositionWithOrientationSerializer)
    kryo.register(classOf[ReferencePosition], new ReferencePositionSerializer)
    kryo.register(classOf[ReferencePositionPair], new ReferencePositionPairSerializer)
    kryo.register(classOf[SingleReadBucket], new SingleReadBucketSerializer)
  }
}

@MansurAshraf (Contributor)

Some of the code in the above implementation can be abstracted by using avro-bijections

https://github.com/twitter/bijection/blob/develop/bijection-avro/src/main/scala/com/twitter/bijection/avro/AvroCodecs.scala
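For context, each codec there gives you an Injection[T, Array[Byte]], so a round trip looks like this. A minimal sketch, not from the thread: MyRecord stands in for any Avro-generated class, record is a placeholder instance, and the exact wrapper type returned by invert depends on the bijection version.

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs

val inj: Injection[MyRecord, Array[Byte]] = SpecificAvroCodecs.toBinary[MyRecord]
val bytes = inj(record)        // record: MyRecord, serialized to Avro binary
val back  = inj.invert(bytes)  // deserialize; wrapped (Try/Option) depending on version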

@MansurAshraf (Contributor)

So I looked into this a little more, and it seems the code above is completely redundant. We already have Avro injections for both generic and specific records, and we also have an InjectiveSerializer. We can replace all of the code above with these few lines:

object AvroSerializer {

  def asAvroSerializer[T <: SpecificRecordBase : Manifest] = {
    implicit val inj = SpecificAvroCodecs.toBinary[T]
    InjectiveSerializer.asKryo
  }
}

and then you can register it as:

import AvroSerializer._

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[ADAMRecord], asAvroSerializer[ADAMRecord])
  }
}

I have not compiled or tested this, but I am fairly sure it will work.
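The generic-record half mentioned above would look much the same; a parallel sketch, equally untested, with hypothetical names. GenericAvroCodecs.toBinary takes the writer schema explicitly, since a generic record's class alone doesn't determine its schema.

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.avro.GenericAvroCodecs

object GenericAvroSerializer {

  // schema: whatever writer schema your GenericRecords were built against
  def asGenericAvroSerializer(schema: Schema) = {
    implicit val inj = GenericAvroCodecs.toBinary[GenericRecord](schema)
    InjectiveSerializer.asKryo
  }
}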

@ash211 commented Feb 7, 2014

This might fall under Spark now, but ideally you wouldn't have to register each Avro class individually in that registerClasses() call. Is there some way to register all the classes from Matt's example at once?

The end goal would be for it to just work in Spark, with no kryo.register() required from users of that framework, and I think that's possible if you can register all Avro classes in one sweep.
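Not from the thread, but one way to get that one-sweep behavior: Kryo's addDefaultSerializer attaches a serializer to a type and all of its subtypes, so a single call against SpecificRecord would cover every generated Avro class. A hedged sketch with hypothetical class names; it relies on Kryo building default serializers per concrete class via a (Class) constructor.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical: handles any concrete SpecificRecord subclass. Kryo passes the
// concrete class to the (Class) constructor when it instantiates the serializer.
class AnySpecificRecordSerializer(klass: Class[SpecificRecord]) extends Serializer[SpecificRecord] {
  private val reader = new SpecificDatumReader[SpecificRecord](klass)
  private val writer = new SpecificDatumWriter[SpecificRecord](klass)

  override def write(kryo: Kryo, out: Output, record: SpecificRecord) {
    val bytes = new java.io.ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(bytes, null)
    writer.write(record, encoder)
    encoder.flush()
    out.writeInt(bytes.size(), true) // length-prefix the Avro payload
    out.writeBytes(bytes.toByteArray)
  }

  override def read(kryo: Kryo, in: Input, klazz: Class[SpecificRecord]): SpecificRecord = {
    val payload = in.readBytes(in.readInt(true))
    reader.read(null.asInstanceOf[SpecificRecord], DecoderFactory.get().binaryDecoder(payload, null))
  }
}

class OneSweepRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // One call covers SpecificRecord and every subclass; no per-record registration.
    kryo.addDefaultSerializer(classOf[SpecificRecord], classOf[AnySpecificRecordSerializer])
  }
}

Note that addDefaultSerializer only chooses which serializer a class gets; if Kryo's registrationRequired flag is on, the concrete classes would still need registering.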

@ccsevers commented Feb 7, 2014

It should be possible to have something for IndexedRecord, which sits above both GenericRecord and SpecificRecord. I made something similar for cascading.avro, but Chill would be a much better home for it.
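A sketch of that IndexedRecord-level idea, assuming the writer schema is shipped alongside each record so generic records can be decoded too; a real implementation would want to cache or fingerprint schemas rather than send the JSON every time, and the class name is hypothetical.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, IndexedRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

class IndexedRecordSerializer extends Serializer[IndexedRecord] {
  override def write(kryo: Kryo, out: Output, record: IndexedRecord) {
    // Ship the writer schema with the payload so the reader can decode any record type.
    out.writeString(record.getSchema.toString)
    val writer = new GenericDatumWriter[IndexedRecord](record.getSchema)
    val bytes = new java.io.ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(bytes, null)
    writer.write(record, encoder)
    encoder.flush()
    out.writeInt(bytes.size(), true)
    out.writeBytes(bytes.toByteArray)
  }

  override def read(kryo: Kryo, in: Input, klazz: Class[IndexedRecord]): IndexedRecord = {
    val schema = new Schema.Parser().parse(in.readString())
    val reader = new GenericDatumReader[IndexedRecord](schema)
    val payload = in.readBytes(in.readInt(true))
    reader.read(null.asInstanceOf[IndexedRecord], DecoderFactory.get().binaryDecoder(payload, null))
  }
}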

@rjurney commented Jun 4, 2014

How can I use this stuff with Spark to load Avros? I can't find any kind of example.

@ianoc (Collaborator) commented Jun 4, 2014

This isn't really for loading data from disk; it's used for in-flight data, i.e. between hosts and on shuffles in Spark. For Spark you would need to use the Hadoop input API calls and a Hadoop file input format.

