From 1ef455f90fcc528c9f02504a96c4e76d2d3d9080 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Thu, 25 May 2017 18:11:35 +0530 Subject: [PATCH 01/18] WIP - Aggregate Router support --- suuchi-core/pom.xml | 12 +++++++ suuchi-core/src/main/proto/suuchi.proto | 23 ++++++++++++- .../suuchi/router/AggregationRouter.scala | 34 +++++++++++++++++++ .../in/ashwanthkumar/suuchi/rpc/Server.scala | 8 +++++ .../suuchi/rpc/SuuchiAggregatorService.scala | 9 +++++ .../suuchi/example/DistributedKVServer.scala | 9 +++-- 6 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala create mode 100644 suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala diff --git a/suuchi-core/pom.xml b/suuchi-core/pom.xml index 4c8ef55..dcc7167 100644 --- a/suuchi-core/pom.xml +++ b/suuchi-core/pom.xml @@ -25,6 +25,18 @@ 2.8.2 + + com.twitter + algebird-core_${scala.lib.version} + 0.13.0 + + + + com.twitter + bijection-core_${scala.lib.version} + 0.9.5 + + commons-io commons-io diff --git a/suuchi-core/src/main/proto/suuchi.proto b/suuchi-core/src/main/proto/suuchi.proto index 36da119..dfb1c3e 100644 --- a/suuchi-core/src/main/proto/suuchi.proto +++ b/suuchi-core/src/main/proto/suuchi.proto @@ -72,4 +72,25 @@ message ScanResponse { message KV { bytes key = 1; bytes value = 2; -} \ No newline at end of file +} + +message ScatterRequest { + bytes input = 1; +} + +message ScatterResponse { + bytes output = 1; +} + +message GatherResponse { + bytes output = 1; +} + +service Aggregator { + rpc Reduce(ScatterRequest) returns (stream ScatterResponse); + rpc ReReduce(stream ScatterResponse) returns (stream GatherResponse); +} + +/* + Gather(gatherImpl) +*/ \ No newline at end of file diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala new file mode 100644 index 0000000..2804404 --- /dev/null +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala @@ -0,0 +1,34 @@ +package in.ashwanthkumar.suuchi.router + +import java.util +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.Futures +import com.google.protobuf.Message +import in.ashwanthkumar.suuchi.cluster.MemberAddress +import in.ashwanthkumar.suuchi.rpc.CachedChannelPool +import io.grpc._ +import io.grpc.stub.ClientCalls +import com.twitter.algebird.Aggregator + +class AggregationRouter(members: List[MemberAddress], self: MemberAddress, agg: (MethodDescriptor[Message, Message]) => Aggregator[Message, _, Message]) extends ServerInterceptor { me => + val channelPool = CachedChannelPool() + + override def interceptCall[ReqT, RespT](incomingRequest: ServerCall[ReqT, RespT], headers: Metadata, delegate: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = { + delegate.startCall(incomingRequest, headers) + } +} + +object AggregationRouter { + def broadcast[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], input: ReqT): util.List[RespT] = { + val scatterRequests = nodes.map(destination => { + val channel = channelPool.get(destination, insecure = true) + val clientCall = ClientInterceptors.interceptForward(channel) + .newCall(methodDescriptor, CallOptions.DEFAULT.withDeadlineAfter(10, TimeUnit.MINUTES)) // TODO (ashwanthkumar): Make this deadline configurable + ClientCalls.futureUnaryCall(clientCall, input) + }) + + Futures.allAsList(scatterRequests: _*).get() + } +} + diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala index 9365e7e..6fcedd6 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala @@ -2,6 +2,8 @@ package in.ashwanthkumar.suuchi.rpc import java.net.InetAddress +import com.google.protobuf.Message +import com.twitter.algebird.Aggregator import in.ashwanthkumar.suuchi.cluster.MemberAddress import in.ashwanthkumar.suuchi.router._ import io.grpc.{Server => GServer, _} @@ -122,6 +124,12 @@ class Server[T <: ServerBuilder[T]](serverBuilder: ServerBuilder[T], whoami: Mem this } + def aggregate(allNodes: List[MemberAddress], self: MemberAddress, service: BindableService, agg: (MethodDescriptor[Message, Message]) => Aggregator[Message, _, Message]) = { + val aggregator = new AggregationRouter(allNodes, self, agg) + serverBuilder.addService(ServerInterceptors.interceptForward(service, aggregator)) + this + } + /** * Add a service without any routing logic * @param service Service which is handled locally by the node and not forwarded diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala new file mode 100644 index 0000000..11d2bee --- /dev/null +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala @@ -0,0 +1,9 @@ +package in.ashwanthkumar.suuchi.rpc + +import in.ashwanthkumar.suuchi.rpc.generated.{AggregatorGrpc, SuuchiRPC} +import io.grpc.stub.StreamObserver + +class SuuchiAggregatorService extends AggregatorGrpc.AggregatorImplBase { + override def reduce(request: SuuchiRPC.ScatterRequest, responseObserver: StreamObserver[SuuchiRPC.ScatterResponse]): Unit = super.reduce(request, responseObserver) +} + diff --git a/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala b/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala index 59e3ea6..b6b220a 100644 --- a/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala +++ b/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala @@ -1,9 +1,12 @@ package in.ashwanthkumar.suuchi.example +import com.google.protobuf.Message +import com.twitter.algebird.Aggregator import in.ashwanthkumar.suuchi.router.ConsistentHashingRouting import in.ashwanthkumar.suuchi.rpc.Server.whoami -import in.ashwanthkumar.suuchi.rpc.{Server, SuuchiPutService, SuuchiReadService, SuuchiScanService} +import in.ashwanthkumar.suuchi.rpc._ import in.ashwanthkumar.suuchi.store.InMemoryStore +import io.grpc.MethodDescriptor import io.grpc.netty.NettyServerBuilder // Start the app with either / one each of 5051, 5052 or/and 5053 port numbers @@ -13,13 +16,15 @@ object DistributedKVServer extends App { val PARTITIONS_PER_NODE = 100 val REPLICATION_FACTOR = 2 - val routingStrategy = ConsistentHashingRouting(REPLICATION_FACTOR, PARTITIONS_PER_NODE, whoami(5051), whoami(5052), whoami(5053)) + val allNodes = List(whoami(5051), whoami(5052), whoami(5053)) + val routingStrategy = ConsistentHashingRouting(REPLICATION_FACTOR, PARTITIONS_PER_NODE, allNodes:_*) val store = new InMemoryStore val server = Server(NettyServerBuilder.forPort(port), whoami(port)) .routeUsing(new SuuchiReadService(store), routingStrategy) .withParallelReplication(new SuuchiPutService(store), REPLICATION_FACTOR, routingStrategy) .withService(new SuuchiScanService(store)) + .aggregate(allNodes, allNodes.head, new SuuchiAggregatorService(), Map.empty[MethodDescriptor[Message, Message], Aggregator[Message, _, Message]]) server.start() From 45cd2bdc2b43018fb6c8b3f5139e62f93f634478 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 10:42:36 +0530 Subject: [PATCH 02/18] Basic working version of scatter - gather --- suuchi-core/src/main/proto/suuchi.proto | 18 ++---- .../suuchi/router/AggregationRouter.scala | 57 ++++++++++++++++--- .../ashwanthkumar/suuchi/router/Headers.scala | 2 + .../suuchi/router/Marshallers.scala | 8 +++ .../in/ashwanthkumar/suuchi/rpc/Server.scala | 4 +- .../suuchi/rpc/SuuchiAggregatorService.scala | 17 +++++- .../suuchi/client/SuuchiClient.scala | 11 +++- .../suuchi/example/DistributedKVServer.scala | 5 +- 8 files changed, 91 insertions(+), 31 deletions(-) diff --git a/suuchi-core/src/main/proto/suuchi.proto b/suuchi-core/src/main/proto/suuchi.proto index dfb1c3e..b8eaa60 100644 --- a/suuchi-core/src/main/proto/suuchi.proto +++ b/suuchi-core/src/main/proto/suuchi.proto @@ -74,23 +74,13 @@ message KV { bytes value = 2; } -message ScatterRequest { - bytes input = 1; +message ReduceRequest { } -message ScatterResponse { - bytes output = 1; -} - -message GatherResponse { - bytes output = 1; +message ReduceResponse { + int64 output = 1; } service Aggregator { - rpc Reduce(ScatterRequest) returns (stream ScatterResponse); - rpc ReReduce(stream ScatterResponse) returns (stream GatherResponse); + rpc Reduce (ReduceRequest) returns (ReduceResponse); } - -/* - Gather(gatherImpl) -*/ \ No newline at end of file diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala index 2804404..595e661 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala @@ -4,26 +4,67 @@ import java.util import java.util.concurrent.TimeUnit import com.google.common.util.concurrent.Futures -import com.google.protobuf.Message +import com.twitter.algebird.Aggregator import in.ashwanthkumar.suuchi.cluster.MemberAddress import in.ashwanthkumar.suuchi.rpc.CachedChannelPool import io.grpc._ -import io.grpc.stub.ClientCalls -import com.twitter.algebird.Aggregator +import io.grpc.stub.{ClientCalls, MetadataUtils, StreamObserver, StreamObservers} +import scala.collection.JavaConverters._ + +trait Aggregation { + def aggregator[ReqT, RespT]: PartialFunction[MethodDescriptor[ReqT, RespT], Aggregator[RespT, Any, RespT]] +} -class AggregationRouter(members: List[MemberAddress], self: MemberAddress, agg: (MethodDescriptor[Message, Message]) => Aggregator[Message, _, Message]) extends ServerInterceptor { me => +class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends ServerInterceptor { val channelPool = CachedChannelPool() - override def interceptCall[ReqT, RespT](incomingRequest: ServerCall[ReqT, RespT], headers: Metadata, delegate: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = { - delegate.startCall(incomingRequest, headers) + override def interceptCall[ReqT, RespT](incomingRequest: ServerCall[ReqT, RespT], headers: Metadata, next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = { + val isBroadcastRequest = headers.containsKey(Headers.BROADCAST_REQUEST_KEY) + if (isBroadcastRequest || !agg.aggregator.isDefinedAt(incomingRequest.getMethodDescriptor)) { + next.startCall(incomingRequest, headers) + } else { + // ServerCall.Listener for ServerStreaming methods + headers.put(Headers.BROADCAST_REQUEST_KEY, true) + incomingRequest.request(2) + new ServerCall.Listener[ReqT] { + val aggregator = agg.aggregator.apply(incomingRequest.getMethodDescriptor) + var reduced: RespT = _ + + var request: ReqT = _ + + override def onCancel() = { + println("AggregationRouter#onCancel") + incomingRequest.close(Status.CANCELLED, headers) + } + override def onHalfClose() = { + println("AggregationRouter#onHalfClose") + val gathered = AggregationRouter.scatter(members, channelPool, incomingRequest.getMethodDescriptor, headers, request) + reduced = aggregator.apply(gathered.asScala) + incomingRequest.sendHeaders(headers) + incomingRequest.sendMessage(reduced) + incomingRequest.close(Status.OK, headers) + } + override def onReady() = { + println("AggregationRouter#onReady") + } + override def onMessage(message: ReqT) = { + println("AggregationRouter#onMessage") + // We don't do the aggregation here but on onHalfClose() + request = message + } + override def onComplete() = { + println("AggregationRouter#onComplete") + } + } + } } } object AggregationRouter { - def broadcast[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], input: ReqT): util.List[RespT] = { + def scatter[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], headers:Metadata, input: ReqT): util.List[RespT] = { val scatterRequests = nodes.map(destination => { val channel = channelPool.get(destination, insecure = true) - val clientCall = ClientInterceptors.interceptForward(channel) + val clientCall = ClientInterceptors.interceptForward(channel, MetadataUtils.newAttachHeadersInterceptor(headers)) .newCall(methodDescriptor, CallOptions.DEFAULT.withDeadlineAfter(10, TimeUnit.MINUTES)) // TODO (ashwanthkumar): Make this deadline configurable ClientCalls.futureUnaryCall(clientCall, input) }) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Headers.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Headers.scala index 3194477..300178d 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Headers.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Headers.scala @@ -5,7 +5,9 @@ import io.grpc.Metadata object Headers { val ELIGIBLE_NODES = "eligible_nodes" val REPLICATION_REQUEST = "replication_request" + val BROADCAST_REQUEST = "broadcast_request" val REPLICATION_REQUEST_KEY = Metadata.Key.of(Headers.REPLICATION_REQUEST, StringMarshaller) val ELIGIBLE_NODES_KEY = Metadata.Key.of(Headers.ELIGIBLE_NODES, MemberAddressMarshaller) + val BROADCAST_REQUEST_KEY = Metadata.Key.of(Headers.BROADCAST_REQUEST, BooleanMarshaller) } diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Marshallers.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Marshallers.scala index 773927f..b0c591a 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Marshallers.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/Marshallers.scala @@ -11,6 +11,14 @@ case object StringMarshaller extends AsciiMarshaller[String] { override def parseAsciiString(serialized: String): String = serialized } +/** + * Send a boolean value using AsciiMarshaller + */ +case object BooleanMarshaller extends AsciiMarshaller[Boolean] { + override def toAsciiString(value: Boolean): String = value.toString + override def parseAsciiString(serialized: String): Boolean = serialized.toBoolean +} + /** * Converts a collection of [[MemberAddress]] to it's external form separated by `|` */ diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala index 6fcedd6..90724cd 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala @@ -124,8 +124,8 @@ class Server[T <: ServerBuilder[T]](serverBuilder: ServerBuilder[T], whoami: Mem this } - def aggregate(allNodes: List[MemberAddress], self: MemberAddress, service: BindableService, agg: (MethodDescriptor[Message, Message]) => Aggregator[Message, _, Message]) = { - val aggregator = new AggregationRouter(allNodes, self, agg) + def aggregate(allNodes: List[MemberAddress], service: BindableService, agg: Aggregation) = { + val aggregator = new AggregationRouter(allNodes, agg) serverBuilder.addService(ServerInterceptors.interceptForward(service, aggregator)) this } diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala index 11d2bee..400269f 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/SuuchiAggregatorService.scala @@ -1,9 +1,24 @@ package in.ashwanthkumar.suuchi.rpc +import com.twitter.algebird.{Aggregator, LongRing, Semigroup} +import in.ashwanthkumar.suuchi.router.Aggregation +import in.ashwanthkumar.suuchi.rpc.generated.SuuchiRPC.ReduceResponse import in.ashwanthkumar.suuchi.rpc.generated.{AggregatorGrpc, SuuchiRPC} import io.grpc.stub.StreamObserver class SuuchiAggregatorService extends AggregatorGrpc.AggregatorImplBase { - override def reduce(request: SuuchiRPC.ScatterRequest, responseObserver: StreamObserver[SuuchiRPC.ScatterResponse]): Unit = super.reduce(request, responseObserver) + override def reduce(request: SuuchiRPC.ReduceRequest, responseObserver: StreamObserver[SuuchiRPC.ReduceResponse]) = { + responseObserver.onNext(ReduceResponse.newBuilder().setOutput(1).build()) + responseObserver.onCompleted() + } } +class SumOfNumbers extends Aggregation { + override def aggregator[ReqT, RespT] = { + case AggregatorGrpc.METHOD_REDUCE => new Aggregator[ReduceResponse, Long, ReduceResponse] { + override def prepare(input: ReduceResponse) = input.getOutput + override def semigroup: Semigroup[Long] = LongRing + override def present(reduced: Long) = ReduceResponse.newBuilder().setOutput(reduced).build() + }.asInstanceOf[Aggregator[RespT, Any, RespT]] + } +} \ No newline at end of file diff --git a/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/client/SuuchiClient.scala b/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/client/SuuchiClient.scala index 0ebf926..42890e6 100644 --- a/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/client/SuuchiClient.scala +++ b/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/client/SuuchiClient.scala @@ -3,8 +3,8 @@ package in.ashwanthkumar.suuchi.client import java.util.concurrent.TimeUnit import com.google.protobuf.ByteString -import in.ashwanthkumar.suuchi.rpc.generated.SuuchiRPC.{GetRequest, PutRequest, ScanRequest} -import in.ashwanthkumar.suuchi.rpc.generated.{SuuchiPutGrpc, SuuchiReadGrpc, SuuchiScanGrpc} +import in.ashwanthkumar.suuchi.rpc.generated.SuuchiRPC.{GetRequest, PutRequest, ReduceRequest, ScanRequest} +import in.ashwanthkumar.suuchi.rpc.generated.{AggregatorGrpc, SuuchiPutGrpc, SuuchiReadGrpc, SuuchiScanGrpc} import io.grpc.netty.NettyChannelBuilder import org.slf4j.LoggerFactory @@ -20,6 +20,7 @@ class SuuchiClient(host: String, port: Int) { private val writeStub = SuuchiPutGrpc.newBlockingStub(channel) private val readStub = SuuchiReadGrpc.newBlockingStub(channel) private val scanStub = SuuchiScanGrpc.newBlockingStub(channel) + private val aggStub = AggregatorGrpc.newBlockingStub(channel) def shutdown() = { channel.awaitTermination(5, TimeUnit.SECONDS) @@ -52,6 +53,10 @@ class SuuchiClient(host: String, port: Int) { def scan() = { scanStub.scan(ScanRequest.newBuilder().setStart(Int.MinValue).setEnd(Int.MaxValue).build()) } + + def sumOfNumbers() = { + aggStub.reduce(ReduceRequest.newBuilder().build()) + } } object SuuchiClient extends App { @@ -78,5 +83,7 @@ object SuuchiClient extends App { println(new String(response.getKv.getKey.toByteArray)) } + println(client.sumOfNumbers) + client.shutdown() } diff --git a/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala b/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala index b6b220a..57991d8 100644 --- a/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala +++ b/suuchi-examples/src/main/scala/in/ashwanthkumar/suuchi/example/DistributedKVServer.scala @@ -1,12 +1,9 @@ package in.ashwanthkumar.suuchi.example -import com.google.protobuf.Message -import com.twitter.algebird.Aggregator import in.ashwanthkumar.suuchi.router.ConsistentHashingRouting import in.ashwanthkumar.suuchi.rpc.Server.whoami import in.ashwanthkumar.suuchi.rpc._ import in.ashwanthkumar.suuchi.store.InMemoryStore -import io.grpc.MethodDescriptor import io.grpc.netty.NettyServerBuilder // Start the app with either / one each of 5051, 5052 or/and 5053 port numbers @@ -24,7 +21,7 @@ object DistributedKVServer extends App { .routeUsing(new SuuchiReadService(store), routingStrategy) .withParallelReplication(new SuuchiPutService(store), REPLICATION_FACTOR, routingStrategy) .withService(new SuuchiScanService(store)) - .aggregate(allNodes, allNodes.head, new SuuchiAggregatorService(), Map.empty[MethodDescriptor[Message, Message], Aggregator[Message, _, Message]]) + .aggregate(allNodes, new SuuchiAggregatorService(), new SumOfNumbers) server.start() From ede076c32f696df65c1af03a3ff3fdeb5fbb97ad Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 10:48:39 +0530 Subject: [PATCH 03/18] Adding travis settings for publishing snapshots to sonatype --- .travis.yml | 12 +++++------- bin/hooks_publish_docs.sh | 7 +++++++ travis-settings.xml | 13 +++++++++++++ 3 files changed, 25 insertions(+), 7 deletions(-) create mode 100644 bin/hooks_publish_docs.sh create mode 100644 travis-settings.xml diff --git a/.travis.yml b/.travis.yml index 7deace6..1380ccd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,16 +3,14 @@ jdk: - oraclejdk8 env: global: - secure: MmyMAHnhCNXJIclWXIZIIb2ZpJZknvaqZ2HUpxf+LIsr0dSwg3OhULc6hOEsktusio4rf7/mK5FihOEz4Lb4FFAzTuTn23YyPjjNY4BHIoqF+v5sgy0z8EGUl+DymoJExQ02CRwy6gyIk0MUYbcuZMbUEg5S8v/JvLL1hgEhv+8eAN3dGrMoog8NsDuYlhzZlFXXm8DN83PZ/9boagWq+WGHajNCUC7Ra7y4TfhRUKEDZfoln39xeazQwdUFq9iZ0urGM+QhZxL6wf2G68DAHfTmor/ahVZlOMcbmhakVqxVYz17f0ovKlvD2ybBDzaqae5j/AiTkEmHTEvXzXeSJT0VHE0/JuSHH+r+1tJxnrT+zg6s9YSOUDaZRkIWmLFe/jWV3X8AZ9MwETZDQptyTgxHqIYM8HBQA8j3CkjzgLMFkZfqwVgJRc2QjUwKKttmLEM7mJlxuyNoqD5fNHhFJCJBnJtJQAV3J6CEse4a1qOQRJoFBGIp8u5l/ej9Bwdf+WVp+mXt3uQOfr6L6PgYE+Ll9gy/gAPwn8IgllYN9q/ypGc9BO2fe77uWfDgQsTjwNfNnA0wKf3YYYunNnMN58oFBjukpZZKiyUOjgEdrrcCX/I6zMRDaA2YrW4s+JG1Idr2przEouPTbNjanw0hqQzz96Gslsv6FNhRYJaJicw= + - secure: MmyMAHnhCNXJIclWXIZIIb2ZpJZknvaqZ2HUpxf+LIsr0dSwg3OhULc6hOEsktusio4rf7/mK5FihOEz4Lb4FFAzTuTn23YyPjjNY4BHIoqF+v5sgy0z8EGUl+DymoJExQ02CRwy6gyIk0MUYbcuZMbUEg5S8v/JvLL1hgEhv+8eAN3dGrMoog8NsDuYlhzZlFXXm8DN83PZ/9boagWq+WGHajNCUC7Ra7y4TfhRUKEDZfoln39xeazQwdUFq9iZ0urGM+QhZxL6wf2G68DAHfTmor/ahVZlOMcbmhakVqxVYz17f0ovKlvD2ybBDzaqae5j/AiTkEmHTEvXzXeSJT0VHE0/JuSHH+r+1tJxnrT+zg6s9YSOUDaZRkIWmLFe/jWV3X8AZ9MwETZDQptyTgxHqIYM8HBQA8j3CkjzgLMFkZfqwVgJRc2QjUwKKttmLEM7mJlxuyNoqD5fNHhFJCJBnJtJQAV3J6CEse4a1qOQRJoFBGIp8u5l/ej9Bwdf+WVp+mXt3uQOfr6L6PgYE+Ll9gy/gAPwn8IgllYN9q/ypGc9BO2fe77uWfDgQsTjwNfNnA0wKf3YYYunNnMN58oFBjukpZZKiyUOjgEdrrcCX/I6zMRDaA2YrW4s+JG1Idr2przEouPTbNjanw0hqQzz96Gslsv6FNhRYJaJicw= + - secure: omyCDeP61VvjEdf5fncihLSHt8cvyIg8vfyaqlYG8+S0izAB5gnJoS/492r/hNxUDIt6LXH4wX+mILAUrn0d22+UtgCkweP2SnZ0N8DuHbwqet1sA5qwN9hUnUCdcZEyBCIfV0ShDPyZflxyumJ0Z4iEeL/LqIiTVx0t/ivYP5trVMuasVf9iQFCHhywcbTIphYxOiV3Rz0StyQzElg3tljleq+Me+7nM01H/i7lsQtKBAggW0Maul54cEBWYzj8ivppgdaTzJjiObU3WJ7/vR1Jnnt6SHjiv0h9F97Fcm+PuOH+Np81B1j4HG4hSBDDAX8vTMlPLnAqlsUUD4CAmf5w8zKOZWuGtNr/Gcd/noYugsc5m8cTCVgniCAfD2x5dXzfKTDgXLLIBWGRCUZeXh9PaA6O9vyGB7ICcoCXkmFx+kM2dJ9rUaAFuv4J18XuNWEOLbmQUZnuBeGa01+Z0Wq0sg9QUCC8yfvt/qc8qfKZPiteMQTy1TCWpGlMDoHNADpI/xzaNSNZhzD9pfmC128PuH5xYx81sA9h+zQrifya/h/mAIc+/whbzC7vJ+Pny5zavSuVGsZ6KrTlR41qg/D1GO7SgTu49h46ZYmWck3tjIEoiXjVcEycW5TN6STDP1FmVGKxhAi0xg+vXRQa7y1w0+LYgrgyz+6T39afhgY= + - secure: rUkD/vXLmdUB/hDBkregZz4YmJoG7Ro91xJbADumf4Lzfm8wO4bK1mpbsGqBDOXvp/the76MPOdu6halDw8njktfXcrwUCPQaXtn8A7+bQW1Kc1L/SpKs5G+ycGVKF8AAYUMMBRs6RX1QHQTwxSOVkOTYD6me9rYZTX6PdcW9fCw7EgIzBc8X2I8tQckCj5DIoLECxUo2bwrkL7KKkNmFPLK7Cm6jddNASqzqZo3pybeGTFuGbAsXN11b2CfINl2srjJ/gH3DDXKFlcAKpPuH7YW+3YIA/15S6xoimUFnLaUILFpNTTwxGmQwZ/h235K6kh0kWHdPdeUlGzl0yN8KgSwS1DFKLfe+VrKSD0441XyW/snwtrlC9jl/jX2A5uxaalVDD27MZj3Z8kpdSHtuaYIZ86d7yNRWqOJMcwkI7K9W4JiocBGZ373EttxVM7dywQuPPu/bwyEI2fq44/JUQD+3AJ2Exgoh1NFrHT+H+NCDXY+x7W8S+lD9adlrmvHZfNZ1I401V8u9mEApw0XWaQCIvKfqXogk4UDtzxAvJKv0J7A1k70xH59OtnU5kHWF5j1805hkVhp/1+AC6UNVaiNLAoRNSH5PQFRtfJz4WyfMmonR+ULtBsFTdw6ON1gxbHAHiAY0WQmcBwUsF3gOn8GZrXKGxizKb6QutH64wI= script: - mvn clean install cache: directories: - $HOME/.m2 after_success: -- git config user.name "Ashwanth Kumar" -- git config user.email "ashwanthkumar@googlemail.com" -- git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git"; -- git fetch gh-token && git fetch gh-token gh-pages:gh-pages; -- sudo pip install mkdocs==0.15.3 -- mkdocs gh-deploy -v --clean --remote-name gh-token; +- bash bin/hooks_publish_docs.sh +- mvn deploy --settings travis-settings.xml -DskipTests=true -B diff --git a/bin/hooks_publish_docs.sh b/bin/hooks_publish_docs.sh new file mode 100644 index 0000000..e958d97 --- /dev/null +++ b/bin/hooks_publish_docs.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +- git config user.name "Ashwanth Kumar" +- git config user.email "ashwanthkumar@googlemail.com" +- git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git"; +- git fetch gh-token && git fetch gh-token gh-pages:gh-pages; +- sudo pip install mkdocs==0.15.3 +- mkdocs gh-deploy -v --clean --remote-name gh-token; diff --git a/travis-settings.xml b/travis-settings.xml new file mode 100644 index 0000000..09b4a3b --- /dev/null +++ b/travis-settings.xml @@ -0,0 +1,13 @@ + + + + + ossrh + ${env.SONATYPE_USERNAME} + ${env.SONATYPE_PASSWORD} + + From d7a759351fe71b366bcdf9a75988ceb8cf9333d4 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 10:53:42 +0530 Subject: [PATCH 04/18] Not masking the username since it's easy to guess from the logs anyways --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 1380ccd..10e2415 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,8 +4,8 @@ jdk: env: global: - secure: MmyMAHnhCNXJIclWXIZIIb2ZpJZknvaqZ2HUpxf+LIsr0dSwg3OhULc6hOEsktusio4rf7/mK5FihOEz4Lb4FFAzTuTn23YyPjjNY4BHIoqF+v5sgy0z8EGUl+DymoJExQ02CRwy6gyIk0MUYbcuZMbUEg5S8v/JvLL1hgEhv+8eAN3dGrMoog8NsDuYlhzZlFXXm8DN83PZ/9boagWq+WGHajNCUC7Ra7y4TfhRUKEDZfoln39xeazQwdUFq9iZ0urGM+QhZxL6wf2G68DAHfTmor/ahVZlOMcbmhakVqxVYz17f0ovKlvD2ybBDzaqae5j/AiTkEmHTEvXzXeSJT0VHE0/JuSHH+r+1tJxnrT+zg6s9YSOUDaZRkIWmLFe/jWV3X8AZ9MwETZDQptyTgxHqIYM8HBQA8j3CkjzgLMFkZfqwVgJRc2QjUwKKttmLEM7mJlxuyNoqD5fNHhFJCJBnJtJQAV3J6CEse4a1qOQRJoFBGIp8u5l/ej9Bwdf+WVp+mXt3uQOfr6L6PgYE+Ll9gy/gAPwn8IgllYN9q/ypGc9BO2fe77uWfDgQsTjwNfNnA0wKf3YYYunNnMN58oFBjukpZZKiyUOjgEdrrcCX/I6zMRDaA2YrW4s+JG1Idr2przEouPTbNjanw0hqQzz96Gslsv6FNhRYJaJicw= - - secure: omyCDeP61VvjEdf5fncihLSHt8cvyIg8vfyaqlYG8+S0izAB5gnJoS/492r/hNxUDIt6LXH4wX+mILAUrn0d22+UtgCkweP2SnZ0N8DuHbwqet1sA5qwN9hUnUCdcZEyBCIfV0ShDPyZflxyumJ0Z4iEeL/LqIiTVx0t/ivYP5trVMuasVf9iQFCHhywcbTIphYxOiV3Rz0StyQzElg3tljleq+Me+7nM01H/i7lsQtKBAggW0Maul54cEBWYzj8ivppgdaTzJjiObU3WJ7/vR1Jnnt6SHjiv0h9F97Fcm+PuOH+Np81B1j4HG4hSBDDAX8vTMlPLnAqlsUUD4CAmf5w8zKOZWuGtNr/Gcd/noYugsc5m8cTCVgniCAfD2x5dXzfKTDgXLLIBWGRCUZeXh9PaA6O9vyGB7ICcoCXkmFx+kM2dJ9rUaAFuv4J18XuNWEOLbmQUZnuBeGa01+Z0Wq0sg9QUCC8yfvt/qc8qfKZPiteMQTy1TCWpGlMDoHNADpI/xzaNSNZhzD9pfmC128PuH5xYx81sA9h+zQrifya/h/mAIc+/whbzC7vJ+Pny5zavSuVGsZ6KrTlR41qg/D1GO7SgTu49h46ZYmWck3tjIEoiXjVcEycW5TN6STDP1FmVGKxhAi0xg+vXRQa7y1w0+LYgrgyz+6T39afhgY= - secure: rUkD/vXLmdUB/hDBkregZz4YmJoG7Ro91xJbADumf4Lzfm8wO4bK1mpbsGqBDOXvp/the76MPOdu6halDw8njktfXcrwUCPQaXtn8A7+bQW1Kc1L/SpKs5G+ycGVKF8AAYUMMBRs6RX1QHQTwxSOVkOTYD6me9rYZTX6PdcW9fCw7EgIzBc8X2I8tQckCj5DIoLECxUo2bwrkL7KKkNmFPLK7Cm6jddNASqzqZo3pybeGTFuGbAsXN11b2CfINl2srjJ/gH3DDXKFlcAKpPuH7YW+3YIA/15S6xoimUFnLaUILFpNTTwxGmQwZ/h235K6kh0kWHdPdeUlGzl0yN8KgSwS1DFKLfe+VrKSD0441XyW/snwtrlC9jl/jX2A5uxaalVDD27MZj3Z8kpdSHtuaYIZ86d7yNRWqOJMcwkI7K9W4JiocBGZ373EttxVM7dywQuPPu/bwyEI2fq44/JUQD+3AJ2Exgoh1NFrHT+H+NCDXY+x7W8S+lD9adlrmvHZfNZ1I401V8u9mEApw0XWaQCIvKfqXogk4UDtzxAvJKv0J7A1k70xH59OtnU5kHWF5j1805hkVhp/1+AC6UNVaiNLAoRNSH5PQFRtfJz4WyfMmonR+ULtBsFTdw6ON1gxbHAHiAY0WQmcBwUsF3gOn8GZrXKGxizKb6QutH64wI= + - SONATYPE_USERNAME: ashwanthkumar script: - mvn clean install cache: From 4575e8b3449d2852d555439a71b9037f2a019a54 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 10:55:21 +0530 Subject: [PATCH 05/18] Fixing the doc publish hook --- bin/hooks_publish_docs.sh | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/bin/hooks_publish_docs.sh b/bin/hooks_publish_docs.sh index e958d97..53bdb9c 100644 --- a/bin/hooks_publish_docs.sh +++ b/bin/hooks_publish_docs.sh @@ -1,7 +1,8 @@ #!/usr/bin/env bash -- git config user.name "Ashwanth Kumar" -- git config user.email "ashwanthkumar@googlemail.com" -- git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git"; -- git fetch gh-token && git fetch gh-token gh-pages:gh-pages; -- sudo pip install mkdocs==0.15.3 -- mkdocs gh-deploy -v --clean --remote-name gh-token; + +git config user.name "Ashwanth Kumar" +git config user.email "ashwanthkumar@googlemail.com" +git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git"; +git fetch gh-token && git fetch gh-token gh-pages:gh-pages; +sudo pip install mkdocs==0.15.3 +mkdocs gh-deploy -v --clean --remote-name gh-token; From 69eee55acc273aaf75253dcd3594d62d1f4ab0d0 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 10:58:28 +0530 Subject: [PATCH 06/18] Fixing the maven settings scheme --- travis-settings.xml | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/travis-settings.xml b/travis-settings.xml index 09b4a3b..99c7bcf 100644 --- a/travis-settings.xml +++ b/travis-settings.xml @@ -1,13 +1,18 @@ - - - - - ossrh - ${env.SONATYPE_USERNAME} - ${env.SONATYPE_PASSWORD} - - + We used this to publish our artifacts to sonatype snapshots after every build! + --> + + + + ossrh + ${env.SONATYPE_USERNAME} + ${env.SONATYPE_PASSWORD} + + + From 7d229db3a8cef9a6e2d6aa2bb53482800cb579d2 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 11:12:05 +0530 Subject: [PATCH 07/18] Replacing println -> log.debug --- .../suuchi/router/AggregationRouter.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala index 595e661..b605286 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala @@ -9,6 +9,8 @@ import in.ashwanthkumar.suuchi.cluster.MemberAddress import in.ashwanthkumar.suuchi.rpc.CachedChannelPool import io.grpc._ import io.grpc.stub.{ClientCalls, MetadataUtils, StreamObserver, StreamObservers} +import org.slf4j.LoggerFactory + import scala.collection.JavaConverters._ trait Aggregation { @@ -17,6 +19,7 @@ trait Aggregation { class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends ServerInterceptor { val channelPool = CachedChannelPool() + val log = LoggerFactory.getLogger(classOf[AggregationRouter]) override def interceptCall[ReqT, RespT](incomingRequest: ServerCall[ReqT, RespT], headers: Metadata, next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = { val isBroadcastRequest = headers.containsKey(Headers.BROADCAST_REQUEST_KEY) @@ -33,11 +36,11 @@ class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends var request: ReqT = _ override def onCancel() = { - println("AggregationRouter#onCancel") + log.debug("AggregationRouter#onCancel") incomingRequest.close(Status.CANCELLED, headers) } override def onHalfClose() = { - println("AggregationRouter#onHalfClose") + log.debug("AggregationRouter#onHalfClose") val gathered = AggregationRouter.scatter(members, channelPool, incomingRequest.getMethodDescriptor, headers, request) reduced = aggregator.apply(gathered.asScala) incomingRequest.sendHeaders(headers) @@ -45,15 +48,15 @@ class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends incomingRequest.close(Status.OK, headers) } override def onReady() = { - println("AggregationRouter#onReady") + log.debug("AggregationRouter#onReady") } override def onMessage(message: ReqT) = { - println("AggregationRouter#onMessage") + log.debug("AggregationRouter#onMessage") // We don't do the aggregation here but on onHalfClose() request = message } override def onComplete() = { - println("AggregationRouter#onComplete") + log.debug("AggregationRouter#onComplete") } } } From 256b436adc54b6293b74fd9d00f7d024482e53d0 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 13:47:02 +0530 Subject: [PATCH 08/18] Removing 'reduced' state from aggregate listener --- .../in/ashwanthkumar/suuchi/router/AggregationRouter.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala index b605286..cbb89ef 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala @@ -31,8 +31,6 @@ class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends incomingRequest.request(2) new ServerCall.Listener[ReqT] { val aggregator = agg.aggregator.apply(incomingRequest.getMethodDescriptor) - var reduced: RespT = _ - var request: ReqT = _ override def onCancel() = { @@ -42,7 +40,7 @@ class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends override def onHalfClose() = { log.debug("AggregationRouter#onHalfClose") val gathered = AggregationRouter.scatter(members, channelPool, incomingRequest.getMethodDescriptor, headers, request) - reduced = aggregator.apply(gathered.asScala) + val reduced = aggregator.apply(gathered.asScala) incomingRequest.sendHeaders(headers) incomingRequest.sendMessage(reduced) incomingRequest.close(Status.OK, headers) From 9293eba50c1a7486797a0f11520228da4a97ee48 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 16:12:27 +0530 Subject: [PATCH 09/18] Adding Server#aggregate to take in ServerServiceDefinition overload as well --- .../src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala index 90724cd..9436fb0 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala @@ -125,6 +125,10 @@ class Server[T <: ServerBuilder[T]](serverBuilder: ServerBuilder[T], whoami: Mem } def aggregate(allNodes: List[MemberAddress], service: BindableService, agg: Aggregation) = { + aggregate(allNodes, service.bindService(), agg) + } + + def aggregate(allNodes: List[MemberAddress], service: ServerServiceDefinition, agg: Aggregation) = { val aggregator = new AggregationRouter(allNodes, agg) serverBuilder.addService(ServerInterceptors.interceptForward(service, aggregator)) this From 45cf7df5dd32f35d6407eb7f0cb6021ab6303838 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 26 May 2017 16:17:20 +0530 Subject: [PATCH 10/18] Specifying the return type for aggregate --- .../src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala index 9436fb0..f17287e 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/rpc/Server.scala @@ -124,11 +124,11 @@ class Server[T <: ServerBuilder[T]](serverBuilder: ServerBuilder[T], whoami: Mem this } - def aggregate(allNodes: List[MemberAddress], service: BindableService, agg: Aggregation) = { + def aggregate(allNodes: List[MemberAddress], service: BindableService, agg: Aggregation): Server[T] = { aggregate(allNodes, service.bindService(), agg) } - def aggregate(allNodes: List[MemberAddress], service: ServerServiceDefinition, agg: Aggregation) = { + def aggregate(allNodes: List[MemberAddress], service: ServerServiceDefinition, agg: Aggregation): Server[T] = { val aggregator = new AggregationRouter(allNodes, agg) serverBuilder.addService(ServerInterceptors.interceptForward(service, aggregator)) this From de664b611083e432f36f76a842b66a1a00628b25 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 14:43:02 +0530 Subject: [PATCH 11/18] WIP - Tests fr AggregationRouter Simple test cases to cover to non-scatter block --- suuchi-core/pom.xml | 2 + suuchi-core/src/test/proto/test.proto | 24 +++++++ .../suuchi/router/AggregationRouterSpec.scala | 72 +++++++++++++++++++ 3 files changed, 98 insertions(+) create mode 100644 suuchi-core/src/test/proto/test.proto create mode 100644 suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala diff --git a/suuchi-core/pom.xml b/suuchi-core/pom.xml index 0166109..55ce0b6 100644 --- a/suuchi-core/pom.xml +++ b/suuchi-core/pom.xml @@ -142,6 +142,8 @@ compile compile-custom + test-compile + test-compile-custom diff --git a/suuchi-core/src/test/proto/test.proto b/suuchi-core/src/test/proto/test.proto new file mode 100644 index 0000000..6d35d58 --- /dev/null +++ b/suuchi-core/src/test/proto/test.proto @@ -0,0 +1,24 @@ +syntax = "proto3"; + +option java_package = "in.ashwanthkumar.suuchi.core.tests"; +option java_outer_classname = "SuuchiTestRPC"; + +message ReduceRequest { +} + +message ReduceResponse { + int64 output = 1; +} + +service Aggregator { + rpc Reduce (ReduceRequest) returns (ReduceResponse); +} + +message FooRequest { +} +message FooResponse { +} + +service Random { + rpc Foo (FooRequest) returns (FooResponse); +} diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala new file mode 100644 index 0000000..2552338 --- /dev/null +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala @@ -0,0 +1,72 @@ +package in.ashwanthkumar.suuchi.router + +import com.twitter.algebird.{Aggregator, MonoidAggregator} +import in.ashwanthkumar.suuchi.cluster.MemberAddress +import in.ashwanthkumar.suuchi.core.tests.SuuchiTestRPC.{FooRequest, FooResponse} +import in.ashwanthkumar.suuchi.core.tests.{RandomGrpc, SuuchiTestRPC} +import io.grpc.ServerCall.Listener +import io.grpc.{Metadata, MethodDescriptor, ServerCall, ServerCallHandler} +import io.grpc.testing.TestMethodDescriptors +import org.mockito.Matchers.{any, eq => mockEq} +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.FlatSpec +import org.scalatest.Matchers.{be, convertToAnyShouldWrapper} + +class AggregationRouterSpec extends FlatSpec { + class RandomAggregation extends Aggregation { + override def aggregator[A, B] + : PartialFunction[MethodDescriptor[A, B], Aggregator[B, Any, B]] = { + case RandomGrpc.METHOD_FOO => Aggregator.const(1l).asInstanceOf[Aggregator[B, Any, B]] + } + } + + "AggregationRouter" should "not do if the aggregation is not defined for the method" in { + val router = + new AggregationRouter(List(MemberAddress("host:1")), new RandomAggregation) + val serverCall = mock(classOf[ServerCall[Int, Int]]) + val serverMethodDesc = TestMethodDescriptors.noopMethod[Int, Int]() + when(serverCall.getMethodDescriptor).thenReturn(serverMethodDesc) + + val headers = new Metadata + val delegate = mock(classOf[Listener[Int]]) + val next = mock(classOf[ServerCallHandler[Int, Int]]) + when(next.startCall(any(classOf[ServerCall[Int, Int]]), mockEq(headers))) + .thenReturn(delegate) + + val listener = router.interceptCall(serverCall, headers, next) + listener.onReady() + listener.onMessage(1) + listener.onHalfClose() + listener.onComplete() + listener.onCancel() + + verify(next, times(1)).startCall(mockEq(serverCall), mockEq(headers)) + } + + it should "not scatter requests if it already has BROADCAST header" in { + val router = + new AggregationRouter(List(MemberAddress("host:1")), new RandomAggregation) + + val serverCall = mock(classOf[ServerCall[FooRequest, FooResponse]]) + val serverMethodDesc = TestMethodDescriptors.noopMethod[FooRequest, FooResponse]() + when(serverCall.getMethodDescriptor).thenReturn(serverMethodDesc) + + val headers = new Metadata() + headers.put(Headers.BROADCAST_REQUEST_KEY, true) + + val delegate = mock(classOf[Listener[FooRequest]]) + val next = mock(classOf[ServerCallHandler[FooRequest, FooResponse]]) + when(next.startCall(any(classOf[ServerCall[FooRequest, FooResponse]]), mockEq(headers))) + .thenReturn(delegate) + + val listener = router.interceptCall(serverCall, headers, next) + listener.onReady() + listener.onMessage(FooRequest.newBuilder().build()) + listener.onHalfClose() + listener.onComplete() + listener.onCancel() + + verify(next, times(1)).startCall(mockEq(serverCall), mockEq(headers)) + } + +} From be981871b4fbc2485aac49512d52f4fa423661d4 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 15:02:26 +0530 Subject: [PATCH 12/18] Doing mkdocs only for master branch --- bin/hooks_publish_docs.sh | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/bin/hooks_publish_docs.sh b/bin/hooks_publish_docs.sh index 53bdb9c..3b1a419 100644 --- a/bin/hooks_publish_docs.sh +++ b/bin/hooks_publish_docs.sh @@ -4,5 +4,10 @@ git config user.name "Ashwanth Kumar" git config user.email "ashwanthkumar@googlemail.com" git remote add gh-token "https://${GH_TOKEN}@github.com/ashwanthkumar/suuchi.git"; git fetch gh-token && git fetch gh-token gh-pages:gh-pages; -sudo pip install mkdocs==0.15.3 -mkdocs gh-deploy -v --clean --remote-name gh-token; + +# Deploy Docs only for builds out of master and not PRs or tags. +if ([ "$TRAVIS_BRANCH" == "master" ] || [ ! -z "$TRAVIS_TAG" ]) && + [ "$TRAVIS_PULL_REQUEST" == "false" ]; then + sudo pip install mkdocs==0.15.3 + mkdocs gh-deploy -v --clean --remote-name gh-token; +fi From 095c1c5a021d867dddb9d0e9113e0f328159f81e Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 15:03:07 +0530 Subject: [PATCH 13/18] Adding sudo:required in .travis.yml --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 2831526..85ccaf8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ language: java +sudo: required jdk: - oraclejdk8 env: From 2015c6c1998ea51f796abb18358a9099028ef9b0 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 15:07:47 +0530 Subject: [PATCH 14/18] Using virtualenv in container based infra for travis builds --- .travis.yml | 3 ++- bin/hooks_publish_docs.sh | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 85ccaf8..3db33b3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,5 @@ language: java -sudo: required +sudo: false jdk: - oraclejdk8 env: @@ -10,6 +10,7 @@ env: cache: directories: - $HOME/.m2 + - $HOME/DENV after_success: - bash bin/hooks_publish_docs.sh - bash <(curl -s https://codecov.io/bash) diff --git a/bin/hooks_publish_docs.sh b/bin/hooks_publish_docs.sh index 3b1a419..19054ab 100644 --- a/bin/hooks_publish_docs.sh +++ b/bin/hooks_publish_docs.sh @@ -8,6 +8,8 @@ git fetch gh-token && git fetch gh-token gh-pages:gh-pages; # Deploy Docs only for builds out of master and not PRs or tags. if ([ "$TRAVIS_BRANCH" == "master" ] || [ ! -z "$TRAVIS_TAG" ]) && [ "$TRAVIS_PULL_REQUEST" == "false" ]; then - sudo pip install mkdocs==0.15.3 + virtualenv --system-site-packages ${HOME}/DENV + source ${HOME}/DENV/bin/activate + pip install mkdocs==0.15.3 mkdocs gh-deploy -v --clean --remote-name gh-token; fi From 23da476231535432fa740e7b339ac0ebd5e586e2 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 20:43:21 +0530 Subject: [PATCH 15/18] Catching exceptions and throwing INTERNAL server error during aggregate Adding more test cases for AggregationRouter --- .../suuchi/router/AggregationRouter.scala | 21 +++- .../suuchi/router/AggregationRouterSpec.scala | 106 +++++++++++++++++- 2 files changed, 118 insertions(+), 9 deletions(-) diff --git a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala index cbb89ef..0001665 100644 --- a/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala +++ b/suuchi-core/src/main/scala/in/ashwanthkumar/suuchi/router/AggregationRouter.scala @@ -39,17 +39,22 @@ class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends } override def onHalfClose() = { log.debug("AggregationRouter#onHalfClose") - val gathered = AggregationRouter.scatter(members, channelPool, incomingRequest.getMethodDescriptor, headers, request) - val reduced = aggregator.apply(gathered.asScala) - incomingRequest.sendHeaders(headers) - incomingRequest.sendMessage(reduced) - incomingRequest.close(Status.OK, headers) + try { + val gathered = scatter(members, channelPool, incomingRequest.getMethodDescriptor, headers, request) + val reduced = aggregator.apply(gathered.asScala) + incomingRequest.sendHeaders(headers) + incomingRequest.sendMessage(reduced) + incomingRequest.close(Status.OK, headers) + } catch { + case e: Throwable => + log.error(e.getMessage, e) + incomingRequest.close(Status.INTERNAL.withCause(e), headers) + } } override def onReady() = { log.debug("AggregationRouter#onReady") } override def onMessage(message: ReqT) = { - log.debug("AggregationRouter#onMessage") // We don't do the aggregation here but on onHalfClose() request = message } @@ -59,6 +64,10 @@ class AggregationRouter(members: List[MemberAddress], agg: Aggregation) extends } } } + + protected def scatter[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], headers: Metadata, input: ReqT): util.List[RespT] = { + AggregationRouter.scatter(nodes, channelPool, methodDescriptor, headers, input) + } } object AggregationRouter { diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala index 2552338..3c57d62 100644 --- a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala @@ -5,18 +5,24 @@ import in.ashwanthkumar.suuchi.cluster.MemberAddress import in.ashwanthkumar.suuchi.core.tests.SuuchiTestRPC.{FooRequest, FooResponse} import in.ashwanthkumar.suuchi.core.tests.{RandomGrpc, SuuchiTestRPC} import io.grpc.ServerCall.Listener -import io.grpc.{Metadata, MethodDescriptor, ServerCall, ServerCallHandler} +import io.grpc._ import io.grpc.testing.TestMethodDescriptors import org.mockito.Matchers.{any, eq => mockEq} import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest.FlatSpec import org.scalatest.Matchers.{be, convertToAnyShouldWrapper} +import java.util.{List => JList} + +import scala.collection.JavaConverters._ +import in.ashwanthkumar.suuchi.rpc.CachedChannelPool +import org.mockito.ArgumentCaptor class AggregationRouterSpec extends FlatSpec { class RandomAggregation extends Aggregation { override def aggregator[A, B] : PartialFunction[MethodDescriptor[A, B], Aggregator[B, Any, B]] = { - case RandomGrpc.METHOD_FOO => Aggregator.const(1l).asInstanceOf[Aggregator[B, Any, B]] + case RandomGrpc.METHOD_FOO => + Aggregator.const(FooResponse.getDefaultInstance).asInstanceOf[Aggregator[B, Any, B]] } } @@ -51,7 +57,7 @@ class AggregationRouterSpec extends FlatSpec { val serverMethodDesc = TestMethodDescriptors.noopMethod[FooRequest, FooResponse]() when(serverCall.getMethodDescriptor).thenReturn(serverMethodDesc) - val headers = new Metadata() + val headers = new Metadata() headers.put(Headers.BROADCAST_REQUEST_KEY, true) val delegate = mock(classOf[Listener[FooRequest]]) @@ -69,4 +75,98 @@ class AggregationRouterSpec extends FlatSpec { verify(next, times(1)).startCall(mockEq(serverCall), mockEq(headers)) } + it should "scatter requests to all the nodes when aggregation is defined on the method" in { + val router = + new AggregationRouter(List(MemberAddress("host:1")), + new RandomAggregation) { + override protected def scatter[ReqT, RespT](nodes: List[MemberAddress], channelPool: CachedChannelPool, methodDescriptor: MethodDescriptor[ReqT, RespT], headers: Metadata, input: ReqT) = { + List(FooResponse.getDefaultInstance.asInstanceOf[RespT]).asJava + } + } + + val serverCall = mock(classOf[ServerCall[FooRequest, FooResponse]]) + when(serverCall.getMethodDescriptor).thenReturn(RandomGrpc.METHOD_FOO) + + val headers = new Metadata() + + val delegate = mock(classOf[Listener[FooRequest]]) + val next = mock(classOf[ServerCallHandler[FooRequest, FooResponse]]) + when(next.startCall(any(classOf[ServerCall[FooRequest, FooResponse]]), mockEq(headers))) + .thenReturn(delegate) + + val listener = router.interceptCall(serverCall, headers, next) + listener.onReady() + listener.onMessage(FooRequest.newBuilder().build()) + listener.onHalfClose() + // during onHalfClose + verify(serverCall, times(1)).sendHeaders(mockEq(headers)) + verify(serverCall, times(1)).sendMessage(mockEq(FooResponse.getDefaultInstance)) + verify(serverCall, times(1)).close(mockEq(Status.OK), mockEq(headers)) + + listener.onComplete() + + listener.onCancel() + // during onCancel + verify(serverCall, times(1)).close(mockEq(Status.CANCELLED), mockEq(headers)) + + // general interactions + verify(next, times(0)).startCall(mockEq(serverCall), mockEq(headers)) + verify(serverCall, times(1)).request(2) + headers.containsKey(Headers.BROADCAST_REQUEST_KEY) should be(true) + } + + it should "fail with INTERNAL when scatter request fail" in { + val router = + new AggregationRouter(List(MemberAddress("host:1")), new RandomAggregation) { + override protected def scatter[ReqT, RespT]( + nodes: List[MemberAddress], + channelPool: CachedChannelPool, + methodDescriptor: MethodDescriptor[ReqT, RespT], + headers: Metadata, + input: ReqT) = { + throw new RuntimeException("scatter failed") + } + } + + val serverCall = mock(classOf[ServerCall[FooRequest, FooResponse]]) + when(serverCall.getMethodDescriptor).thenReturn(RandomGrpc.METHOD_FOO) + + val headers = new Metadata() + + val delegate = mock(classOf[Listener[FooRequest]]) + val next = mock(classOf[ServerCallHandler[FooRequest, FooResponse]]) + when(next.startCall(any(classOf[ServerCall[FooRequest, FooResponse]]), mockEq(headers))) + .thenReturn(delegate) + + val listener = router.interceptCall(serverCall, headers, next) + listener.onReady() + listener.onMessage(FooRequest.newBuilder().build()) + listener.onHalfClose() + // during onHalfClose + val statusCaptor = ArgumentCaptor.forClass(classOf[Status]) + verify(serverCall, times(1)).close(statusCaptor.capture(), mockEq(headers)) + val status = statusCaptor.getValue + status.getCode should be(Status.INTERNAL.getCode) + status.getCause.getMessage should be("scatter failed") + + listener.onComplete() + + // general interactions + verify(next, times(0)).startCall(mockEq(serverCall), mockEq(headers)) + verify(serverCall, times(1)).request(2) + headers.containsKey(Headers.BROADCAST_REQUEST_KEY) should be(true) + } + +} + +class MockScatterAggregationRouter[T](members: List[MemberAddress], + agg: Aggregation, + scatterResponses: JList[T]) + extends AggregationRouter(members, agg) { + override protected def scatter[ReqT, RespT](nodes: List[MemberAddress], + channelPool: CachedChannelPool, + methodDescriptor: MethodDescriptor[ReqT, RespT], + headers: Metadata, + input: ReqT) = + scatterResponses.asInstanceOf[JList[RespT]] } From 7d4e1b16f83a3fb072c7344d7460889bfeee9364 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 20:52:30 +0530 Subject: [PATCH 16/18] Adding BooleanMarshallerSpec for fun and profit --- .../suuchi/router/BooleanMarshallerSpec.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/BooleanMarshallerSpec.scala diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/BooleanMarshallerSpec.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/BooleanMarshallerSpec.scala new file mode 100644 index 0000000..c1d517e --- /dev/null +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/BooleanMarshallerSpec.scala @@ -0,0 +1,14 @@ +package in.ashwanthkumar.suuchi.router + +import org.scalatest.FlatSpec +import org.scalatest.Matchers.{be, convertToAnyShouldWrapper} + +class BooleanMarshallerSpec extends FlatSpec { + "BooleanMarshaller" should "return the bool as string when serialised" in { + BooleanMarshaller.toAsciiString(true) should be("true") + } + + it should "return the bool when de-serialised" in { + BooleanMarshaller.parseAsciiString("true") should be(true) + } +} From 315230082742e8be3fc2e7f62fcb34923c7456a2 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 20:55:13 +0530 Subject: [PATCH 17/18] removing the un-used MockScatterAggregationRouter --- .../suuchi/router/AggregationRouterSpec.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala index 3c57d62..c6ce755 100644 --- a/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala +++ b/suuchi-core/src/test/scala/in/ashwanthkumar/suuchi/router/AggregationRouterSpec.scala @@ -158,15 +158,3 @@ class AggregationRouterSpec extends FlatSpec { } } - -class MockScatterAggregationRouter[T](members: List[MemberAddress], - agg: Aggregation, - scatterResponses: JList[T]) - extends AggregationRouter(members, agg) { - override protected def scatter[ReqT, RespT](nodes: List[MemberAddress], - channelPool: CachedChannelPool, - methodDescriptor: MethodDescriptor[ReqT, RespT], - headers: Metadata, - input: ReqT) = - scatterResponses.asInstanceOf[JList[RespT]] -} From a27fe075206ac26c39729d682e866eeec9097317 Mon Sep 17 00:00:00 2001 From: Ashwanth Kumar Date: Fri, 2 Jun 2017 20:58:14 +0530 Subject: [PATCH 18/18] Excluding the generating proto java classes from test report --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 37edd0c..c16114b 100644 --- a/pom.xml +++ b/pom.xml @@ -223,6 +223,7 @@ in/ashwanthkumar/suuchi/rpc/generated/* + in/ashwanthkumar/suuchi/core/tests/*