Now using schema-based messages
[aquarium] / src / main / scala / gr / grnet / aquarium / service / FinagleRESTService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message._
55 import com.ckkloverdos.resource.StreamResource
56 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
57 import akka.dispatch.{Future ⇒ AFuture}
58 import gr.grnet.aquarium.util.json.JsonHelpers
59
60 import com.ckkloverdos.maybe.Failed
61 import gr.grnet.aquarium.actor.message.GetUserStateRequest
62 import com.ckkloverdos.maybe.Just
63 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
64 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
65 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
66 import org.apache.avro.specific.SpecificRecord
67 import gr.grnet.aquarium.message.avro.AvroHelpers
68
/**
 * The Aquarium REST API, served over HTTP with Finagle.
 *
 * Every request goes through the filter chain defined by `service`:
 * `ExceptionHandler` (turns failures into a 500 response), then `AdminChecker`
 * (guards URIs under `RESTPaths.AdminPrefix`), then `MainService` (dispatches on the URI).
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
  /** Content type used for plain text response bodies. */
  final val TEXT_PLAIN       = "text/plain"
  /** Content type used for JSON response bodies. */
  final val APPLICATION_JSON = "application/json"

  /** Timeout applied to user actor requests; currently fixed, not read from the configuration. */
  private[this] final val DefaultUserActorFutureTimeoutMillis = 5000L

  @volatile private[this] var _port: Int = _
  @volatile private[this] var _shutdownTimeoutMillis: Long = _
  @volatile private[this] var _userActorFutureTimeoutMillis: Long = _

  def propertyPrefix = Some(RESTService.Prefix)

  /**
   * Configure this instance with the provided properties.
   *
   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
   *
   * Reads the REST port and the shutdown timeout from `props`; the user actor timeout is set to
   * a fixed default.
   */
  def configure(props: Props): Unit = {
    this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
    this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
    this._userActorFutureTimeoutMillis = DefaultUserActorFutureTimeoutMillis

    logger.debug("HTTP port is %s".format(this._port))
  }

  /**
   * Builds an HTTP/1.1 response with the given status, a UTF-8 encoded `body` and a
   * `Content-type` header of the form `<contentType>;charset=utf-8`.
   */
  private def newStringResponse(
      status: THttpResponseStatus,
      body: String,
      contentType: String
  ): TDefaultHttpResponse = {
    val response = new TDefaultHttpResponse(HTTP_1_1, status)
    response.setContent(copiedBuffer(body, UTF_8))
    response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))

    response
  }

  /**
   * An already completed future holding a textual response.
   *
   * @param status      the HTTP status of the response
   * @param body        the response body, sent as UTF-8
   * @param contentType the media type to advertise (the charset is appended automatically)
   */
  def stringResponse(
      status: THttpResponseStatus,
      body: String,
      contentType: String
  ): TFuture[TDefaultHttpResponse] = {
    TFuture.value(newStringResponse(status, body, contentType))
  }

  /** Same as `stringResponse` with status `200 OK`. */
  def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
    stringResponse(OK, body, contentType)
  }

  /** A plain text response whose body is the reason phrase of `status`. */
  def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
    stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
  }

  /**
   * Responds with the URL of `resource`, an empty line and then the resource contents.
   *
   * If the contents cannot be read, the returned future is failed with the underlying error
   * (instead of throwing from a method that promises a future).
   */
  def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
    resource.stringContent.toMaybeEither match {
      case Just(body) ⇒
        stringResponseOK("%s\n\n%s".format(resource.url, body), contentType)

      case Failed(e) ⇒
        TFuture.exception(e)
    }
  }

  /**
   * Looks up an event by its ID and responds with its JSON representation,
   * or with `404 Not Found` if `getter` finds nothing.
   *
   * @param eventID the ID to look up
   * @param getter  the lookup function, e.g. a store's find-by-ID method
   */
  def eventInfoResponse[R <: SpecificRecord](
      eventID: String,
      getter: String ⇒ Option[R]
  ): TFuture[THttpResponse] = {
    getter(eventID) match {
      case Some(event) ⇒
        stringResponseOK(AvroHelpers.jsonStringOfSpecificRecord(event), APPLICATION_JSON)

      case None ⇒
        statusResponse(NOT_FOUND)
    }
  }

  /**
   * Outermost filter: logs any failure of the wrapped service and answers
   * `500 Internal Server Error` with the reason phrase as a plain text body.
   */
  final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
      // The wrapped service may fail by throwing instead of returning a failed future.
      // Convert that case here, so that both kinds of failure end up in `handle` below
      // without depending on the surrounding machinery to do the conversion.
      val response: TFuture[THttpResponse] =
        try {
          service(request)
        } catch {
          case e: Exception ⇒
            TFuture.exception(e)
        }

      response handle {
        case error ⇒
          logger.error("While serving %s".format(request), error)
          val status = INTERNAL_SERVER_ERROR

          newStringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
      }
    }
  }

  /**
   * Guards the admin part of the API: a request whose URI starts with `RESTPaths.AdminPrefix`
   * is let through only if its admin header equals the configured admin cookie. Anything else
   * under that prefix is answered with `403 Forbidden`. Other URIs are not checked.
   */
  final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
      if(!request.getUri.startsWith(RESTPaths.AdminPrefix)) {
        service(request)
      } else {
        // `getHeader` gives null when the header is absent; an absent header never authorizes.
        val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
        val isAuthorized = (headerValue ne null) && aquarium.adminCookie.exists(_ == headerValue)

        if(isAuthorized) service(request) else statusResponse(FORBIDDEN)
      }
    }
  }

  /**
   * Bridges Finagle and Akka: sends the request to the actor of the respective user
   * and exposes the actor's reply as a twitter future.
   */
  final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
    def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
      // We want to asynchronously route the message via akka and get the whole computation as a
      // twitter future.
      val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
      val promise = new TPromise[UserActorResponseMessage[_]]()
      val timeout = ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))

      akka.pattern.ask(actorRef, request)(timeout).onComplete {
        case Left(throwable) ⇒
          promise.setException(throwable)

        case Right(reply: UserActorResponseMessage[_]) ⇒
          promise.setValue(reply)

        case Right(other) ⇒
          // `ask` is untyped. Fail explicitly on an unexpected reply rather than letting a
          // wrongly typed value flow downstream.
          promise.setException(
            new ClassCastException("Unexpected reply to %s: %s".format(request, other)))
      }

      promise
    }
  }

  /**
   * The service that actually implements the REST API by dispatching on the request URI.
   */
  final case class MainService() extends Service[THttpRequest, THttpResponse] {
    private type URIPF = PartialFunction[String, TFuture[THttpResponse]]

    final val actorRouterService = UserActorService()

    /**
     * Sends `requestMessage` to the user actor and converts the reply to an HTTP response.
     *
     * The HTTP status is the one suggested by the reply. An error reply is sent as plain text,
     * a successful one as JSON. A failure of the actor call is propagated as a failed future.
     */
    def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
      actorRouterService(requestMessage) flatMap { responseMessage ⇒
        logger.debug("{}", responseMessage)
        // Computed once; it is needed for the log and, on success, for the response body.
        val json = responseMessage.responseToJsonString
        logger.debug("{}", json)

        val statusCode = responseMessage.suggestedHTTPStatus
        val status = THttpResponseStatus.valueOf(statusCode)

        responseMessage.response match {
          case Left(errorMessage) ⇒
            logger.error("Error %s '%s' serving %s. Internal response: %s".format(
              statusCode,
              errorMessage,
              requestMessage,
              responseMessage))

            stringResponse(status, errorMessage, TEXT_PLAIN)

          case Right(_) ⇒
            stringResponse(status, json, APPLICATION_JSON)
        }
      }
    }

    /** The reply to a ping: `PONG` or `DOWN`, followed by the current time in millis and in ISO format. */
    private def pong(ok: Boolean): TFuture[THttpResponse] = {
      val now = TimeHelpers.nowMillis()
      val nowFormatted = ISODateTimeFormat.dateTime().print(now)
      val reply = if(ok) "PONG" else "DOWN"

      stringResponseOK("%s\n%s\n%s".format(reply, now, nowFormatted), TEXT_PLAIN)
    }

    /**
     * Parses the two time path segments of a bill request.
     * An end of `0` stands for "no upper bound". Returns `None` if either segment is not a valid long.
     */
    private def parseBillTimeslot(from: String, to: String): Option[Timeslot] = {
      try {
        val t1 = from.toLong
        val t2 = to.toLong

        Some(Timeslot(t1, if(t2 == 0) Long.MaxValue else t2))
      } catch {
        case _: NumberFormatException ⇒
          None
      }
    }

    // The handlers below do not depend on the particular request, so they are built only once
    // instead of once per request.

    private val pingHandler: URIPF = {
      case RESTPaths.AquariumPingPath() ⇒
        pong(true)

      case RESTPaths.RabbitMQPingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)

      case RESTPaths.IMStorePingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)

      case RESTPaths.RCStorePingPath() ⇒
        pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
    }

    private val userActorCacheHandler: URIPF = {
      case RESTPaths.UserActorCacheStatsPath() ⇒
        val cacheSize = aquarium.akkaService.cacheSize
        val stats = aquarium.akkaService.cacheStats

        // Insertion-ordered, so the keys are rendered in the order they are added here.
        val map = new java.util.LinkedHashMap[String, Any]
        map.put("cacheSize", cacheSize)
        map.put("requestCount", stats.requestCount())
        map.put("hitCount", stats.hitCount())
        map.put("hitRate", stats.hitRate())
        map.put("missCount", stats.missCount())
        map.put("missRate", stats.missRate())
        map.put("evictionCount", stats.evictionCount())
        map.put("loadCount", stats.loadCount())
        map.put("loadSuccessCount", stats.loadSuccessCount())
        map.put("loadExceptionCount", stats.loadExceptionCount())
        map.put("loadExceptionRate", stats.loadExceptionRate())
        map.put("totalLoadTime", stats.totalLoadTime())
        map.put("averageLoadPenalty", stats.averageLoadPenalty())

        stringResponseOK(JsonHelpers.jsonStringOfJavaMap(map), APPLICATION_JSON)

      case RESTPaths.UserActorCacheContentsPath() ⇒
        val buffer = new scala.collection.mutable.ArrayBuffer[String]()
        aquarium.akkaService.foreachCachedUserID(buffer.append(_))

        stringResponseOK(buffer.sorted.mkString("\n"), TEXT_PLAIN)

      case RESTPaths.UserActorCacheCountPath() ⇒
        stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
    }

    private val confHandler: URIPF = {
      case RESTPaths.ResourcesPath() ⇒
        stringResponseOK("%s\n%s\n%s\n" .format(
          ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
          ResourceLocator.ResourceNames.LOGBACK_XML,
          ResourceLocator.ResourceNames.POLICY_JSON),
        TEXT_PLAIN)

      case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)

      case RESTPaths.ResourcesLogbackXMLPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)

      case RESTPaths.ResourcesPolicyJSONPath() ⇒
        resourceInfoResponse(ResourceLocator.Resources.PolicyJSONResource, TEXT_PLAIN)
    }

    private val eventsHandler: URIPF = {
      case RESTPaths.ResourceEventPath(id) ⇒
        eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)

      case RESTPaths.IMEventPath(id) ⇒
        eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
    }

    /** The last resort: whatever was not recognized so far is `404 Not Found`. */
    private val defaultHandler: URIPF = {
      case _ ⇒
        statusResponse(NOT_FOUND)
    }

    /** All request-independent handlers, in the order they are tried. */
    private val fixedHandlers: URIPF =
      pingHandler orElse userActorCacheHandler orElse confHandler orElse eventsHandler

    /**
     * Serves a request by trying, in this order, the ping, user actor cache, configuration,
     * event and user handlers. A URI that none of them recognizes gets `404 Not Found`.
     */
    def apply(request: THttpRequest): TFuture[THttpResponse] = {
      // The timestamp of the request, as passed to the user actor messages.
      val millis = TimeHelpers.nowMillis()
      val uri = request.getUri
      logger.debug("%s %s %s".format(request.getMethod, request.getProtocolVersion, uri))

      // The only handler that needs per-request state (the timestamp), hence built here.
      val userHandler: URIPF = {
        case RESTPaths.UserBalancePath(userID) ⇒
          // /user/(.+)/balance/?
          callUserActor(GetUserBalanceRequest(userID, millis))

        case RESTPaths.UserStatePath(userID) ⇒
          // /user/(.+)/state/?
          callUserActor(GetUserStateRequest(userID, millis))

        case RESTPaths.UserWalletPath(userID) ⇒
          // /user/(.+)/wallet/?
          callUserActor(GetUserWalletRequest(userID, millis))

        case RESTPaths.UserBillPath(userID, st1, st2) ⇒
          parseBillTimeslot(st1, st2) match {
            case Some(timeslot) ⇒
              callUserActor(GetUserBillRequest(userID, timeslot, millis))

            case None ⇒
              // Malformed or out of range time values are the client's fault, not ours.
              statusResponse(BAD_REQUEST)
          }
      }

      val handler = fixedHandlers orElse userHandler orElse defaultHandler

      handler(uri)
    }
  }

  /** The complete filter chain: exception handling, then the admin check, then the API itself. */
  val service = ExceptionHandler() andThen AdminChecker() andThen MainService()

  /**
   * The Finagle HTTP server. It is lazy, so that the port is bound on first access
   * (normally from `start()`) and not when this instance is constructed.
   */
  lazy val server = ServerBuilder().
    codec(Http()).
    bindTo(new InetSocketAddress(this._port)).
    name("HttpServer").
    build(service)

  /** Starts serving HTTP on the configured port. */
  def start(): Unit = {
    logger.info("Starting HTTP on port %s".format(this._port))
    // Forcing the lazy val is what actually builds the server. This must not happen inside
    // an `assert`: assertions are elidable, in which case their argument is never evaluated.
    val startedServer = server
    logger.debug("HTTP server is %s".format(startedServer))
  }

  /** Stops the HTTP server, waiting for at most the configured shutdown timeout. */
  def stop(): Unit = {
    logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
    server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))
  }
}