Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use scalariform #188

Merged
merged 1 commit into from
Jun 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.twitter.chill.akka
/*******************************************************************************
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,7 +14,8 @@ package com.twitter.chill.akka
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
* ****************************************************************************
*/

import akka.actor.ExtendedActorSystem
import akka.actor.ActorRef
Expand All @@ -23,30 +25,31 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output

/***
/**
* *
* This module provides helper classes for serialization of Akka-specific classes.
*
* @author Roman Levenstein
* @author P. Oscar Boykin
*/

import com.twitter.chill.{toRich, IKryoRegistrar}
import com.twitter.chill.{ toRich, IKryoRegistrar }

class ActorRefSerializer(system: ExtendedActorSystem) extends Serializer[ActorRef] with IKryoRegistrar {

def apply(kryo: Kryo): Unit = {
if(!kryo.alreadyRegistered(classOf[ActorRef])) {
if (!kryo.alreadyRegistered(classOf[ActorRef])) {
kryo.forClass[ActorRef](this)
kryo.forSubclass[ActorRef](this)
}
}

override def read(kryo: Kryo, input: Input, typ: Class[ActorRef]): ActorRef = {
val path = input.readString()
system.actorFor(path)
}
override def read(kryo: Kryo, input: Input, typ: Class[ActorRef]): ActorRef = {
val path = input.readString()
system.actorFor(path)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef) = {
override def write(kryo: Kryo, output: Output, obj: ActorRef) = {
output.writeString(Serialization.serializedActorPath(obj))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ limitations under the License.

package com.twitter.chill.akka

import com.twitter.chill.config.{Config => ChillConfig}
import com.typesafe.config.{Config => TypesafeConfig}
import com.twitter.chill.config.{ Config => ChillConfig }
import com.typesafe.config.{ Config => TypesafeConfig }
import com.typesafe.config.ConfigFactory

import scala.util.Try

/** Wraps the immutable typesafe.config.Config in a wrapper that
/**
* Wraps the immutable typesafe.config.Config in a wrapper that
* keeps track of the state and follows the semantics of ChillConfig
*/
class AkkaConfig(var typesafeConfig: TypesafeConfig) extends ChillConfig {
Expand All @@ -35,6 +36,6 @@ class AkkaConfig(var typesafeConfig: TypesafeConfig) extends ChillConfig {
ConfigFactory.parseString("%s = \"%s\"".format(key, v))
.withFallback(typesafeConfig)
}
.getOrElse(typesafeConfig.withoutPath(key))
.getOrElse(typesafeConfig.withoutPath(key))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

package com.twitter.chill.akka

import akka.actor.{ExtendedActorSystem, ActorRef}
import akka.actor.{ ExtendedActorSystem, ActorRef }
import akka.serialization.Serializer

import com.twitter.chill._
Expand Down Expand Up @@ -44,7 +44,8 @@ import com.twitter.chill.config.ConfiguredInstantiator
*/
class AkkaSerializer(system: ExtendedActorSystem) extends Serializer {

/** You can override this to easily change your serializers.
/**
* You can override this to easily change your serializers.
* If you do so, make sure to change the config to use the name of
* your new class
*/
Expand All @@ -71,7 +72,8 @@ class AkkaSerializer(system: ExtendedActorSystem) extends Serializer {
kryoPool.fromBytes(bytes)
}

/** Uses the Config system of chill.config to Configure at runtime which KryoInstantiator to use
/**
* Uses the Config system of chill.config to Configure at runtime which KryoInstantiator to use
* Overriding kryoInstantiator and using your own class name is probably easier for most cases.
* See ConfiguredInstantiator static methods for how to build up a correct Config with
* your reflected or serialized instantiators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class AkkaTests extends Specification {
"AkkaSerializer" should {
"be selected for tuples" in {
// Find the Serializer for it
val serializer = serialization.findSerializerFor((1,2,3))
val serializer = serialization.findSerializerFor((1, 2, 3))
serializer.getClass.equals(classOf[AkkaSerializer]) must beTrue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@ import com.esotericsoftware.kryo.serializers.FieldSerializer

import com.twitter.chill.IKryoRegistrar

import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog,
HyperLogLogMonoid, Moments, SpaceSaver, DenseVector, SparseVector, AdaptiveVector}

import com.twitter.algebird.{
AveragedValue,
DecayedValue,
HLL,
HyperLogLog,
HyperLogLogMonoid,
Moments,
SpaceSaver,
DenseVector,
SparseVector,
AdaptiveVector
}

class AlgebirdRegistrar extends IKryoRegistrar {

Expand All @@ -34,7 +43,8 @@ class AlgebirdRegistrar extends IKryoRegistrar {
k.register(classOf[Moments], new MomentsSerializer)
k.addDefaultSerializer(classOf[HLL], new HLLSerializer)

/** AdaptiveVector is IndexedSeq, which picks up the chill IndexedSeq serializer
/**
* AdaptiveVector is IndexedSeq, which picks up the chill IndexedSeq serializer
* (which is its own bug), force using the fields serializer here
*/
k.register(classOf[DenseVector[_]], new FieldSerializer[DenseVector[_]](k, classOf[DenseVector[_]]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,44 @@ limitations under the License.
package com.twitter.chill.algebird

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{ Serializer => KSerializer }
import com.esotericsoftware.kryo.io.{ Input, Output }

import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog,
HyperLogLogMonoid, Moments, SpaceSaver, SSOne, SSMany}
import com.twitter.algebird.{
AveragedValue,
DecayedValue,
HLL,
HyperLogLog,
HyperLogLogMonoid,
Moments,
SpaceSaver,
SSOne,
SSMany
}

import scala.collection.mutable.{Map => MMap}
import scala.collection.mutable.{ Map => MMap }
import scala.collection.immutable.SortedMap

class AveragedValueSerializer extends KSerializer[AveragedValue] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : AveragedValue) {
def write(kser: Kryo, out: Output, s: AveragedValue) {
out.writeLong(s.count, true)
out.writeDouble(s.value)
}
def read(kser : Kryo, in : Input, cls : Class[AveragedValue]) : AveragedValue =
def read(kser: Kryo, in: Input, cls: Class[AveragedValue]): AveragedValue =
AveragedValue(in.readLong(true), in.readDouble)
}

class MomentsSerializer extends KSerializer[Moments] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : Moments) {
def write(kser: Kryo, out: Output, s: Moments) {
out.writeLong(s.m0, true)
out.writeDouble(s.m1)
out.writeDouble(s.m2)
out.writeDouble(s.m3)
out.writeDouble(s.m4)
}
def read(kser : Kryo, in : Input, cls : Class[Moments]) : Moments = {
def read(kser: Kryo, in: Input, cls: Class[Moments]): Moments = {
Moments(in.readLong(true),
in.readDouble,
in.readDouble,
Expand All @@ -53,36 +62,35 @@ class MomentsSerializer extends KSerializer[Moments] {
}
}


class DecayedValueSerializer extends KSerializer[DecayedValue] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : DecayedValue) {
def write(kser: Kryo, out: Output, s: DecayedValue) {
out.writeDouble(s.value)
out.writeDouble(s.scaledTime)
}
def read(kser : Kryo, in : Input, cls : Class[DecayedValue]) : DecayedValue =
def read(kser: Kryo, in: Input, cls: Class[DecayedValue]): DecayedValue =
DecayedValue(in.readDouble, in.readDouble)
}

class HLLSerializer extends KSerializer[HLL] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : HLL) {
def write(kser: Kryo, out: Output, s: HLL) {
val bytes = HyperLogLog.toBytes(s)
out.writeInt(bytes.size, true)
out.writeBytes(bytes)
}
def read(kser : Kryo, in : Input, cls : Class[HLL]) : HLL = {
def read(kser: Kryo, in: Input, cls: Class[HLL]): HLL = {
HyperLogLog.fromBytes(in.readBytes(in.readInt(true)))
}
}

class HLLMonoidSerializer extends KSerializer[HyperLogLogMonoid] {
setImmutable(true)
val hllMonoids = MMap[Int,HyperLogLogMonoid]()
def write(kser: Kryo, out : Output, mon : HyperLogLogMonoid) {
val hllMonoids = MMap[Int, HyperLogLogMonoid]()
def write(kser: Kryo, out: Output, mon: HyperLogLogMonoid) {
out.writeInt(mon.bits, true)
}
def read(kser : Kryo, in : Input, cls : Class[HyperLogLogMonoid]) : HyperLogLogMonoid = {
def read(kser: Kryo, in: Input, cls: Class[HyperLogLogMonoid]): HyperLogLogMonoid = {
val bits = in.readInt(true)
hllMonoids.getOrElseUpdate(bits, new HyperLogLogMonoid(bits))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class AlgebirdSerializersSpec extends Specification {
f(result) must_== f(x)
}


"kryo with AlgebirdRegistrar" should {
"serialize and deserialize AveragedValue" in {
roundtrip(AveragedValue(10L, 123.45))
Expand All @@ -71,8 +70,8 @@ class AlgebirdSerializersSpec extends Specification {
}

"serialize and deserialize SparseVector and DenseVector" in {
val sparse = AdaptiveVector.fromVector(Vector(1,1,1,1,1,3), 1)
val dense = AdaptiveVector.fromVector(Vector(1,2,3,1,2,3), 1)
val sparse = AdaptiveVector.fromVector(Vector(1, 1, 1, 1, 1, 3), 1)
val dense = AdaptiveVector.fromVector(Vector(1, 2, 3, 1, 2, 3), 1)
roundtrip(sparse)
roundtrip(dense)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
package com.twitter.chill.avro

import org.apache.avro.specific.SpecificRecordBase
import com.twitter.chill.{InjectiveSerializer, KSerializer}
import com.twitter.bijection.avro.{GenericAvroCodecs, SpecificAvroCodecs}
import com.twitter.chill.{ InjectiveSerializer, KSerializer }
import com.twitter.bijection.avro.{ GenericAvroCodecs, SpecificAvroCodecs }
import org.apache.avro.Schema
import com.twitter.bijection.Injection
import org.apache.avro.generic.GenericData.Record
Expand All @@ -28,25 +28,25 @@ import org.apache.avro.generic.GenericRecord
*/
object AvroSerializer {

def SpecificRecordSerializer[T <: SpecificRecordBase : Manifest]: KSerializer[T] = {
def SpecificRecordSerializer[T <: SpecificRecordBase: Manifest]: KSerializer[T] = {
implicit val inj = SpecificAvroCodecs[T]
InjectiveSerializer.asKryo
}

def SpecificRecordBinarySerializer[T <: SpecificRecordBase : Manifest]: KSerializer[T] = {
def SpecificRecordBinarySerializer[T <: SpecificRecordBase: Manifest]: KSerializer[T] = {
implicit val inj = SpecificAvroCodecs.toBinary[T]
InjectiveSerializer.asKryo
}

def SpecificRecordJsonSerializer[T <: SpecificRecordBase : Manifest](schema: Schema): KSerializer[T] = {
def SpecificRecordJsonSerializer[T <: SpecificRecordBase: Manifest](schema: Schema): KSerializer[T] = {
import com.twitter.bijection.StringCodec.utf8
implicit val inj = SpecificAvroCodecs.toJson[T](schema)
implicit val avroToArray = Injection.connect[T, String, Array[Byte]]
InjectiveSerializer.asKryo
}

def GenericRecordSerializer[T <: GenericRecord : Manifest](schema: Schema = null): KSerializer[T] = {
implicit val inj = GenericAvroCodecs[T](schema)
def GenericRecordSerializer[T <: GenericRecord: Manifest](schema: Schema = null): KSerializer[T] = {
implicit val inj = GenericAvroCodecs[T](schema)
InjectiveSerializer.asKryo
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ limitations under the License.
package com.twitter.chill.avro

import org.specs.Specification
import com.twitter.chill.{KSerializer, ScalaKryoInstantiator, KryoPool}
import com.twitter.chill.{ KSerializer, ScalaKryoInstantiator, KryoPool }
import avro.FiscalRecord
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.avro.SchemaBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import com.twitter.bijection.{ Bufferable, Bijection, ImplicitBijection, Injecti
object BijectionEnrichedKryo {
implicit def enrich(k: Kryo): BijectionEnrichedKryo = new BijectionEnrichedKryo(k)

/** Use a bijection[A,B] then the KSerializer on B
/**
* Use a bijection[A,B] then the KSerializer on B
*/
def viaBijection[A,B](kser: KSerializer[B])(implicit bij: ImplicitBijection[A,B], cmf: ClassManifest[B]): KSerializer[A] =
def viaBijection[A, B](kser: KSerializer[B])(implicit bij: ImplicitBijection[A, B], cmf: ClassManifest[B]): KSerializer[A] =
new KSerializer[A] {
def write(k: Kryo, out: Output, obj: A) { kser.write(k, out, bij(obj)) }
def read(k: Kryo, in: Input, cls: Class[A]) =
Expand Down Expand Up @@ -52,16 +53,17 @@ class BijectionEnrichedKryo(k: Kryo) {
k
}

/** B has to already be registered, then use the KSerializer[B] to create KSerialzer[A]
/**
* B has to already be registered, then use the KSerializer[B] to create KSerialzer[A]
*/
def forClassViaBijection[A,B](implicit bij: ImplicitBijection[A,B], acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
def forClassViaBijection[A, B](implicit bij: ImplicitBijection[A, B], acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
val kserb = k.getSerializer(bcmf.erasure).asInstanceOf[KSerializer[B]]
k.register(acmf.erasure, BijectionEnrichedKryo.viaBijection[A,B](kserb))
k.register(acmf.erasure, BijectionEnrichedKryo.viaBijection[A, B](kserb))
k
}

/** Helpful override to alleviate rewriting types. */
def forClassViaBijection[A,B](bij: Bijection[A,B])(implicit acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
def forClassViaBijection[A, B](bij: Bijection[A, B])(implicit acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
implicit def implicitBij = bij
this.forClassViaBijection[A, B]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class InjectionRegistrar[T](val klass: Class[T], @transient b: Injection[T, Arra
implicit def injection = bBox.copy

def apply(k: Kryo) {
if(!k.alreadyRegistered(klass)) {
if (!k.alreadyRegistered(klass)) {
k.register(klass, InjectiveSerializer.asKryo[T])
}
}
Expand All @@ -39,7 +39,7 @@ object InjectionDefaultRegistrar {
class InjectionDefaultRegistrar[T](klass: Class[T], @transient b: Injection[T, Array[Byte]])
extends InjectionRegistrar(klass, b) {
override def apply(k: Kryo) {
if(!k.alreadyRegistered(klass)) {
if (!k.alreadyRegistered(klass)) {
k.addDefaultSerializer(klass, InjectiveSerializer.asKryo[T])
k.register(klass)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ object KryoInjection extends Injection[Any, Array[Byte]] {
def instance(kryoPool: => KryoPool): Injection[Any, Array[Byte]] =
new KryoInjectionInstance(kryoPool)

/** Creates a small pool (size 2) and uses it as an Injection
/**
* Creates a small pool (size 2) and uses it as an Injection
* Note the implicit in the package from () => Kryo to KryoInstatiator.
* It is ESSENTIAL that this function is allocating new Kryos, or we will
* not be thread-safe
Expand All @@ -56,7 +57,7 @@ class KryoInjectionInstance(lazyKryoP: => KryoPool) extends Injection[Any, Array
@transient private var kpool: KryoPool = null

private def kryoP: KryoPool = mutex.synchronized {
if(null == kpool) { kpool = lazyKryoP }
if (null == kpool) { kpool = lazyKryoP }
kpool
}

Expand Down
Loading