diff --git a/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java index 923883c87..f2a19ebe0 100644 --- a/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java +++ b/experiments/storm/src/main/java/org/apache/gearpump/experiments/storm/util/TimeCacheMapWrapper.java @@ -18,7 +18,8 @@ package org.apache.gearpump.experiments.storm.util; -import backtype.storm.utils.TimeCacheMap; + +import org.apache.storm.utils.TimeCacheMap; /** * Wrapper class to suppress "deprecation" warning, as scala doesn't support the suppression. diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index 544a4eb41..487787364 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -21,17 +21,19 @@ package org.apache.gearpump.experiments.storm.main import java.io.{File, FileOutputStream, FileWriter} import java.nio.ByteBuffer import java.nio.channels.{Channels, WritableByteChannel} -import java.util.{HashMap => JHashMap, Map => JMap, UUID} +import java.util.{List => JList, HashMap => JHashMap, Map => JMap, UUID} +import org.apache.storm.Config +import org.apache.storm.generated._ +import org.apache.storm.nimbus.NimbusInfo +import org.apache.storm.security.auth.{ReqContext, ThriftConnectionType, ThriftServer} +import org.apache.storm.utils.Utils + import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import akka.actor.ActorSystem import com.typesafe.config.ConfigValueFactory -import backtype.storm.Config -import backtype.storm.generated._ -import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} -import backtype.storm.utils.Utils import org.apache.storm.shade.org.json.simple.JSONValue import org.apache.storm.shade.org.yaml.snakeyaml.Yaml import org.slf4j.Logger @@ -231,7 +233,8 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe val topologySummaryList = topologies.map { case (name, _) => new TopologySummary(name, name, 0, 0, 0, 0, "") }.toSeq - new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) + new ClusterSummary(List[SupervisorSummary]().asJava, + topologySummaryList.asJava, List[NimbusSummary]().asJava) } override def beginFileDownload(file: String): String = { @@ -284,6 +287,94 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe new File(jar).delete() topologies -= name } + + override def updateBlobReplication(key: String, replication: Int): Int = { + throw new UnsupportedOperationException + } + + override def setLogConfig(name: String, config: LogConfig): Unit = { + throw new UnsupportedOperationException + } + + override def downloadBlobChunk(session: String): ByteBuffer = { + throw new UnsupportedOperationException + } + + override def beginBlobDownload(key: String): BeginDownloadResult = { + throw new UnsupportedOperationException + } + + override def cancelBlobUpload(session: String): Unit = { + throw new UnsupportedOperationException + } + + override def getTopologyHistory(user: String): TopologyHistoryInfo = { + throw new UnsupportedOperationException + } + + override def getTopologyPageInfo(id: String, window: String, is_include_sys: Boolean): TopologyPageInfo = { + throw new UnsupportedOperationException + } + + override def getBlobMeta(key: String): ReadableBlobMeta = { + throw new UnsupportedOperationException + } + + override def createStateInZookeeper(key: String): Unit = { + throw new UnsupportedOperationException + } + + override def setBlobMeta(key: String, meta: SettableBlobMeta): Unit = { + throw new UnsupportedOperationException + } + + override def getComponentPendingProfileActions(id: String, component_id: String, action: ProfileAction): JList[ProfileRequest] = { + throw new UnsupportedOperationException + } + + override def debug(name: String, component: String, enable: Boolean, samplingPercentage: Double): Unit = { + throw new UnsupportedOperationException + } + + override def getComponentPageInfo(topology_id: String, component_id: String, window: String, is_include_sys: Boolean): ComponentPageInfo = { + throw new UnsupportedOperationException + } + + override def setWorkerProfiler(id: String, profileRequest: ProfileRequest): Unit = { + throw new UnsupportedOperationException + } + + override def finishBlobUpload(session: String): Unit = { + throw new UnsupportedOperationException + } + + override def beginCreateBlob(key: String, meta: SettableBlobMeta): String = { + throw new UnsupportedOperationException + } + + override def getBlobReplication(key: String): Int = { + throw new UnsupportedOperationException + } + + override def deleteBlob(key: String): Unit = { + throw new UnsupportedOperationException + } + + override def listBlobs(session: String): ListBlobsResult = { + throw new UnsupportedOperationException + } + + override def getLogConfig(name: String): LogConfig = { + throw new UnsupportedOperationException + } + + override def beginUpdateBlob(key: String): String = { + throw new UnsupportedOperationException + } + + override def uploadBlobChunk(session: String, chunk: ByteBuffer): Unit = { + throw new UnsupportedOperationException + } } case class TopologyData(topology: StormTopology, config: JMap[AnyRef, AnyRef], jar: String) diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala index 1cfd5a40b..7945b005a 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpStormClient.scala @@ -18,11 +18,11 @@ package org.apache.gearpump.experiments.storm.main -import backtype.storm.Config -import backtype.storm.utils.Utils import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{AkkaApp, LogUtil, Util} +import org.apache.storm.Config +import org.apache.storm.utils.Utils object GearpumpStormClient extends AkkaApp with ArgumentsParser { diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala index f60cc49fe..56f362696 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollector.scala @@ -20,12 +20,12 @@ package org.apache.gearpump.experiments.storm.processor import java.util.{Collection => JCollection, List => JList} -import backtype.storm.task.IOutputCollector -import backtype.storm.tuple.Tuple import org.apache.gearpump.experiments.storm.topology.TimedTuple import org.apache.gearpump.experiments.storm.util.StormConstants._ import org.apache.gearpump.experiments.storm.util.StormOutputCollector import org.apache.gearpump.streaming.task.UpdateCheckpointClock +import org.apache.storm.task.IOutputCollector +import org.apache.storm.tuple.Tuple /** * this is used by Storm bolt to emit messages diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala index 5794b1db7..b086dbad7 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala @@ -20,9 +20,9 @@ package org.apache.gearpump.experiments.storm.producer import java.util.{List => JList} -import backtype.storm.spout.{ISpout, ISpoutOutputCollector} import org.apache.gearpump.TimeStamp import org.apache.gearpump.experiments.storm.util.StormOutputCollector +import org.apache.storm.spout.{ISpoutOutputCollector, ISpout} case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp) diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala index d0f294997..f86cc8b7c 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala @@ -25,13 +25,6 @@ import java.util.{HashMap => JHashMap, List => JList, Map => JMap} import akka.actor.ActorRef import akka.pattern.ask -import backtype.storm.Config -import backtype.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology} -import backtype.storm.metric.api.IMetric -import backtype.storm.spout.{ISpout, SpoutOutputCollector} -import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} -import backtype.storm.tuple.{Fields, Tuple, TupleImpl} -import backtype.storm.utils.Utils import clojure.lang.Atom import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.gearpump.experiments.storm.processor.StormBoltOutputCollector @@ -43,6 +36,13 @@ import org.apache.gearpump.streaming.DAG import org.apache.gearpump.streaming.task.{GetDAG, TaskId, TaskContext, StartTime} import org.apache.gearpump.util.{Constants, LogUtil} import org.apache.gearpump.{Message, TimeStamp} +import org.apache.storm.Config +import org.apache.storm.generated.{Bolt, ComponentCommon, SpoutSpec, StormTopology} +import org.apache.storm.metric.api.IMetric +import org.apache.storm.spout.{SpoutOutputCollector, ISpout} +import org.apache.storm.task.{OutputCollector, IBolt, GeneralTopologyContext, TopologyContext} +import org.apache.storm.tuple.{Fields, Tuple, TupleImpl} +import org.apache.storm.utils.Utils import org.slf4j.Logger import scala.collection.JavaConverters._ @@ -55,13 +55,15 @@ import scala.concurrent.{Await, Future} trait GearpumpStormComponent { /** * invoked at Task.onStart - * @param startTime task start time + * + * @param startTime task start time */ def start(startTime: StartTime): Unit /** * invoked at Task.onNext - * @param message incoming message + * + * @param message incoming message */ def next(message: Message): Unit @@ -228,7 +230,8 @@ object GearpumpStormComponent { /** * invoked at TICK message when "topology.tick.tuple.freq.secs" is configured - * @param freq tick frequency + * + * @param freq tick frequency */ def tick(freq: Int): Unit = { if (null == tickTuple) { @@ -241,7 +244,8 @@ object GearpumpStormComponent { /** * normalize general config with per component configs * "topology.transactional.id" and "topology.tick.tuple.freq.secs" - * @param stormConfig general config for all components + * + * @param stormConfig general config for all components * @param componentCommon common component parts */ private def normalizeConfig(stormConfig: Map[AnyRef, AnyRef], diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala index 62bc25cc6..86d2c27c9 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopology.scala @@ -22,9 +22,6 @@ import java.lang.{Iterable => JIterable} import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} import akka.actor.ActorSystem -import backtype.storm.Config -import backtype.storm.generated._ -import backtype.storm.utils.{ThriftTopologyUtils, Utils} import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.experiments.storm.processor.StormProcessor import org.apache.gearpump.experiments.storm.producer.StormProducer @@ -34,6 +31,9 @@ import org.apache.gearpump.experiments.storm.util.StormUtil._ import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.LogUtil +import org.apache.storm.Config +import org.apache.storm.generated.{ComponentCommon, GlobalStreamId, Grouping, Bolt, SpoutSpec, StormTopology} +import org.apache.storm.utils.{ThriftTopologyUtils, Utils} import org.slf4j.Logger // TODO: Refactor this file, we should disable using of JavaConversions @@ -64,7 +64,8 @@ object GearpumpStormTopology { * 3. provides interface for Gearpump applications to use Storm topology * * an implicit ActorSystem is required to create Gearpump processors - * @param name topology name + * + * @param name topology name * @param topology Storm topology * @param sysConfig configs from "defaults.yaml" and custom config file * @param appConfig config submitted from user application @@ -124,7 +125,8 @@ private[storm] class GearpumpStormTopology( /** * creates Gearpump processor from Storm spout - * @param spoutId spout id + * + * @param spoutId spout id * @param spoutSpec spout spec * @param stormConfig merged storm config * @param system actor system @@ -141,7 +143,8 @@ private[storm] class GearpumpStormTopology( /** * creates Gearpump processor from Storm bolt - * @param boltId bolt id + * + * @param boltId bolt id * @param boltSpec bolt spec * @param stormConfig merged storm config * @param system actor system @@ -192,7 +195,8 @@ private[storm] class GearpumpStormTopology( * 1. use "topology.tasks" if defined; otherwise use parallelism_hint * 2. parallelism should not be larger than "topology.max.task.parallelism" if defined * 3. component config overrides system config - * @param stormConfig System configs without merging "topology.tasks" and + * + * @param stormConfig System configs without merging "topology.tasks" and * "topology.max.task.parallelism" of component * @return number of task instances for a component */ @@ -223,7 +227,8 @@ private[storm] class GearpumpStormTopology( /** * merge component configs "topology.kryo.decorators" and "topology.kryo.register" - * @param componentConfigs list of component configs + * + * @param componentConfigs list of component configs * @param allConfig existing configs without merging component configs * @return the two configs merged from all the component configs and existing configs */ diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala index eb61acb1c..b6a6b2ee9 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala @@ -20,10 +20,10 @@ package org.apache.gearpump.experiments.storm.topology import java.util.{List => JList} -import backtype.storm.task.GeneralTopologyContext -import backtype.storm.tuple.{Tuple, TupleImpl} import org.apache.gearpump.TimeStamp +import org.apache.storm.task.GeneralTopologyContext +import org.apache.storm.tuple.{TupleImpl, Tuple} /** * this carries Storm tuple values in the Gearpump world @@ -36,10 +36,11 @@ private[storm] class GearpumpTuple( val sourceStreamId: String, @transient val targetPartitions: Map[String, Array[Int]]) extends Serializable { /** - * creates a Storm [[backtype.storm.tuple.Tuple]] to be passed to a Storm component + * creates a Storm [[org.apache.storm.tuple.Tuple]] to be passed to a Storm component * this is needed for each incoming message - * because we cannot get [[backtype.storm.task.GeneralTopologyContext]] at deserialization - * @param topologyContext topology context used for all tasks + * because we cannot get [[org.apache.storm.task.GeneralTopologyContext]] at deserialization + * + * @param topologyContext topology context used for all tasks * @return a Tuple */ def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = { diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala index 1d04af66f..6b825dcec 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/Grouper.scala @@ -19,12 +19,13 @@ package org.apache.gearpump.experiments.storm.util import java.util.{List => JList} +import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.grouping.CustomStreamGrouping +import org.apache.storm.task.TopologyContext +import org.apache.storm.tuple.Fields + import scala.util.Random -import backtype.storm.generated.GlobalStreamId -import backtype.storm.grouping.CustomStreamGrouping -import backtype.storm.task.TopologyContext -import backtype.storm.tuple.Fields /** * Grouper is identical to that in storm but return gearpump partitions for storm tuple values @@ -114,7 +115,7 @@ class AllGrouper(numTasks: Int) extends Grouper { /** * CustomGrouper allows users to specify grouping strategy * - * @param grouping see [[backtype.storm.grouping.CustomStreamGrouping]] + * @param grouping see [[org.apache.storm.grouping.CustomStreamGrouping]] */ class CustomGrouper(grouping: CustomStreamGrouping) extends Grouper { diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala index f95b840da..5bf56a0ee 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala @@ -19,13 +19,14 @@ package org.apache.gearpump.experiments.storm.util import java.util.{ArrayList => JArrayList, Iterator => JIterator, List => JList, Map => JMap} +import org.apache.storm.generated.{JavaObject, GlobalStreamId, Grouping} +import org.apache.storm.grouping.CustomStreamGrouping +import org.apache.storm.task.TopologyContext +import org.apache.storm.tuple.Fields +import org.apache.storm.utils.Utils + import scala.collection.JavaConverters._ -import backtype.storm.generated.{GlobalStreamId, Grouping, JavaObject} -import backtype.storm.grouping.CustomStreamGrouping -import backtype.storm.task.TopologyContext -import backtype.storm.tuple.Fields -import backtype.storm.utils.Utils import org.slf4j.Logger import org.apache.gearpump._ diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala index 8bffc556f..9ef73d13c 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormSerializationFramework.scala @@ -21,8 +21,6 @@ import java.lang.{Integer => JInteger} import java.util.{Map => JMap} import akka.actor.ExtendedActorSystem -import backtype.storm.serialization.SerializationFactory -import backtype.storm.utils.ListDelegate import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.{Input, Output} @@ -30,6 +28,8 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.experiments.storm.topology.GearpumpTuple import org.apache.gearpump.experiments.storm.util.StormConstants._ import org.apache.gearpump.serializer.{SerializationFramework, Serializer} +import org.apache.storm.serialization.SerializationFactory +import org.apache.storm.utils.ListDelegate class StormSerializationFramework extends SerializationFramework { private var stormConfig: JMap[AnyRef, AnyRef] = null @@ -54,7 +54,7 @@ class StormSerializationFramework extends SerializationFramework { /** * serializes / deserializes [[org.apache.gearpump.experiments.storm.topology.GearpumpTuple]] * - * @param kryo created by Storm [[backtype.storm.serialization.SerializationFactory]] + * @param kryo created by Storm [[org.apache.storm.serialization.SerializationFactory]] */ class StormSerializer(kryo: Kryo) extends Serializer { // -1 means the max buffer size is 2147483647 diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala index 40e36a61d..eeb41dd74 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormUtil.scala @@ -22,8 +22,8 @@ import java.lang.{Boolean => JBoolean} import java.util.{HashMap => JHashMap, Map => JMap} import akka.actor.ActorSystem -import backtype.storm.Config -import backtype.storm.generated._ +import org.apache.storm.Config +import org.apache.storm.generated.StormTopology import org.apache.storm.shade.org.json.simple.JSONValue import org.apache.gearpump.cluster.UserConfig diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala index 430b1c0a1..948636060 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/processor/StormBoltOutputCollectorSpec.scala @@ -20,9 +20,9 @@ package org.apache.gearpump.experiments.storm.processor import java.util.{List => JList} -import backtype.storm.tuple.Tuple -import backtype.storm.utils.Utils import org.apache.gearpump.experiments.storm.util.StormOutputCollector +import org.apache.storm.tuple.Tuple +import org.apache.storm.utils.Utils import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala index 49afe0517..1f2fc0f81 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollectorSpec.scala @@ -20,9 +20,9 @@ package org.apache.gearpump.experiments.storm.producer import java.util.{List => JList} -import backtype.storm.spout.ISpout -import backtype.storm.utils.Utils import org.apache.gearpump.experiments.storm.util.StormOutputCollector +import org.apache.storm.spout.ISpout +import org.apache.storm.utils.Utils import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala index bdea50c91..fdb33afb3 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala @@ -20,15 +20,15 @@ package org.apache.gearpump.experiments.storm.topology import java.util.{Map => JMap} import akka.actor.ActorRef -import backtype.storm.spout.{ISpout, SpoutOutputCollector} -import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext} -import backtype.storm.tuple.Tuple import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout} import org.apache.gearpump.experiments.storm.util.StormOutputCollector import org.apache.gearpump.streaming.task.{StartTime, TaskContext, TaskId} import org.apache.gearpump.streaming.{DAG, MockUtil} import org.apache.gearpump.{Message, TimeStamp} +import org.apache.storm.spout.{SpoutOutputCollector, ISpout} +import org.apache.storm.task.{OutputCollector, GeneralTopologyContext, IBolt, TopologyContext} +import org.apache.storm.tuple.Tuple import org.mockito.Matchers.{anyObject, eq => mockitoEq} import org.mockito.Mockito._ import org.scalacheck.Gen diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala index ef383ad54..d76dd7a83 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala @@ -20,11 +20,11 @@ package org.apache.gearpump.experiments.storm.topology import java.util.{HashMap => JHashMap, Map => JMap} -import backtype.storm.Config import org.apache.gearpump.experiments.storm.processor.StormProcessor import org.apache.gearpump.experiments.storm.producer.StormProducer import org.apache.gearpump.experiments.storm.util.TopologyUtil import org.apache.gearpump.streaming.MockUtil +import org.apache.storm.Config import org.scalatest.mock.MockitoSugar import org.scalatest.{Matchers, WordSpec} diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala index f12e54f68..b60959ca0 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.experiments.storm.topology import java.util.{List => JList} -import backtype.storm.task.GeneralTopologyContext -import backtype.storm.tuple.Fields import org.apache.gearpump.TimeStamp +import org.apache.storm.task.GeneralTopologyContext +import org.apache.storm.tuple.Fields import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala index c1cdb3b1d..0c832d647 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GrouperSpec.scala @@ -19,12 +19,13 @@ package org.apache.gearpump.experiments.storm.util import java.util.{List => JList} +import org.apache.storm.generated.GlobalStreamId +import org.apache.storm.grouping.CustomStreamGrouping +import org.apache.storm.task.TopologyContext +import org.apache.storm.tuple.Fields + import scala.collection.JavaConverters._ -import backtype.storm.generated.GlobalStreamId -import backtype.storm.grouping.CustomStreamGrouping -import backtype.storm.task.TopologyContext -import backtype.storm.tuple.Fields import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala index e0e9e612c..3c2666cf9 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala @@ -18,9 +18,10 @@ package org.apache.gearpump.experiments.storm.util import java.util.{List => JList, Map => JMap} +import org.apache.storm.generated.Grouping + import scala.collection.JavaConverters._ -import backtype.storm.generated.Grouping import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalacheck.Gen diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala index e787c3d06..589a6eec2 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormSerializerPoolSpec.scala @@ -18,11 +18,12 @@ package org.apache.gearpump.experiments.storm.util -import java.util.{HashMap => JHashMap, List => JList, Map => JMap} +import java.util.{List => JList, Map => JMap} +import org.apache.storm.utils.Utils + import scala.collection.JavaConverters._ import akka.actor.ExtendedActorSystem -import backtype.storm.utils.Utils import com.esotericsoftware.kryo.Kryo import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala index 36d84cb98..00e27337e 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormUtilSpec.scala @@ -22,8 +22,6 @@ import java.lang.{Boolean => JBoolean, Long => JLong} import java.util.{HashMap => JHashMap, Map => JMap} import scala.collection.JavaConverters._ -import backtype.storm.Config -import backtype.storm.generated.StormTopology import org.apache.storm.shade.org.json.simple.JSONValue import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala index 886013cd8..868c70efd 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/TopologyUtil.scala @@ -18,11 +18,12 @@ package org.apache.gearpump.experiments.storm.util -import backtype.storm.generated.StormTopology -import backtype.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout} -import backtype.storm.topology.TopologyBuilder -import backtype.storm.tuple.Fields -import backtype.storm.utils.Utils +import org.apache.storm.generated.StormTopology +import org.apache.storm.testing.{TestGlobalCount, TestWordCounter, TestWordSpout} +import org.apache.storm.topology.TopologyBuilder +import org.apache.storm.tuple.Fields +import org.apache.storm.utils.Utils + object TopologyUtil { val DEFAULT_STREAM_ID = Utils.DEFAULT_STREAM_ID diff --git a/project/Build.scala b/project/Build.scala index c9f6356d4..4679c2b44 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -49,7 +49,7 @@ object Build extends sbt.Build { val upickleVersion = "0.3.4" val junitVersion = "4.12" val kafkaVersion = "0.8.2.1" - val stormVersion = "0.10.0" + val stormVersion = "1.0.1" val slf4jVersion = "1.7.7" val gsCollectionsVersion = "6.2.0"