76b046f558df1797241283e19330c65b96adb681
[aquarium] / src / main / scala / gr / grnet / aquarium / service / FinagleRESTService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message._
55 import com.ckkloverdos.resource.StreamResource
56 import gr.grnet.aquarium.event.model.ExternalEventModel
57 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
58 import akka.dispatch.{Future ⇒ AFuture}
59 import gr.grnet.aquarium.util.json.JsonHelpers
60
61 import com.ckkloverdos.maybe.Failed
62 import gr.grnet.aquarium.actor.message.GetUserStateRequest
63 import com.ckkloverdos.maybe.Just
64 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
65 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
66 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
67
/**
 * The REST front-end of Aquarium, served over HTTP with Finagle.
 *
 * A request goes through [[ExceptionHandler]] (failures become `500` responses), then through
 * [[AdminChecker]] (guards the admin URIs) and finally reaches [[MainService]], which dispatches
 * on the request URI.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */

class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
  final val TEXT_PLAIN       = "text/plain"
  final val APPLICATION_JSON = "application/json"

  // Set by `configure`; they keep their JVM default values until then.
  @volatile private[this] var _port: Int = _
  @volatile private[this] var _shutdownTimeoutMillis: Long = _
  @volatile private[this] var _userActorFutureTimeoutMillis: Long = _

  def propertyPrefix = Some(RESTService.Prefix)

  /**
   * Configure this instance with the provided properties.
   *
   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
   *
   * The HTTP port and the shutdown timeout are read from `props`. The timeout used when asking a
   * user actor is currently fixed to 5000 ms and is not read from `props`.
   */
  def configure(props: Props): Unit = {
    this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
    this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
    this._userActorFutureTimeoutMillis = 5000L

    logger.debug("HTTP port is %s".format(this._port))
  }

  /**
   * Builds an HTTP/1.1 response with the given status, an UTF-8 encoded body and a
   * `Content-type` header of the form `contentType;charset=utf-8`.
   *
   * Shared by [[stringResponse]] and [[ExceptionHandler]], so that error responses carry the
   * same headers as normal ones.
   */
  private[this] def buildResponse(
      status: THttpResponseStatus,
      body: String,
      contentType: String
  ): TDefaultHttpResponse = {
    val response = new TDefaultHttpResponse(HTTP_1_1, status)
    response.setContent(copiedBuffer(body, UTF_8))
    response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
    response
  }

  /**
   * An already completed future holding a response with the given status, body and content type.
   */
  def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
    TFuture.value(buildResponse(status, body, contentType))
  }

  /**
   * Same as [[stringResponse]] with a status of `200 OK`.
   */
  def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
    stringResponse(OK, body, contentType)
  }

  /**
   * A plain text response whose body is the reason phrase of the given status.
   */
  def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
    stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
  }

  /**
   * Responds with the URL of the given resource, an empty line and then the resource contents.
   *
   * If the contents cannot be obtained, the respective failure is rethrown to the caller.
   */
  def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
    val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)

    resource.stringContent.toMaybeEither match {
      case Just(body) ⇒
        stringResponseOK(fmt(body), contentType)

      case Failed(e) ⇒
        throw e
    }
  }

  /**
   * Looks up an event with the provided `getter` and responds with its JSON representation,
   * or with `404 Not Found` if the getter returns `None`.
   *
   * @param eventID the ID that is passed to `getter`
   * @param getter  the lookup function
   */
  def eventInfoResponse[E <: ExternalEventModel](
      eventID: String,
      getter: String ⇒ Option[E]
  ): TFuture[THttpResponse] = {
    getter(eventID) match {
      case Some(event) ⇒
        stringResponseOK(event.toJsonString, APPLICATION_JSON)

      case None ⇒
        statusResponse(NOT_FOUND)
    }
  }

  /**
   * Logs any failure of the downstream service and turns it into a `500 Internal Server Error`
   * plain text response.
   *
   * Both failed futures and exceptions thrown synchronously while the downstream service is
   * building its future are handled.
   */
  final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
      // `handle` below only sees failures that live inside a future, so an exception that escapes
      // from `service(request)` itself must be converted to a failed future first.
      val served: TFuture[THttpResponse] =
        try {
          service(request)
        } catch {
          case e: Exception ⇒
            TFuture.exception[THttpResponse](e)
        }

      served handle {
        case error ⇒
          logger.error("While serving %s".format(request), error)
          val status = INTERNAL_SERVER_ERROR

          buildResponse(status, status.getReasonPhrase, TEXT_PLAIN)
      }
    }
  }

  /**
   * Guards the admin part of the REST API.
   *
   * A request whose URI starts with `RESTPaths.AdminPrefix` is passed to the downstream service
   * only if its `Aquarium.HTTP.RESTAdminHeaderName` header equals the value of
   * `aquarium.adminCookie`; otherwise the response is `403 Forbidden`. This includes the case
   * where no admin cookie is defined. All other requests pass through untouched.
   */
  final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
      if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
        val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
        aquarium.adminCookie match {
          case Some(`headerValue`) ⇒
            service(request)

          case _ ⇒
            // Either the header does not match the cookie or there is no cookie at all.
            statusResponse(FORBIDDEN)
        }
      } else {
        service(request)
      }
    }
  }

  /**
   * Sends a request message to the actor of the respective user and exposes the akka reply
   * as a twitter future.
   */
  final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
    def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
      // We want to asynchronously route the message via akka and get the whole computation as a
      // twitter future.
      val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
      val promise = new TPromise[UserActorResponseMessage[_]]()

      val askTimeout = ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))
      val actualWork = akka.pattern.ask(actorRef, request)(askTimeout).
        asInstanceOf[AFuture[UserActorResponseMessage[_]]]

      // Bridge the akka future to the twitter promise.
      actualWork.onComplete {
        case Left(throwable) ⇒
          promise.setException(throwable)

        case Right(value) ⇒
          promise.setValue(value)
      }

      promise
    }
  }

  /**
   * The service that actually answers the requests, by dispatching on the request URI.
   *
   * The URI handlers are built once per instance and tried in this order: ping, user actor cache,
   * configuration resources, events, user. Any URI that is not matched gets `404 Not Found`.
   */
  final case class MainService() extends Service[THttpRequest, THttpResponse] {
    private[this] type URIPF = PartialFunction[String, TFuture[THttpResponse]]

    final val actorRouterService = UserActorService()

    /**
     * Routes the message to the user actor and translates the reply to an HTTP response.
     *
     * The HTTP status is the one suggested by the reply. A `Left` reply is logged as an error and
     * sent as plain text, a `Right` reply is sent as JSON. A failed future is propagated as is.
     */
    def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
      actorRouterService(requestMessage).transform { tryResponse ⇒
        tryResponse match {
          case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
            // Computed once; used both for the debug log and (on success) for the response body.
            val json = responseMessage.responseToJsonString
            logger.debug("{}", responseMessage)
            logger.debug("{}", json)
            val statusCode = responseMessage.suggestedHTTPStatus
            val status = THttpResponseStatus.valueOf(statusCode)

            responseMessage.response match {
              case Left(errorMessage) ⇒
                logger.error("Error %s '%s' serving %s. Internal response: %s".format(
                  statusCode,
                  errorMessage,
                  requestMessage,
                  responseMessage))

                stringResponse(status, errorMessage, TEXT_PLAIN)

              case Right(_) ⇒
                stringResponse(status, json, APPLICATION_JSON)
            }

          case TThrow(throwable) ⇒
            // Hand the failure over as a failed future instead of throwing inside the callback.
            TFuture.exception[THttpResponse](throwable)
        }
      }
    }

    /**
     * Plain text reply of the ping URIs: `PONG` or `DOWN`, followed by the current time in millis
     * and the same time in ISO date-time format, one per line. The HTTP status is always `200 OK`.
     */
    private[this] def pong(ok: Boolean): TFuture[THttpResponse] = {
      val now = TimeHelpers.nowMillis()
      val nowFormatted = ISODateTimeFormat.dateTime().print(now)
      val reply = if(ok) "PONG" else "DOWN"
      stringResponseOK("%s\n%s\n%s".format(reply,now, nowFormatted), TEXT_PLAIN)
    }

    /**
     * Parses the two ends of a bill period. A zero upper end means "no upper end" and is mapped to
     * `Long.MaxValue`. Returns `None` if any of the two strings is not a valid `Long`.
     */
    private[this] def parseBillTimeslot(from: String, to: String): Option[Timeslot] = {
      try {
        val t1 = from.toLong
        val t2 = to.toLong
        Some(Timeslot(t1, if(t2 == 0) Long.MaxValue else t2))
      } catch {
        case _: NumberFormatException ⇒
          None
      }
    }

    private[this] val pingHandler: URIPF = {
      case RESTPaths.AquariumPingPath() ⇒
        pong(true)

      case RESTPaths.RabbitMQPingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)

      case RESTPaths.IMStorePingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)

      case RESTPaths.RCStorePingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
    }

    private[this] val userActorCacheHandler: URIPF = {
      case RESTPaths.UserActorCacheStatsPath() ⇒
        val cacheSize = aquarium.akkaService.cacheSize
        val stats = aquarium.akkaService.cacheStats

        // A linked map, so that the keys are kept in insertion order.
        val map = new java.util.LinkedHashMap[String, Any]
        map.put("cacheSize", cacheSize)
        map.put("requestCount", stats.requestCount())
        map.put("hitCount", stats.hitCount())
        map.put("hitRate", stats.hitRate())
        map.put("missCount", stats.missCount())
        map.put("missRate", stats.missRate())
        map.put("evictionCount", stats.evictionCount())
        map.put("loadCount", stats.loadCount())
        map.put("loadSuccessCount", stats.loadSuccessCount())
        map.put("loadExceptionCount", stats.loadExceptionCount())
        map.put("loadExceptionRate", stats.loadExceptionRate())
        map.put("totalLoadTime", stats.totalLoadTime())
        map.put("averageLoadPenalty", stats.averageLoadPenalty())

        val json = JsonHelpers.jsonStringOfJavaMap(map)
        stringResponseOK(json, APPLICATION_JSON)

      case RESTPaths.UserActorCacheContentsPath() ⇒
        // The cached user IDs, sorted, one per line.
        val buffer = new scala.collection.mutable.ArrayBuffer[String]()
        aquarium.akkaService.foreachCachedUserID(buffer.append(_))
        val output = buffer.sorted.mkString("\n")
        stringResponseOK(output, TEXT_PLAIN)

      case RESTPaths.UserActorCacheCountPath() ⇒
        stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
    }

    private[this] val confHandler: URIPF = {
      case RESTPaths.ResourcesPath() ⇒
        stringResponseOK("%s\n%s\n%s\n" .format(
          ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
          ResourceLocator.ResourceNames.LOGBACK_XML,
          ResourceLocator.ResourceNames.POLICY_JSON),
        TEXT_PLAIN)

      case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)

      case RESTPaths.ResourcesLogbackXMLPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)

      case RESTPaths.ResourcesPolicyJSONPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.PolicyJSONResource, TEXT_PLAIN)
    }

    private[this] val eventsHandler: URIPF = {
      case RESTPaths.ResourceEventPath(id) ⇒
        eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)

      case RESTPaths.IMEventPath(id) ⇒
        eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
    }

    // Each user request carries the time (in millis) at which it was matched here.
    private[this] val userHandler: URIPF = {
      case RESTPaths.UserBalancePath(userID) ⇒
        // /user/(.+)/balance/?
        callUserActor(GetUserBalanceRequest(userID, TimeHelpers.nowMillis()))

      case RESTPaths.UserStatePath(userID) ⇒
        // /user/(.+)/state/?
        callUserActor(GetUserStateRequest(userID, TimeHelpers.nowMillis()))

      case RESTPaths.UserWalletPath(userID) ⇒
        // /user/(.+)/wallet/?
        callUserActor(GetUserWalletRequest(userID, TimeHelpers.nowMillis()))

      case RESTPaths.UserBillPath(userID, st1, st2) ⇒
        parseBillTimeslot(st1, st2) match {
          case Some(timeslot) ⇒
            callUserActor(GetUserBillRequest(userID, timeslot, TimeHelpers.nowMillis()))

          case None ⇒
            // The period cannot be parsed, so this is a client error and not a server one.
            statusResponse(THttpResponseStatus.BAD_REQUEST)
        }
    }

    private[this] val defaultHandler: URIPF = {
      case _ ⇒
        statusResponse(NOT_FOUND)
    }

    // Built once per service instance instead of once per request.
    private[this] val combinedHandler: URIPF =
      pingHandler orElse
      userActorCacheHandler orElse
      confHandler orElse
      eventsHandler orElse
      userHandler orElse
      defaultHandler

    /**
     * Logs the request line at debug level and serves the request with the first handler that
     * matches the request URI.
     */
    def apply(request: THttpRequest): TFuture[THttpResponse] = {
      val uri = request.getUri
      val method = request.getMethod
      logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))

      combinedHandler(uri)
    }
  }

  val service = ExceptionHandler() andThen AdminChecker() andThen MainService()

  // Lazy, because the port is only known after `configure` has been called.
  lazy val server = ServerBuilder().
    codec(Http()).
    bindTo(new InetSocketAddress(this._port)).
    name("HttpServer").
    build(service)

  /**
   * Starts the HTTP server by forcing the lazy `server` value.
   */
  def start(): Unit = {
    logger.info("Starting HTTP on port %s".format(this._port))
    // Just for the side effect of building the server. This must not be wrapped in `assert`:
    // assertions are elidable and an elided `assert` does not evaluate its argument, in which
    // case the server would never be started.
    val startedServer = server
    ()
  }

  /**
   * Closes the HTTP server, waiting for at most the configured shutdown timeout.
   */
  def stop(): Unit = {
    logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
    server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))
  }
}