-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathActorAndMergeHub.scala
99 lines (81 loc) · 3.2 KB
/
ActorAndMergeHub.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.example
import akka.NotUsed
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
import akka.cluster.typed.{Cluster, Join}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{MergeHub, Sink, Source, SourceQueueWithComplete}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Random, Success}
object ActorAndMergeHub extends App {
val config = ConfigFactory.load()
val customConf = ConfigFactory.parseString("""
akka.actor.provider = "cluster"
""")
val system = ActorSystem[Nothing](Guardian(), "bla", ConfigFactory.load(customConf).withFallback(config))
}
object Guardian {
def apply(): Behavior[Nothing] = {
Behaviors.setup[Nothing] { context =>
implicit val system = context.system
val cluster = Cluster(context.system)
cluster.manager ! Join(cluster.selfMember.address)
val sharding = ClusterSharding(context.system)
val TypeKey = EntityTypeKey[Counter.Command]("Counter")
val streamEnd = MergeHub.source[String](perProducerBufferSize = 16)
.to(Sink.foreach(println(_))).run()
val shardRegion: ActorRef[ShardingEnvelope[Counter.Command]] =
sharding.init(Entity(TypeKey)(createBehavior = entityContext => Counter(entityContext.entityId, streamEnd)))
Source.repeat("bla")
.throttle(10, 1.second)
.map(x => {
shardRegion ! ShardingEnvelope("counter-"+Random.nextInt(10), Counter.Increment)
})
.to(Sink.ignore)
.run()
Behaviors.same
}
}
}
object Counter {
sealed trait Command
case object Increment extends Command
case object Enqueued extends Command
case object EnqueuedFailure extends Command
final case class GetValue(replyTo: ActorRef[Int]) extends Command
def apply(entityId: String, streamEnd:Sink[String, NotUsed]): Behavior[Command] = Behaviors.receive { (context, message) =>
def updated(value: Int, stream:SourceQueueWithComplete[String], state:String): Behavior[Command] = {
Behaviors.withStash(100) { buffer =>
Behaviors.receiveMessage[Command] {
case Increment =>
val newValue = value + 1
if (state == "waiting") {
buffer.stash(Increment)
}
else if (state == "clear") {
context.pipeToSelf(stream.offer("Entity: "+entityId+" Count: "+newValue)) {
case Success(_) => Enqueued
case Failure(e) => EnqueuedFailure
}
}
updated(newValue, stream, "waiting")
case GetValue(replyTo) =>
replyTo ! value
Behaviors.same
case Enqueued =>
updated(value, stream, "clear")
case EnqueuedFailure =>
println("Enqueue Failure")
Behaviors.same
}
}
}
implicit val sys = context.system
val offerStream = Source.queue[String](10, OverflowStrategy.backpressure)
.to(streamEnd).run()
updated(0, offerStream, "clear")
}
}