RabbitMQProducer handles Nacks. UserActor accepts user balance additions from Astakos...
[aquarium] / src / main / scala / gr / grnet / aquarium / service / FinagleRESTService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, FuturePool ⇒ TFuturePool, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message._
55 import com.ckkloverdos.resource.StreamResource
56 import com.ckkloverdos.maybe.{Just, Failed}
57 import gr.grnet.aquarium.event.model.ExternalEventModel
58 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
59 import akka.dispatch.{Future ⇒ AFuture}
60 import com.fasterxml.jackson.databind.ObjectMapper
61 import java.util
62 import scala.Left
63 import scala.Some
64 import com.ckkloverdos.maybe.Failed
65 import gr.grnet.aquarium.actor.message.GetUserStateRequest
66 import scala.Right
67 import com.ckkloverdos.maybe.Just
68 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
69 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
70 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
71
/**
 * The REST API of Aquarium, served over HTTP by a Finagle server.
 *
 * Every request goes through a three-stage pipeline (see `service`):
 *
 *  1. `ExceptionHandler` turns failures into a `500 Internal Server Error` reply,
 *  1. `AdminChecker` guards the URIs that start with `RESTPaths.AdminPrefix`,
 *  1. `MainService` routes the request URI to the code that builds the reply.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */

class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
  final val TEXT_PLAIN       = "text/plain"
  final val APPLICATION_JSON = "application/json"

  // All three are assigned in `configure()`.
  @volatile private[this] var _port: Int = _
  @volatile private[this] var _shutdownTimeoutMillis: Long = _
  @volatile private[this] var _userActorFutureTimeoutMillis: Long = _

  def propertyPrefix = Some(RESTService.Prefix)

  /**
   * Configure this instance with the provided properties.
   *
   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
   *
   * The port and the shutdown timeout are read from `props`; the timeout for user actor replies
   * is currently fixed to 5000 ms and is not configurable.
   */
  def configure(props: Props) {
    this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
    this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
    this._userActorFutureTimeoutMillis = 5000L

    logger.debug("HTTP port is %s".format(this._port))
  }

  /**
   * Builds an HTTP/1.1 response carrying `body` encoded as UTF-8.
   *
   * Shared by `stringResponse` and `ExceptionHandler`, so that every reply of this service
   * (error replies included) carries the same `Content-type` header.
   */
  private def newStringResponse(
      status: THttpResponseStatus,
      body: String,
      contentType: String
  ): TDefaultHttpResponse = {
    val response = new TDefaultHttpResponse(HTTP_1_1, status)
    response.setContent(copiedBuffer(body, UTF_8))
    response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))

    response
  }

  /**
   * Creates an already completed future with a response of the given status, body and content type.
   *
   * @param status      the HTTP status of the response
   * @param body        the response body; it is sent UTF-8 encoded
   * @param contentType the MIME type, without the charset part
   */
  def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
    TFuture.value(newStringResponse(status, body, contentType))
  }

  /** Same as `stringResponse`, with status `200 OK`. */
  def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
    stringResponse(OK, body, contentType)
  }

  /** A plain text response whose body is the reason phrase of `status`. */
  def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
    stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
  }

  /**
   * Responds with the URL of `resource`, an empty line and then the textual contents of the resource.
   *
   * If the contents cannot be obtained, the underlying error is thrown to the caller; when this
   * happens while serving a request, `ExceptionHandler` turns it into a `500` reply.
   */
  def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
    resource.stringContent.toMaybeEither match {
      case Just(body) ⇒
        stringResponseOK("%s\n\n%s".format(resource.url, body), contentType)

      case Failed(e) ⇒
        throw e
    }
  }

  /**
   * Looks up an event by its ID and responds with its JSON representation, or with
   * `404 Not Found` if `getter` finds nothing.
   *
   * @param eventID the ID of the requested event
   * @param getter  the lookup function, normally a method of an event store
   */
  def eventInfoResponse[E <: ExternalEventModel](
      eventID: String,
      getter: String ⇒ Option[E]
  ): TFuture[THttpResponse] = {
    getter(eventID) match {
      case Some(event) ⇒
        stringResponseOK(event.toJsonString, APPLICATION_JSON)

      case None ⇒
        statusResponse(NOT_FOUND)
    }
  }

  /**
   * Outermost filter: logs any failure of the downstream service and replies with
   * `500 Internal Server Error`.
   */
  final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
      // The downstream service may fail in two ways: by returning a failed future or by throwing
      // before it has returned any future at all. `handle` sees only the former, so a thrown
      // exception is first converted into a failed future. Errors (e.g. OutOfMemoryError) are
      // deliberately left to propagate.
      val reply: TFuture[THttpResponse] =
        try {
          service(request)
        } catch {
          case e: Exception ⇒
            TFuture.exception[THttpResponse](e)
        }

      reply handle {
        case error ⇒
          logger.error("While serving %s".format(request), error)
          val status = INTERNAL_SERVER_ERROR

          newStringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
      }
    }
  }

  /**
   * Lets a request for an admin URI through only if its admin header equals the configured
   * admin cookie. Requests for all other URIs pass unchecked.
   */
  final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
      if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
        val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
        aquarium.adminCookie match {
          case Some(`headerValue`) ⇒
            service(request)

          case _ ⇒
            // Either the header does not match or no admin cookie has been configured at all.
            statusResponse(FORBIDDEN)
        }
      } else {
        service(request)
      }
    }
  }

  /**
   * Sends a request message to the actor of the respective user and exposes the eventual reply
   * as a twitter future.
   */
  final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
    def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
      // We want to asynchronously route the message via akka and get the whole computation as a
      // twitter future.
      val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
      val promise = new TPromise[UserActorResponseMessage[_]]()

      // The cast is unchecked; a reply of another type surfaces later, when the value is
      // pattern-matched in `MainService.callUserActor`.
      val actualWork = akka.pattern.ask(actorRef, request)(
          ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
        asInstanceOf[AFuture[UserActorResponseMessage[_]]]

      // Bridge the akka future to the twitter promise.
      actualWork.onComplete {
        case Left(throwable) ⇒
          promise.setException(throwable)

        case Right(value) ⇒
          promise.setValue(value)
      }

      promise
    }
  }

  /**
   * Routes a request to its handler, based solely on the request URI (the HTTP method is only
   * logged). URIs that no handler recognizes get a `404 Not Found` reply.
   *
   * All handlers are built once per `MainService` instance instead of once per request.
   */
  final case class MainService() extends Service[THttpRequest, THttpResponse] {
    private type URIPF = PartialFunction[String, TFuture[THttpResponse]]

    final val actorRouterService = UserActorService()

    // Created once and shared by all requests, instead of configuring a new mapper per request.
    // NOTE(review): relies on Jackson's documented thread-safety of ObjectWriter — confirm for
    // the Jackson version in use.
    private[this] val prettyJsonWriter = new ObjectMapper().writerWithDefaultPrettyPrinter()

    /**
     * Forwards `requestMessage` to the user actor and translates the reply to an HTTP response.
     *
     * The HTTP status is the one suggested by the reply. An error reply is logged and sent as
     * plain text; a successful one is sent as JSON. If no reply arrives (e.g. the ask times out),
     * the returned future fails with the original exception.
     */
    def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
      actorRouterService(requestMessage).transform {
        case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
          // Serialize once; the result is needed both for logging and for the reply.
          val json = responseMessage.responseToJsonString
          logger.debug("{}", responseMessage)
          logger.debug("{}", json)
          val statusCode = responseMessage.suggestedHTTPStatus
          val status = THttpResponseStatus.valueOf(statusCode)

          responseMessage.response match {
            case Left(errorMessage) ⇒
              logger.error("Error %s '%s' serving %s. Internal response: %s".format(
                statusCode,
                errorMessage,
                requestMessage,
                responseMessage))

              stringResponse(status, errorMessage, TEXT_PLAIN)

            case Right(_) ⇒
              stringResponse(status, json, APPLICATION_JSON)
          }

        case TThrow(throwable) ⇒
          TFuture.exception[THttpResponse](throwable)
      }
    }

    /** Plain text liveness reply: `PONG` or `DOWN`, followed by the current time (millis and ISO). */
    private def pong(ok: Boolean): TFuture[THttpResponse] = {
      val now = TimeHelpers.nowMillis()
      val nowFormatted = ISODateTimeFormat.dateTime().print(now)
      val reply = if(ok) "PONG" else "DOWN"

      stringResponseOK("%s\n%s\n%s".format(reply, now, nowFormatted), TEXT_PLAIN)
    }

    /**
     * Parses the bounds of a bill request. An upper bound of `0` means "no upper bound".
     *
     * @return `None` if either bound is not a valid `Long`
     */
    private def parseBillTimeslot(from: String, to: String): Option[Timeslot] = {
      try {
        val t1 = from.toLong
        val t2 = to.toLong

        Some(Timeslot(t1, if(t2 == 0) Long.MaxValue else t2))
      } catch {
        case _: NumberFormatException ⇒
          None
      }
    }

    private[this] val pingHandler: URIPF = {
      case RESTPaths.AquariumPingPath() ⇒
        pong(true)

      case RESTPaths.RabbitMQPingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)

      case RESTPaths.IMStorePingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)

      case RESTPaths.RCStorePingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
    }

    private[this] val userActorCacheHandler: URIPF = {
      case RESTPaths.UserActorCacheStatsPath() ⇒
        val cacheSize = aquarium.akkaService.cacheSize
        val stats = aquarium.akkaService.cacheStats

        // A linked map, so that the keys are emitted in insertion order.
        val map = new java.util.LinkedHashMap[String, Any]
        map.put("cacheSize", cacheSize)
        map.put("requestCount", stats.requestCount())
        map.put("hitCount", stats.hitCount())
        map.put("hitRate", stats.hitRate())
        map.put("missCount", stats.missCount())
        map.put("missRate", stats.missRate())
        map.put("evictionCount", stats.evictionCount())
        map.put("loadCount", stats.loadCount())
        map.put("loadSuccessCount", stats.loadSuccessCount())
        map.put("loadExceptionCount", stats.loadExceptionCount())
        map.put("loadExceptionRate", stats.loadExceptionRate())
        map.put("totalLoadTime", stats.totalLoadTime())
        map.put("averageLoadPenalty", stats.averageLoadPenalty())

        stringResponseOK(prettyJsonWriter.writeValueAsString(map), APPLICATION_JSON)

      case RESTPaths.UserActorCacheContentsPath() ⇒
        // One cached user ID per line, sorted.
        val buffer = new scala.collection.mutable.ArrayBuffer[String]()
        aquarium.akkaService.foreachCachedUserID(buffer.append(_))
        stringResponseOK(buffer.sorted.mkString("\n"), TEXT_PLAIN)

      case RESTPaths.UserActorCacheCountPath() ⇒
        stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
    }

    private[this] val confHandler: URIPF = {
      case RESTPaths.ResourcesPath() ⇒
        stringResponseOK("%s\n%s\n%s\n" .format(
          ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
          ResourceLocator.ResourceNames.LOGBACK_XML,
          ResourceLocator.ResourceNames.POLICY_JSON),
        TEXT_PLAIN)

      case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)

      case RESTPaths.ResourcesLogbackXMLPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)

      case RESTPaths.ResourcesPolicyJSONPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.PolicyJSONResource, TEXT_PLAIN)
    }

    private[this] val eventsHandler: URIPF = {
      case RESTPaths.ResourceEventPath(id) ⇒
        eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)

      case RESTPaths.IMEventPath(id) ⇒
        eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
    }

    // The reference time of each user request is taken at the moment the route is served.
    private[this] val userHandler: URIPF = {
      case RESTPaths.UserBalancePath(userID) ⇒
        // /user/(.+)/balance/?
        callUserActor(GetUserBalanceRequest(userID, TimeHelpers.nowMillis()))

      case RESTPaths.UserStatePath(userID) ⇒
        // /user/(.+)/state/?
        callUserActor(GetUserStateRequest(userID, TimeHelpers.nowMillis()))

      case RESTPaths.UserWalletPath(userID) ⇒
        // /user/(.+)/wallet/?
        callUserActor(GetUserWalletRequest(userID, TimeHelpers.nowMillis()))

      case RESTPaths.UserBillPath(userID, st1, st2) ⇒
        parseBillTimeslot(st1, st2) match {
          case Some(timeslot) ⇒
            callUserActor(GetUserBillRequest(userID, timeslot, TimeHelpers.nowMillis()))

          case None ⇒
            // Bounds that are not numbers are a client error, not a server failure.
            statusResponse(BAD_REQUEST)
        }
    }

    private[this] val defaultHandler: URIPF = {
      case _ ⇒
        statusResponse(NOT_FOUND)
    }

    // Handlers are tried in this order; `defaultHandler` accepts everything and must stay last.
    private[this] val combinedHandler: URIPF = List(
      pingHandler,
      userActorCacheHandler,
      confHandler,
      eventsHandler,
      userHandler,
      defaultHandler
    ).reduceLeft(_ orElse _)

    def apply(request: THttpRequest): TFuture[THttpResponse] = {
      val uri = request.getUri
      logger.debug("%s %s %s".format(request.getMethod, request.getProtocolVersion, uri))

      combinedHandler(uri)
    }
  }

  /** The full request pipeline that the server runs for every request. */
  val service = ExceptionHandler() andThen AdminChecker() andThen MainService()

  // Lazy, because the port is known only after `configure()` has run.
  lazy val server = ServerBuilder().
    codec(Http()).
    bindTo(new InetSocketAddress(this._port)).
    name("HttpServer").
    build(service)

  /** Builds the HTTP server on the configured port. */
  def start(): Unit = {
    logger.info("Starting HTTP on port %s".format(this._port))
    // Touching the lazy val is what actually builds the server. This must not be done inside an
    // `assert`: assertions can be elided at compile time, in which case their argument is never
    // evaluated and the server would never come up.
    val started = server
    logger.debug("HTTP server started: %s".format(started))
  }

  /**
   * Closes the HTTP server, passing the configured shutdown timeout to `close`.
   *
   * NOTE(review): `server` is lazy, so calling this without a previous `start()` first builds
   * the server and then closes it.
   */
  def stop(): Unit = {
    logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
    server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))
  }
}