Skip to content

Commit

Permalink
Merge pull request #188 from twitter/jco/scalariform
Browse files Browse the repository at this point in the history
Use scalariform
  • Loading branch information
ianoc committed Jun 19, 2014
2 parents 4c95fbd + 8c9eead commit b0e1028
Show file tree
Hide file tree
Showing 45 changed files with 613 additions and 621 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.twitter.chill.akka
/*******************************************************************************
/**
* *****************************************************************************
* Copyright 2012 Roman Levenstein
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,7 +14,8 @@ package com.twitter.chill.akka
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
* ****************************************************************************
*/

import akka.actor.ExtendedActorSystem
import akka.actor.ActorRef
Expand All @@ -23,30 +25,31 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output

/***
/**
* *
* This module provides helper classes for serialization of Akka-specific classes.
*
* @author Roman Levenstein
* @author P. Oscar Boykin
*/

import com.twitter.chill.{toRich, IKryoRegistrar}
import com.twitter.chill.{ toRich, IKryoRegistrar }

class ActorRefSerializer(system: ExtendedActorSystem) extends Serializer[ActorRef] with IKryoRegistrar {

def apply(kryo: Kryo): Unit = {
if(!kryo.alreadyRegistered(classOf[ActorRef])) {
if (!kryo.alreadyRegistered(classOf[ActorRef])) {
kryo.forClass[ActorRef](this)
kryo.forSubclass[ActorRef](this)
}
}

override def read(kryo: Kryo, input: Input, typ: Class[ActorRef]): ActorRef = {
val path = input.readString()
system.actorFor(path)
}
override def read(kryo: Kryo, input: Input, typ: Class[ActorRef]): ActorRef = {
val path = input.readString()
system.actorFor(path)
}

override def write(kryo: Kryo, output: Output, obj: ActorRef) = {
override def write(kryo: Kryo, output: Output, obj: ActorRef) = {
output.writeString(Serialization.serializedActorPath(obj))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ limitations under the License.

package com.twitter.chill.akka

import com.twitter.chill.config.{Config => ChillConfig}
import com.typesafe.config.{Config => TypesafeConfig}
import com.twitter.chill.config.{ Config => ChillConfig }
import com.typesafe.config.{ Config => TypesafeConfig }
import com.typesafe.config.ConfigFactory

import scala.util.Try

/** Wraps the immutable typesafe.config.Config in a wrapper that
/**
* Wraps the immutable typesafe.config.Config in a wrapper that
* keeps track of the state and follows the semantics of ChillConfig
*/
class AkkaConfig(var typesafeConfig: TypesafeConfig) extends ChillConfig {
Expand All @@ -35,6 +36,6 @@ class AkkaConfig(var typesafeConfig: TypesafeConfig) extends ChillConfig {
ConfigFactory.parseString("%s = \"%s\"".format(key, v))
.withFallback(typesafeConfig)
}
.getOrElse(typesafeConfig.withoutPath(key))
.getOrElse(typesafeConfig.withoutPath(key))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

package com.twitter.chill.akka

import akka.actor.{ExtendedActorSystem, ActorRef}
import akka.actor.{ ExtendedActorSystem, ActorRef }
import akka.serialization.Serializer

import com.twitter.chill._
Expand Down Expand Up @@ -44,7 +44,8 @@ import com.twitter.chill.config.ConfiguredInstantiator
*/
class AkkaSerializer(system: ExtendedActorSystem) extends Serializer {

/** You can override this to easily change your serializers.
/**
* You can override this to easily change your serializers.
* If you do so, make sure to change the config to use the name of
* your new class
*/
Expand All @@ -71,7 +72,8 @@ class AkkaSerializer(system: ExtendedActorSystem) extends Serializer {
kryoPool.fromBytes(bytes)
}

/** Uses the Config system of chill.config to Configure at runtime which KryoInstantiator to use
/**
* Uses the Config system of chill.config to Configure at runtime which KryoInstantiator to use
* Overriding kryoInstantiator and using your own class name is probably easier for most cases.
* See ConfiguredInstantiator static methods for how to build up a correct Config with
* your reflected or serialized instantiators.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class AkkaTests extends Specification {
"AkkaSerializer" should {
"be selected for tuples" in {
// Find the Serializer for it
val serializer = serialization.findSerializerFor((1,2,3))
val serializer = serialization.findSerializerFor((1, 2, 3))
serializer.getClass.equals(classOf[AkkaSerializer]) must beTrue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@ import com.esotericsoftware.kryo.serializers.FieldSerializer

import com.twitter.chill.IKryoRegistrar

import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog,
HyperLogLogMonoid, Moments, SpaceSaver, DenseVector, SparseVector, AdaptiveVector}

import com.twitter.algebird.{
AveragedValue,
DecayedValue,
HLL,
HyperLogLog,
HyperLogLogMonoid,
Moments,
SpaceSaver,
DenseVector,
SparseVector,
AdaptiveVector
}

class AlgebirdRegistrar extends IKryoRegistrar {

Expand All @@ -34,7 +43,8 @@ class AlgebirdRegistrar extends IKryoRegistrar {
k.register(classOf[Moments], new MomentsSerializer)
k.addDefaultSerializer(classOf[HLL], new HLLSerializer)

/** AdaptiveVector is IndexedSeq, which picks up the chill IndexedSeq serializer
/**
* AdaptiveVector is IndexedSeq, which picks up the chill IndexedSeq serializer
* (which is its own bug), force using the fields serializer here
*/
k.register(classOf[DenseVector[_]], new FieldSerializer[DenseVector[_]](k, classOf[DenseVector[_]]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,44 @@ limitations under the License.
package com.twitter.chill.algebird

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{ Serializer => KSerializer }
import com.esotericsoftware.kryo.io.{ Input, Output }

import com.twitter.algebird.{AveragedValue, DecayedValue, HLL, HyperLogLog,
HyperLogLogMonoid, Moments, SpaceSaver, SSOne, SSMany}
import com.twitter.algebird.{
AveragedValue,
DecayedValue,
HLL,
HyperLogLog,
HyperLogLogMonoid,
Moments,
SpaceSaver,
SSOne,
SSMany
}

import scala.collection.mutable.{Map => MMap}
import scala.collection.mutable.{ Map => MMap }
import scala.collection.immutable.SortedMap

class AveragedValueSerializer extends KSerializer[AveragedValue] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : AveragedValue) {
def write(kser: Kryo, out: Output, s: AveragedValue) {
out.writeLong(s.count, true)
out.writeDouble(s.value)
}
def read(kser : Kryo, in : Input, cls : Class[AveragedValue]) : AveragedValue =
def read(kser: Kryo, in: Input, cls: Class[AveragedValue]): AveragedValue =
AveragedValue(in.readLong(true), in.readDouble)
}

class MomentsSerializer extends KSerializer[Moments] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : Moments) {
def write(kser: Kryo, out: Output, s: Moments) {
out.writeLong(s.m0, true)
out.writeDouble(s.m1)
out.writeDouble(s.m2)
out.writeDouble(s.m3)
out.writeDouble(s.m4)
}
def read(kser : Kryo, in : Input, cls : Class[Moments]) : Moments = {
def read(kser: Kryo, in: Input, cls: Class[Moments]): Moments = {
Moments(in.readLong(true),
in.readDouble,
in.readDouble,
Expand All @@ -53,36 +62,35 @@ class MomentsSerializer extends KSerializer[Moments] {
}
}


class DecayedValueSerializer extends KSerializer[DecayedValue] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : DecayedValue) {
def write(kser: Kryo, out: Output, s: DecayedValue) {
out.writeDouble(s.value)
out.writeDouble(s.scaledTime)
}
def read(kser : Kryo, in : Input, cls : Class[DecayedValue]) : DecayedValue =
def read(kser: Kryo, in: Input, cls: Class[DecayedValue]): DecayedValue =
DecayedValue(in.readDouble, in.readDouble)
}

class HLLSerializer extends KSerializer[HLL] {
setImmutable(true)
def write(kser: Kryo, out : Output, s : HLL) {
def write(kser: Kryo, out: Output, s: HLL) {
val bytes = HyperLogLog.toBytes(s)
out.writeInt(bytes.size, true)
out.writeBytes(bytes)
}
def read(kser : Kryo, in : Input, cls : Class[HLL]) : HLL = {
def read(kser: Kryo, in: Input, cls: Class[HLL]): HLL = {
HyperLogLog.fromBytes(in.readBytes(in.readInt(true)))
}
}

class HLLMonoidSerializer extends KSerializer[HyperLogLogMonoid] {
setImmutable(true)
val hllMonoids = MMap[Int,HyperLogLogMonoid]()
def write(kser: Kryo, out : Output, mon : HyperLogLogMonoid) {
val hllMonoids = MMap[Int, HyperLogLogMonoid]()
def write(kser: Kryo, out: Output, mon: HyperLogLogMonoid) {
out.writeInt(mon.bits, true)
}
def read(kser : Kryo, in : Input, cls : Class[HyperLogLogMonoid]) : HyperLogLogMonoid = {
def read(kser: Kryo, in: Input, cls: Class[HyperLogLogMonoid]): HyperLogLogMonoid = {
val bits = in.readInt(true)
hllMonoids.getOrElseUpdate(bits, new HyperLogLogMonoid(bits))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class AlgebirdSerializersSpec extends Specification {
f(result) must_== f(x)
}


"kryo with AlgebirdRegistrar" should {
"serialize and deserialize AveragedValue" in {
roundtrip(AveragedValue(10L, 123.45))
Expand All @@ -71,8 +70,8 @@ class AlgebirdSerializersSpec extends Specification {
}

"serialize and deserialize SparseVector and DenseVector" in {
val sparse = AdaptiveVector.fromVector(Vector(1,1,1,1,1,3), 1)
val dense = AdaptiveVector.fromVector(Vector(1,2,3,1,2,3), 1)
val sparse = AdaptiveVector.fromVector(Vector(1, 1, 1, 1, 1, 3), 1)
val dense = AdaptiveVector.fromVector(Vector(1, 2, 3, 1, 2, 3), 1)
roundtrip(sparse)
roundtrip(dense)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ limitations under the License.
package com.twitter.chill.avro

import org.apache.avro.specific.SpecificRecordBase
import com.twitter.chill.{InjectiveSerializer, KSerializer}
import com.twitter.bijection.avro.{GenericAvroCodecs, SpecificAvroCodecs}
import com.twitter.chill.{ InjectiveSerializer, KSerializer }
import com.twitter.bijection.avro.{ GenericAvroCodecs, SpecificAvroCodecs }
import org.apache.avro.Schema
import com.twitter.bijection.Injection
import org.apache.avro.generic.GenericData.Record
Expand All @@ -28,25 +28,25 @@ import org.apache.avro.generic.GenericRecord
*/
object AvroSerializer {

def SpecificRecordSerializer[T <: SpecificRecordBase : Manifest]: KSerializer[T] = {
def SpecificRecordSerializer[T <: SpecificRecordBase: Manifest]: KSerializer[T] = {
implicit val inj = SpecificAvroCodecs[T]
InjectiveSerializer.asKryo
}

def SpecificRecordBinarySerializer[T <: SpecificRecordBase : Manifest]: KSerializer[T] = {
def SpecificRecordBinarySerializer[T <: SpecificRecordBase: Manifest]: KSerializer[T] = {
implicit val inj = SpecificAvroCodecs.toBinary[T]
InjectiveSerializer.asKryo
}

def SpecificRecordJsonSerializer[T <: SpecificRecordBase : Manifest](schema: Schema): KSerializer[T] = {
def SpecificRecordJsonSerializer[T <: SpecificRecordBase: Manifest](schema: Schema): KSerializer[T] = {
import com.twitter.bijection.StringCodec.utf8
implicit val inj = SpecificAvroCodecs.toJson[T](schema)
implicit val avroToArray = Injection.connect[T, String, Array[Byte]]
InjectiveSerializer.asKryo
}

def GenericRecordSerializer[T <: GenericRecord : Manifest](schema: Schema = null): KSerializer[T] = {
implicit val inj = GenericAvroCodecs[T](schema)
def GenericRecordSerializer[T <: GenericRecord: Manifest](schema: Schema = null): KSerializer[T] = {
implicit val inj = GenericAvroCodecs[T](schema)
InjectiveSerializer.asKryo
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ limitations under the License.
package com.twitter.chill.avro

import org.specs.Specification
import com.twitter.chill.{KSerializer, ScalaKryoInstantiator, KryoPool}
import com.twitter.chill.{ KSerializer, ScalaKryoInstantiator, KryoPool }
import avro.FiscalRecord
import org.apache.avro.generic.GenericRecordBuilder
import org.apache.avro.SchemaBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import com.twitter.bijection.{ Bufferable, Bijection, ImplicitBijection, Injecti
object BijectionEnrichedKryo {
implicit def enrich(k: Kryo): BijectionEnrichedKryo = new BijectionEnrichedKryo(k)

/** Use a bijection[A,B] then the KSerializer on B
/**
* Use a bijection[A,B] then the KSerializer on B
*/
def viaBijection[A,B](kser: KSerializer[B])(implicit bij: ImplicitBijection[A,B], cmf: ClassManifest[B]): KSerializer[A] =
def viaBijection[A, B](kser: KSerializer[B])(implicit bij: ImplicitBijection[A, B], cmf: ClassManifest[B]): KSerializer[A] =
new KSerializer[A] {
def write(k: Kryo, out: Output, obj: A) { kser.write(k, out, bij(obj)) }
def read(k: Kryo, in: Input, cls: Class[A]) =
Expand Down Expand Up @@ -52,16 +53,17 @@ class BijectionEnrichedKryo(k: Kryo) {
k
}

/** B has to already be registered, then use the KSerializer[B] to create KSerialzer[A]
/**
* B has to already be registered, then use the KSerializer[B] to create KSerialzer[A]
*/
def forClassViaBijection[A,B](implicit bij: ImplicitBijection[A,B], acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
def forClassViaBijection[A, B](implicit bij: ImplicitBijection[A, B], acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
val kserb = k.getSerializer(bcmf.erasure).asInstanceOf[KSerializer[B]]
k.register(acmf.erasure, BijectionEnrichedKryo.viaBijection[A,B](kserb))
k.register(acmf.erasure, BijectionEnrichedKryo.viaBijection[A, B](kserb))
k
}

/** Helpful override to alleviate rewriting types. */
def forClassViaBijection[A,B](bij: Bijection[A,B])(implicit acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
def forClassViaBijection[A, B](bij: Bijection[A, B])(implicit acmf: ClassManifest[A], bcmf: ClassManifest[B]): Kryo = {
implicit def implicitBij = bij
this.forClassViaBijection[A, B]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class InjectionRegistrar[T](val klass: Class[T], @transient b: Injection[T, Arra
implicit def injection = bBox.copy

def apply(k: Kryo) {
if(!k.alreadyRegistered(klass)) {
if (!k.alreadyRegistered(klass)) {
k.register(klass, InjectiveSerializer.asKryo[T])
}
}
Expand All @@ -39,7 +39,7 @@ object InjectionDefaultRegistrar {
class InjectionDefaultRegistrar[T](klass: Class[T], @transient b: Injection[T, Array[Byte]])
extends InjectionRegistrar(klass, b) {
override def apply(k: Kryo) {
if(!k.alreadyRegistered(klass)) {
if (!k.alreadyRegistered(klass)) {
k.addDefaultSerializer(klass, InjectiveSerializer.asKryo[T])
k.register(klass)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ object KryoInjection extends Injection[Any, Array[Byte]] {
def instance(kryoPool: => KryoPool): Injection[Any, Array[Byte]] =
new KryoInjectionInstance(kryoPool)

/** Creates a small pool (size 2) and uses it as an Injection
/**
* Creates a small pool (size 2) and uses it as an Injection
* Note the implicit in the package from () => Kryo to KryoInstatiator.
* It is ESSENTIAL that this function is allocating new Kryos, or we will
* not be thread-safe
Expand All @@ -56,7 +57,7 @@ class KryoInjectionInstance(lazyKryoP: => KryoPool) extends Injection[Any, Array
@transient private var kpool: KryoPool = null

private def kryoP: KryoPool = mutex.synchronized {
if(null == kpool) { kpool = lazyKryoP }
if (null == kpool) { kpool = lazyKryoP }
kpool
}

Expand Down
Loading

0 comments on commit b0e1028

Please sign in to comment.