This repository has been archived by the owner on Oct 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathKinesis.scala
269 lines (256 loc) · 11.4 KB
/
Kinesis.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/*
* Copyright 2017 WeightWatchers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.weightwatchers.reactive.kinesis.stream
import akka.{Done, NotUsed}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.auth.AWSCredentialsProvider
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.{ConsumerService, KinesisConsumer}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
import scala.concurrent.Future
/**
  * Factory methods for building Akka Streams sources and sinks that are backed by Kinesis.
  *
  * Sources emit `CommittableEvent[ConsumerEvent]` elements, sinks accept `ProducerEvent` elements.
  */
object Kinesis extends LazyLogging {

  /**
    * Build a source of committable Kinesis events from an explicit consumer configuration
    * and a custom consumer factory.
    *
    * Please note: every event has to be committed during the user flow!
    * Uncommitted events will be retransmitted after a timeout.
    *
    * @param consumerConf   the configuration used to connect to Kinesis.
    * @param createConsumer factory that builds the ConsumerService for a given eventProcessor ActorRef.
    * @param system         the actor system.
    * @return a source of committable consumer events.
    */
  def source(
      consumerConf: ConsumerConf,
      createConsumer: ActorRef => ConsumerService
  )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
    val sourceStage = new KinesisSourceGraphStage(consumerConf, createConsumer, system)
    Source.fromGraph(sourceStage)
  }

  /**
    * Build a source of committable Kinesis events from an explicit consumer configuration.
    * The consumer service is a KinesisConsumer created from that same configuration.
    *
    * Please note: every event has to be committed during the user flow!
    * Uncommitted events will be retransmitted after a timeout.
    *
    * @param consumerConf the configuration used to connect to Kinesis.
    * @param system       the actor system.
    * @return a source of committable consumer events.
    */
  def source(
      consumerConf: ConsumerConf
  )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
    // Default factory: wire the event processor into a KinesisConsumer.
    val consumerFactory: ActorRef => ConsumerService =
      eventProcessor => KinesisConsumer(consumerConf, eventProcessor, system)
    source(consumerConf, consumerFactory)
  }

  /**
    * Build a source of committable Kinesis events, reading the consumer settings from a named
    * section of the actor system configuration.
    *
    * Please note: every event has to be committed during the user flow!
    * Uncommitted events will be retransmitted after a timeout.
    *
    * A minimal application conf file should look like this:
    * {{{
    * kinesis {
    *    application-name = "SampleService"
    *    consumer-name {
    *       stream-name = "sample-stream"
    *    }
    * }
    * }}}
    * See kinesis reference.conf for a list of all available config options.
    *
    * @param consumerName the name of the consumer in the application.conf.
    * @param inConfig     the name of the sub-config that holds the kinesis settings.
    * @param system       the actor system to use.
    * @return a source of committable consumer events.
    */
  def source(consumerName: String, inConfig: String)(
      implicit system: ActorSystem
  ): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
    val kinesisSection = system.settings.config.getConfig(inConfig)
    val resolvedConf   = ConsumerConf(kinesisSection, consumerName)
    source(resolvedConf)
  }

  /**
    * Build a source of committable Kinesis events, reading the consumer settings from the
    * `kinesis` section of the actor system configuration.
    *
    * Please note: every event has to be committed during the user flow!
    * Uncommitted events will be retransmitted after a timeout.
    *
    * A minimal application conf file should look like this:
    * {{{
    * kinesis {
    *    application-name = "SampleService"
    *    consumer-name {
    *       stream-name = "sample-stream"
    *    }
    * }
    * }}}
    * See kinesis reference.conf for a list of all available config options.
    *
    * @param consumerName the name of the consumer in the application.conf.
    * @param system       the actor system to use.
    * @return a source of committable consumer events.
    */
  def source(consumerName: String)(
      implicit system: ActorSystem
  ): Source[CommittableEvent[ConsumerEvent], NotUsed] =
    source(consumerName, "kinesis")

  /**
    * Build a source of committable Kinesis events, reading the consumer settings from the actor
    * system configuration and creating the consumer service with a custom factory.
    *
    * Please note: every event has to be committed during the user flow!
    * Uncommitted events will be retransmitted after a timeout.
    *
    * A minimal application conf file should look like this:
    * {{{
    * kinesis {
    *    application-name = "SampleService"
    *    consumer-name {
    *       stream-name = "sample-stream"
    *    }
    * }
    * }}}
    * See kinesis reference.conf for a list of all available config options.
    *
    * @param consumerName   the name of the consumer in the application.conf.
    * @param createConsumer factory that builds the ConsumerService from the resolved ConsumerConf
    *                       and the eventProcessor ActorRef.
    * @param inConfig       the name of the sub-config that holds the kinesis settings.
    * @param system         the actor system to use.
    * @return a source of committable consumer events.
    */
  def source(
      consumerName: String,
      createConsumer: (ConsumerConf, ActorRef) => ConsumerService,
      inConfig: String = "kinesis"
  )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
    val kinesisSection = system.settings.config.getConfig(inConfig)
    val resolvedConf   = ConsumerConf(kinesisSection, consumerName)
    // Fix the configuration argument so that only the event processor remains open.
    val consumerFactory: ActorRef => ConsumerService =
      eventProcessor => createConsumer(resolvedConf, eventProcessor)
    source(resolvedConf, consumerFactory)
  }

  /**
    * Build a sink that accepts ProducerEvents, which get published to Kinesis.
    *
    * The sink forwards all events to an actor which is created with the given Props.
    * Every message sent needs to be acknowledged by the underlying producer actor.
    *
    * The sink signals back pressure if more than maxOutStanding messages are not acknowledged.
    *
    * The materialized value is a `Future[Done]`, which is completed once all messages of the
    * stream are sent to the producer actor _and_ got acknowledged.
    * The future fails if sending an event fails or upstream has failed the stream.
    *
    * @param props          the props to create a producer actor. Passed by name to work around #48.
    * @param maxOutStanding the number of messages to send to the actor unacknowledged before
    *                       back pressure is applied.
    * @param system         the actor system.
    * @return a sink that accepts ProducerEvents.
    */
  def sink(
      props: => Props,
      maxOutStanding: Int
  )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
    // `props` is handed over as-is; it is deliberately not bound to a local value here.
    val sinkStage = new KinesisSinkGraphStage(props, maxOutStanding, system)
    Sink.fromGraph(sinkStage)
  }

  /**
    * Build a sink that accepts ProducerEvents, which get published to Kinesis.
    *
    * The sink forwards all events to a KinesisProducerActor which is configured with the given
    * config object.
    * Every message sent needs to be acknowledged by the underlying producer actor.
    *
    * The sink signals back pressure if more messages than configured in the throttling conf are
    * not acknowledged.
    * If throttling is not configured, a default value (= 1000 messages) is applied.
    *
    * The materialized value is a `Future[Done]`, which is completed once all messages of the
    * stream are sent to the producer actor _and_ got acknowledged.
    * The future fails if sending an event fails or upstream has failed the stream.
    *
    * @param producerConf the configuration to create the KinesisProducerActor.
    * @param system       the actor system.
    * @return a sink that accepts ProducerEvents.
    */
  def sink(
      producerConf: ProducerConf
  )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
    val outstandingLimit =
      producerConf.throttlingConf
        .map(_.maxOutstandingRequests)
        .getOrElse {
          // No throttling section present: tell the user and fall back to the default.
          logger.info(
            "Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000"
          )
          1000
        }
    sink(KinesisProducerActor.props(producerConf), outstandingLimit)
  }

  /**
    * Build a sink that accepts ProducerEvents, which get published to Kinesis.
    *
    * The sink forwards all events to a KinesisProducerActor which is configured from the given
    * configuration object for the given producer name.
    * Every message sent needs to be acknowledged by the underlying producer actor.
    *
    * The sink signals back pressure if more messages than configured in the throttling conf are
    * not acknowledged.
    * If throttling is not configured, a default value (= 1000 messages) is applied.
    *
    * The materialized value is a `Future[Done]`, which is completed once all messages of the
    * stream are sent to the producer actor _and_ got acknowledged.
    * The future fails if sending an event fails or upstream has failed the stream.
    *
    * @param kinesisConfig       the configuration object that holds the producer config.
    * @param producerName        the name of the producer in the given configuration.
    * @param credentialsProvider the AWS credentials provider to use to connect.
    * @param system              the actor system.
    * @return a sink that accepts ProducerEvents.
    */
  def sink(
      kinesisConfig: Config,
      producerName: String,
      credentialsProvider: Option[AWSCredentialsProvider]
  )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
    val resolvedConf = ProducerConf(kinesisConfig, producerName, credentialsProvider)
    sink(resolvedConf)
  }

  /**
    * Build a sink that accepts ProducerEvents, which get published to Kinesis.
    *
    * The sink forwards all events to a KinesisProducerActor which is configured from the actor
    * system configuration for the given producer name.
    * Every message sent needs to be acknowledged by the underlying producer actor.
    *
    * The sink signals back pressure if more messages than configured in the throttling conf are
    * not acknowledged.
    * If throttling is not configured, a default value (= 1000 messages) is applied.
    *
    * The materialized value is a `Future[Done]`, which is completed once all messages of the
    * stream are sent to the producer actor _and_ got acknowledged.
    * The future fails if sending an event fails or upstream has failed the stream.
    *
    * A minimal application conf file should look like this:
    * {{{
    * kinesis {
    *    application-name = "SampleService"
    *    producer-name {
    *       stream-name = "sample-stream"
    *       akka.max-outstanding-requests = 100
    *    }
    * }
    * }}}
    * See kinesis reference.conf for a list of all available config options.
    *
    * @param producerName        the name of the producer in the system configuration.
    * @param inConfig            the name of the sub-config that holds the producer config
    *                            (usually kinesis).
    * @param credentialsProvider the AWS credentials provider to use to connect.
    * @param system              the actor system.
    * @return a sink that accepts ProducerEvents.
    */
  def sink(
      producerName: String,
      inConfig: String = "kinesis",
      credentialsProvider: Option[AWSCredentialsProvider] = None
  )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
    val kinesisSection: Config = system.settings.config.getConfig(inConfig)
    sink(kinesisSection, producerName, credentialsProvider)
  }
}