2602e0fd148bb4d92835eccdbe2c5e68bae5732c
[aquarium] / src / main / scala / gr / grnet / aquarium / service / FinagleRESTService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, FuturePool ⇒ TFuturePool, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest, UserActorResponseMessage}
55 import com.ckkloverdos.resource.StreamResource
56 import com.ckkloverdos.maybe.{Just, Failed}
57 import gr.grnet.aquarium.event.model.ExternalEventModel
58 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
59 import akka.dispatch.{Future ⇒ AFuture}
60 import com.fasterxml.jackson.databind.ObjectMapper
61 import java.util
62
63 /**
64  *
65  * @author Christos KK Loverdos <loverdos@gmail.com>
66  */
67
68 class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
69   final val TEXT_PLAIN       = "text/plain"
70   final val APPLICATION_JSON = "application/json"
71
72   @volatile private[this] var _port: Int = _
73   @volatile private[this] var _shutdownTimeoutMillis: Long = _
74   @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
75
76   def propertyPrefix = Some(RESTService.Prefix)
77
78   /**
79    * Configure this instance with the provided properties.
80    *
81    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
82    */
83   def configure(props: Props) {
84     this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
85     this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
86     this._userActorFutureTimeoutMillis = 5000L
87
88     logger.debug("HTTP port is %s".format(this._port))
89   }
90
91   def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
92     val response = new TDefaultHttpResponse(HTTP_1_1, status)
93     response.setContent(copiedBuffer(body, UTF_8))
94     response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
95
96     TFuture.value(response)
97   }
98
99   def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
100     stringResponse(OK, body, contentType)
101   }
102
103   def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
104     stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
105   }
106
107   def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
108     val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
109
110     resource.stringContent.toMaybeEither match {
111       case Just(body) ⇒
112         stringResponseOK(fmt(body), contentType)
113
114       case Failed(e) ⇒
115         throw e
116     }
117   }
118
119   def eventInfoResponse[E <: ExternalEventModel](
120       eventID: String,
121       getter: String ⇒ Option[E]
122   ): TFuture[THttpResponse] = {
123     getter(eventID) match {
124       case Some(event) ⇒
125         stringResponseOK(event.toJsonString, APPLICATION_JSON)
126
127       case None ⇒
128         statusResponse(NOT_FOUND)
129     }
130   }
131
132   final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
133     def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
134       service(request) handle {
135         case error ⇒
136           logger.error("While serving %s".format(request), error)
137           val status = INTERNAL_SERVER_ERROR
138           val errorResponse = new TDefaultHttpResponse(HTTP_1_1, status)
139           errorResponse.setContent(copiedBuffer(status.getReasonPhrase, UTF_8))
140
141           errorResponse
142       }
143     }
144   }
145
146   final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
147     def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
148       if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
149         val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
150         aquarium.adminCookie match {
151           case Some(`headerValue`) ⇒
152             service(request)
153
154           case Some(_) ⇒
155             statusResponse(FORBIDDEN)
156
157           case None ⇒
158             statusResponse(FORBIDDEN)
159         }
160       } else {
161         service(request)
162       }
163     }
164   }
165
166   final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
167     def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
168       // We want to asynchronously route the message via akka and get the whole computation as a
169       // twitter future.
170       val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
171       val promise = new TPromise[UserActorResponseMessage[_]]()
172
173       val actualWork = akka.pattern.ask(actorRef, request)(
174           ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
175         asInstanceOf[AFuture[UserActorResponseMessage[_]]]
176
177       actualWork.onComplete {
178         case Left(throwable) ⇒
179           promise.setException(throwable)
180
181         case Right(value) ⇒
182           promise.setValue(value)
183       }
184
185       promise
186     }
187   }
188
189   final case class MainService() extends Service[THttpRequest, THttpResponse] {
190
191     final val actorRouterService = UserActorService()
192
193     def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
194       actorRouterService(requestMessage).transform { tryResponse ⇒
195         tryResponse match {
196           case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
197             val statusCode = responseMessage.suggestedHTTPStatus
198             val status = THttpResponseStatus.valueOf(statusCode)
199
200             responseMessage.response match {
201               case Left(errorMessage) ⇒
202                 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
203                   statusCode,
204                   errorMessage,
205                   requestMessage,
206                   responseMessage))
207
208                 stringResponse(status, errorMessage, TEXT_PLAIN)
209
210               case Right(_) ⇒
211                 stringResponse(status, responseMessage.toJsonString, APPLICATION_JSON)
212             }
213
214           case TThrow(throwable) ⇒
215             throw throwable
216         }
217       }
218     }
219
220     // FIXME make efficient; this partial function thing is crap for serving requests
221     def apply(request: THttpRequest): TFuture[THttpResponse] = {
222       val millis = TimeHelpers.nowMillis()
223       val uri = request.getUri
224       val method = request.getMethod
225       logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
226
227       type URIPF = PartialFunction[String, TFuture[THttpResponse]]
228
229       val PingHandler: URIPF = {
230         case RESTPaths.PingPath() ⇒
231           val now = TimeHelpers.nowMillis()
232           val nowFormatted = ISODateTimeFormat.dateTime().print(now)
233           stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
234       }
235
236       val UserActorCacheHandler: URIPF = {
237         case RESTPaths.UserActorCacheStatsPath() ⇒
238           val cacheSize = aquarium.akkaService.cacheSize
239           val stats = aquarium.akkaService.cacheStats
240
241           val mapper = new ObjectMapper()
242           val writer = mapper.writerWithDefaultPrettyPrinter()
243           
244           val map = new java.util.LinkedHashMap[String, Any]
245           map.put("cacheSize", cacheSize)
246           map.put("requestCount", stats.requestCount())
247           map.put("hitCount", stats.hitCount())
248           map.put("hitRate", stats.hitRate())
249           map.put("missCount", stats.missCount())
250           map.put("missRate", stats.missRate())
251           map.put("evictionCount", stats.evictionCount())
252           map.put("loadCount", stats.loadCount())
253           map.put("loadSuccessCount", stats.loadSuccessCount())
254           map.put("loadExceptionCount", stats.loadExceptionCount())
255           map.put("loadExceptionRate", stats.loadExceptionRate())
256           map.put("totalLoadTime", stats.totalLoadTime())
257           map.put("averageLoadPenalty", stats.averageLoadPenalty())
258
259           val json = writer.writeValueAsString(map)
260           stringResponseOK(json, APPLICATION_JSON)
261
262
263         case RESTPaths.UserActorCacheContentsPath() ⇒
264           val buffer = new scala.collection.mutable.ArrayBuffer[String]()
265           aquarium.akkaService.foreachCachedUserID(buffer.append(_))
266           val output = buffer.sorted.mkString("\n")
267           stringResponseOK(output, TEXT_PLAIN)
268
269         case RESTPaths.UserActorCacheCountPath() ⇒
270           stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
271       }
272
273       val ConfHandler: URIPF = {
274         case RESTPaths.ResourcesPath() ⇒
275           stringResponseOK("%s\n%s\n%s\n" .format(
276             ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
277             ResourceLocator.ResourceNames.LOGBACK_XML,
278             ResourceLocator.ResourceNames.POLICY_YAML),
279           TEXT_PLAIN)
280
281         case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
282           resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
283
284         case RESTPaths.ResourcesLogbackXMLPath() ⇒
285           resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
286
287         case RESTPaths.ResourcesPolicyYAMLPath() ⇒
288           resourceInfoResponse(ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
289       }
290
291       val EventsHandler: URIPF = {
292         case RESTPaths.ResourceEventPath(id) ⇒
293           eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
294
295         case RESTPaths.IMEventPath(id) ⇒
296           eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
297       }
298
299       val UserHandler: URIPF = {
300         case RESTPaths.UserBalancePath(userID) ⇒
301           // /user/(.+)/balance/?
302           callUserActor(GetUserBalanceRequest(userID, millis))
303
304         case RESTPaths.UserStatePath(userId) ⇒
305           // /user/(.+)/state/?
306           callUserActor(GetUserStateRequest(userId, millis))
307       }
308
309       val DefaultHandler: URIPF = {
310         case _ ⇒
311           statusResponse(NOT_FOUND)
312       }
313
314       val AllHandlers = List(
315         PingHandler,
316         UserActorCacheHandler,
317         ConfHandler,
318         EventsHandler,
319         UserHandler,
320         DefaultHandler
321       )
322
323       val combinedHandler =  AllHandlers.reduceLeft(_ orElse _)
324
325       combinedHandler(uri)
326     }
327   }
328
329   val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
330   lazy val server = ServerBuilder().
331     codec(Http()).
332     bindTo(new InetSocketAddress(this._port)).
333     name("HttpServer").
334     build(service)
335
336   def start(): Unit = {
337     logger.info("Starting HTTP on port %s".format(this._port))
338     // Just for the side effect
339     assert(server ne null)
340   }
341
342   def stop(): Unit = {
343     logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
344     server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))
345   }
346 }