root / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / service / RabbitMQService.scala @ 61bfaf2e
History | View | Annotate | Download (13.2 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.connector.rabbitmq.service |
37 |
|
38 |
import com.ckkloverdos.props.Props |
39 |
import gr.grnet.aquarium.util.date.TimeHelpers |
40 |
import gr.grnet.aquarium.util.{Loggable, Lifecycle} |
41 |
import gr.grnet.aquarium.util.safeUnit |
42 |
import com.rabbitmq.client.Address |
43 |
import gr.grnet.aquarium.{Configurator, Configurable} |
44 |
import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType} |
45 |
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys |
46 |
import com.ckkloverdos.env.{EnvKey, Env} |
47 |
import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters} |
48 |
import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel} |
49 |
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel} |
50 |
import com.ckkloverdos.maybe.MaybeEither |
51 |
import gr.grnet.aquarium.actor.RouterRole |
52 |
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent} |
53 |
import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore} |
54 |
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer |
55 |
import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey} |
56 |
|
57 |
/** |
58 |
* |
59 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
60 |
*/ |
61 |
|
62 |
class RabbitMQService extends Loggable with Lifecycle with Configurable { |
63 |
@volatile private[this] var _props: Props = Props()(StdConverters.AllConverters) |
64 |
@volatile private[this] var _consumers = List[RabbitMQConsumer]() |
65 |
|
66 |
def propertyPrefix = Some(RabbitMQService.PropertiesPrefix) |
67 |
|
68 |
def configurator = Configurator.MasterConfigurator |
69 |
|
70 |
def eventBus = configurator.eventBus |
71 |
|
72 |
def resourceEventStore = configurator.resourceEventStore |
73 |
|
74 |
def imEventStore = configurator.imEventStore |
75 |
|
76 |
def converters = configurator.converters |
77 |
|
78 |
def router = configurator.actorProvider.actorForRole(RouterRole) |
79 |
|
80 |
/** |
81 |
* Configure this instance with the provided properties. |
82 |
* |
83 |
* If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix. |
84 |
*/ |
85 |
def configure(props: Props) = { |
86 |
this._props = props |
87 |
|
88 |
doConfigure() |
89 |
} |
90 |
|
91 |
private[this] def doConfigure(): Unit = { |
92 |
val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒ |
93 |
converters.convert[JsonTextFormat](payload) |
94 |
} |
95 |
|
96 |
val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒ |
97 |
StdResourceEvent.fromJsonTextFormat(jsonTextFormat) |
98 |
} |
99 |
|
100 |
val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒ |
101 |
StdIMEvent.fromJsonTextFormat(jsonTextFormat) |
102 |
} |
103 |
|
104 |
val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒ |
105 |
router ! ProcessResourceEvent(rcEvent) |
106 |
} |
107 |
|
108 |
val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒ |
109 |
logger.info("Forwarding {}", rcEvent) |
110 |
} |
111 |
|
112 |
val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒ |
113 |
router ! ProcessIMEvent(imEvent) |
114 |
} |
115 |
|
116 |
val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒ |
117 |
logger.info("Forwarding {}", imEvent) |
118 |
} |
119 |
|
120 |
val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent]( |
121 |
jsonParser, |
122 |
(payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error), |
123 |
rcEventParser, |
124 |
rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent), |
125 |
rcDebugForwardAction |
126 |
) |
127 |
|
128 |
val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent]( |
129 |
jsonParser, |
130 |
(payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error), |
131 |
imEventParser, |
132 |
imEvent ⇒ imEventStore.insertIMEvent(imEvent), |
133 |
imDebugForwardAction |
134 |
) |
135 |
|
136 |
val futureExecutor = new PayloadHandlerFutureExecutor |
137 |
|
138 |
// (e)xchange:(r)outing key:(q) |
139 |
|
140 |
// These two are to trigger an error if the property does not exist |
141 |
locally(_props(RabbitMQConfKeys.rcevents_queues)) |
142 |
locally(_props(RabbitMQConfKeys.imevents_queues)) |
143 |
|
144 |
val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues) |
145 |
|
146 |
val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield { |
147 |
RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ) |
148 |
} |
149 |
val rcConsumerConfs = rcConsumerConfs_.toSet.toList |
150 |
if(rcConsumerConfs.size != rcConsumerConfs_.size) { |
151 |
logger.warn( |
152 |
"Duplicate %s consumer info in %s=%s".format( |
153 |
RabbitMQService.PropertiesPrefix, |
154 |
RabbitMQConfKeys.rcevents_queues, |
155 |
_props(RabbitMQConfKeys.rcevents_queues))) |
156 |
} |
157 |
|
158 |
val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues) |
159 |
val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield { |
160 |
RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ) |
161 |
} |
162 |
val imConsumerConfs = imConsumerConfs_.toSet.toList |
163 |
if(imConsumerConfs.size != imConsumerConfs_.size) { |
164 |
logger.warn( |
165 |
"Duplicate %s consumer info in %s=%s".format( |
166 |
RabbitMQService.PropertiesPrefix, |
167 |
RabbitMQConfKeys.imevents_queues, |
168 |
_props(RabbitMQConfKeys.imevents_queues))) |
169 |
} |
170 |
|
171 |
val rcConsumers = for(rccc ← rcConsumerConfs) yield { |
172 |
logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format( |
173 |
RabbitMQService.PropertiesPrefix, |
174 |
rccc.exchangeName, |
175 |
rccc.routingKey, |
176 |
rccc.queueName |
177 |
)) |
178 |
new RabbitMQConsumer(rccc, rcHandler, futureExecutor) |
179 |
} |
180 |
|
181 |
val imConsumers = for(imcc ← imConsumerConfs) yield { |
182 |
logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format( |
183 |
RabbitMQService.PropertiesPrefix, |
184 |
imcc.exchangeName, |
185 |
imcc.routingKey, |
186 |
imcc.queueName |
187 |
)) |
188 |
new RabbitMQConsumer(imcc, imHandler, futureExecutor) |
189 |
} |
190 |
|
191 |
this._consumers = rcConsumers ++ imConsumers |
192 |
|
193 |
val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_) |
194 |
lg("Got %s consumers".format(this._consumers.size)) |
195 |
|
196 |
this._consumers.foreach(logger.debug("Configured {}", _)) |
197 |
} |
198 |
|
199 |
def start() = { |
200 |
logStartingF("") { |
201 |
this._consumers.foreach(_.start()) |
202 |
|
203 |
for(consumer ← this._consumers) { |
204 |
if(!consumer.isAlive()) { |
205 |
logger.warn("Consumer not started yet {}", consumer.toDebugString) |
206 |
} |
207 |
} |
208 |
} {} |
209 |
} |
210 |
|
211 |
def stop() = { |
212 |
logStoppingF("") { |
213 |
for(consumer ← this._consumers) { |
214 |
safeUnit(consumer.stop()) |
215 |
} |
216 |
} {} |
217 |
} |
218 |
} |
219 |
|
220 |
object RabbitMQService { |
221 |
final val PropertiesPrefix = "rabbitmq" |
222 |
final val PropertiesPrefixAndDot = PropertiesPrefix + "." |
223 |
|
224 |
@inline private[this] def p(name: String) = PropertiesPrefixAndDot + name |
225 |
|
226 |
final val DefaultExchangeConfArguments = Env() |
227 |
|
228 |
final val DefaultExchangeConf = Env() + |
229 |
(RabbitMQExchangeKeys.`type`, TopicExchange) + |
230 |
(RabbitMQExchangeKeys.autoDelete, false) + |
231 |
(RabbitMQExchangeKeys.durable, true) + |
232 |
(RabbitMQExchangeKeys.arguments, DefaultExchangeConfArguments) |
233 |
|
234 |
|
235 |
final val DefaultQueueConfArguments = Env() + |
236 |
(RabbitMQQueueKeys.Args.xHAPolixy, "all") |
237 |
|
238 |
final val DefaultQueueConf = Env() + |
239 |
(RabbitMQQueueKeys.autoDelete, false) + |
240 |
(RabbitMQQueueKeys.durable, true) + |
241 |
(RabbitMQQueueKeys.exclusive, false) + |
242 |
(RabbitMQQueueKeys.arguments, DefaultQueueConfArguments) |
243 |
|
244 |
final val DefaultChannelConf = Env() + |
245 |
(RabbitMQChannelKeys.qosPrefetchCount, 1) + |
246 |
(RabbitMQChannelKeys.qosPrefetchSize, 0) + |
247 |
(RabbitMQChannelKeys.qosGlobal, false) |
248 |
|
249 |
def makeConnectionConf(props: Props) = { |
250 |
val servers = props.getTrimmedList(RabbitMQConfKeys.servers) |
251 |
val port = props.getIntEx(RabbitMQConfKeys.port) |
252 |
val addresses = servers.map(new Address(_, port)).toArray |
253 |
|
254 |
// TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys |
255 |
// TODO: Normally this means to get rid of Props and use Env everywhere. |
256 |
// TODO: [Will be more type-safe anyway.] |
257 |
Env() + |
258 |
(RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) + |
259 |
(RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) + |
260 |
(RabbitMQConKeys.vhost, props(RabbitMQConfKeys.vhost)) + |
261 |
(RabbitMQConKeys.servers, addresses) + |
262 |
(RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis)) |
263 |
} |
264 |
|
265 |
def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = { |
266 |
val Array(exchange, routing, queue) = oneERQ.split(':') |
267 |
|
268 |
RabbitMQConsumerConf( |
269 |
exchangeName = exchange, |
270 |
routingKey = routing, |
271 |
queueName = queue, |
272 |
connectionConf = makeConnectionConf(props), |
273 |
exchangeConf = DefaultExchangeConf, |
274 |
channelConf = DefaultChannelConf, |
275 |
queueConf = DefaultQueueConf |
276 |
) |
277 |
} |
278 |
|
279 |
object RabbitMQConKeys { |
280 |
final val username = StringKey(p("username")) |
281 |
final val password = StringKey(p("password")) |
282 |
final val vhost = StringKey(p("vhost")) |
283 |
final val servers = ArrayKey[Address](p("servers")) |
284 |
|
285 |
final val reconnect_period_millis = LongKey(p("reconnect.period.millis")) |
286 |
} |
287 |
|
288 |
/** |
289 |
* A [[com.ckkloverdos.key.TypedKey]] for the exchange type |
290 |
*/ |
291 |
final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type") |
292 |
|
293 |
/** |
294 |
* Configuration keys for a `RabbitMQ` exchange. |
295 |
* |
296 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
297 |
*/ |
298 |
object RabbitMQExchangeKeys { |
299 |
final val `type` = RabbitMQExchangeTypedKey |
300 |
final val durable = BooleanKey(p("durable")) |
301 |
final val autoDelete = BooleanKey(p("autoDelete")) |
302 |
final val arguments = EnvKey(p("arguments")) |
303 |
} |
304 |
|
305 |
/** |
306 |
* Configuration keys for a `RabbitMQ` exchange. |
307 |
* |
308 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
309 |
*/ |
310 |
object RabbitMQQueueKeys { |
311 |
final val durable = BooleanKey(p("durable")) |
312 |
final val autoDelete = BooleanKey(p("autoDelete")) |
313 |
final val exclusive = BooleanKey(p("exclusive")) |
314 |
final val arguments = EnvKey(p("arguments")) |
315 |
|
316 |
object Args { |
317 |
// http://www.rabbitmq.com/ha.html |
318 |
// NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ |
319 |
final val xHAPolixy = StringKey("x-ha-policy") |
320 |
} |
321 |
} |
322 |
|
323 |
/** |
324 |
* Configuration keys for a `RabbitMQ` channel. |
325 |
* |
326 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
327 |
*/ |
328 |
object RabbitMQChannelKeys { |
329 |
final val qosPrefetchCount = IntKey(p("qosPrefetchCount")) |
330 |
final val qosPrefetchSize = IntKey(p("qosPrefetchSize")) |
331 |
final val qosGlobal = BooleanKey(p("qosGlobal")) |
332 |
} |
333 |
|
334 |
object RabbitMQConfKeys { |
335 |
/** |
336 |
* How often do we attemot a reconnection? |
337 |
*/ |
338 |
final val reconnect_period_millis = p("reconnect.period.millis") |
339 |
|
340 |
/** |
341 |
* Comma separated list of AMQP servers running in active-active |
342 |
* configuration. |
343 |
*/ |
344 |
final val servers = p("servers") |
345 |
|
346 |
/** |
347 |
* Comma separated list of AMQP servers running in active-active |
348 |
* configuration. |
349 |
*/ |
350 |
final val port = p("port") |
351 |
|
352 |
/** |
353 |
* User name for connecting with the AMQP server |
354 |
*/ |
355 |
final val username = p("username") |
356 |
|
357 |
/** |
358 |
* Password for connecting with the AMQP server |
359 |
*/ |
360 |
final val password = p("passwd") |
361 |
|
362 |
/** |
363 |
* Virtual host on the AMQP server |
364 |
*/ |
365 |
final val vhost = p("vhost") |
366 |
|
367 |
/** |
368 |
* Comma separated list of exchanges known to aquarium |
369 |
* FIXME: What is this?? |
370 |
*/ |
371 |
final val exchange = p("exchange") |
372 |
|
373 |
/** |
374 |
* Queues for retrieving resource events from. Multiple queues can be |
375 |
* declared, separated by comma. |
376 |
* |
377 |
* Format is `exchange:routing.key:queue-name,...` |
378 |
*/ |
379 |
final val rcevents_queues = p("rcevents.queues") |
380 |
|
381 |
/** |
382 |
* Queues for retrieving user events from. Multiple queues can be |
383 |
* declared, separated by comma. |
384 |
* |
385 |
* Format is `exchange:routing.key:queue-name,...` |
386 |
*/ |
387 |
final val imevents_queues = p("imevents.queues") |
388 |
} |
389 |
} |