5da6ecd18258218a97da7ddd3008c3270456f383
[aquarium] / src / main / scala / gr / grnet / aquarium / service / FinagleRESTService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, FuturePool ⇒ TFuturePool, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest, RouterResponseMessage, RouterRequestMessage}
55 import gr.grnet.aquarium.actor.RouterRole
56 import com.ckkloverdos.resource.StreamResource
57 import com.ckkloverdos.maybe.{Just, Failed}
58 import gr.grnet.aquarium.event.model.ExternalEventModel
59
60 /**
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  */
64
65 class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
66   final val TEXT_PLAIN       = "text/plain"
67   final val APPLICATION_JSON = "application/json"
68
69   @volatile private[this] var _port: Int = 8080
70   @volatile private[this] var _shutdownTimeoutMillis: Long = 2000
71   @volatile private[this] var _threadPoolSize: Int = 4
72   @volatile private[this] var _threadPool: TFuturePool = _
73
74   def propertyPrefix = Some(RESTService.Prefix)
75
76   /**
77    * Configure this instance with the provided properties.
78    *
79    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
80    */
81   def configure(props: Props) {
82     this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
83     this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
84
85     this._threadPool = TFuturePool(Executors.newFixedThreadPool(this._threadPoolSize))
86
87     logger.debug("HTTP port is %s".format(this._port))
88   }
89
90   def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
91     val response = new TDefaultHttpResponse(HTTP_1_1, status)
92     response.setContent(copiedBuffer(body, UTF_8))
93     response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
94
95     TFuture.value(response)
96   }
97
98   def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
99     stringResponse(OK, body, contentType)
100   }
101
102   def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
103     stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
104   }
105
106   def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
107     val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
108
109     resource.stringContent.toMaybeEither match {
110       case Just(body) ⇒
111         stringResponseOK(fmt(body), contentType)
112
113       case Failed(e) ⇒
114         throw e
115     }
116   }
117
118   def eventInfoResponse[E <: ExternalEventModel](
119       eventID: String,
120       getter: String ⇒ Option[E]
121   ): TFuture[THttpResponse] = {
122     getter(eventID) match {
123       case Some(event) ⇒
124         stringResponseOK(event.toJsonString, APPLICATION_JSON)
125
126       case None ⇒
127         statusResponse(NOT_FOUND)
128     }
129   }
130
131   final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
132     def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
133       service(request) handle {
134         case error ⇒
135           logger.error("While serving %s".format(request), error)
136           val statusCode = error match {
137             case _: IllegalArgumentException ⇒
138               FORBIDDEN
139             case _ ⇒
140               INTERNAL_SERVER_ERROR
141           }
142
143           val errorResponse = new TDefaultHttpResponse(HTTP_1_1, statusCode)
144           errorResponse.setContent(copiedBuffer(error.getStackTraceString, UTF_8))
145
146           errorResponse
147       }
148     }
149   }
150
151   final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
152     def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
153       if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
154         val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
155         aquarium.adminCookie match {
156           case Some(`headerValue`) ⇒
157             service(request)
158
159           case None ⇒
160             statusResponse(FORBIDDEN)
161         }
162       } else {
163         service(request)
164       }
165     }
166   }
167
168   final case class MainService() extends Service[THttpRequest, THttpResponse] {
169     final class ActorRouterService extends Service[RouterRequestMessage, RouterResponseMessage[_]] {
170       def apply(message: RouterRequestMessage): TFuture[RouterResponseMessage[_]] = {
171         // We want to asynchronously route the message via akka and get the whole computation as a
172         // twitter future.
173         val actorProvider = aquarium.actorProvider
174         val router = actorProvider.actorForRole(RouterRole)
175         val promise = new TPromise[RouterResponseMessage[_]]()
176
177         val actualWork = router.ask(message)
178
179         actualWork onComplete { akkaFuture ⇒
180           akkaFuture.value match {
181             case Some(eitherValue) ⇒
182               eitherValue match {
183                 case Left(throwable) ⇒
184                   promise.setException(throwable)
185
186                 case Right(value) ⇒
187                   promise.setValue(value.asInstanceOf[RouterResponseMessage[_]])
188               }
189
190             case None ⇒
191               promise.setException(new Exception("Got no response for %s".format(message)))
192           }
193         }
194
195         promise
196       }
197     }
198
199     final val actorRouterService = new ActorRouterService
200
201     def callRouter(requestMessage: RouterRequestMessage): TFuture[THttpResponse] = {
202       actorRouterService(requestMessage).transform { tryResponse ⇒
203         tryResponse match {
204           case TReturn(responseMessage: RouterResponseMessage[_]) ⇒
205             val statusCode = responseMessage.suggestedHTTPStatus
206             val status = THttpResponseStatus.valueOf(statusCode)
207
208             responseMessage.response match {
209               case Left(errorMessage) ⇒
210                 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
211                   statusCode,
212                   errorMessage,
213                   requestMessage,
214                   responseMessage))
215
216                 stringResponse(status, errorMessage, TEXT_PLAIN)
217
218               case Right(_) ⇒
219                 stringResponse(status, responseMessage.toJsonString, APPLICATION_JSON)
220             }
221
222           case TThrow(throwable) ⇒
223             val status = INTERNAL_SERVER_ERROR
224             logger.error("Error %s serving %s: %s".format(
225               status.getReasonPhrase,
226               requestMessage,
227               gr.grnet.aquarium.util.shortInfoOf(throwable)
228             ))
229
230             statusResponse(status)
231         }
232       }
233     }
234
235     def apply(request: THttpRequest): TFuture[THttpResponse] = {
236       val millis = TimeHelpers.nowMillis()
237       val uri = request.getUri
238       val method = request.getMethod
239       logger.debug("%s %s".format(method, uri))
240
241       uri match {
242         case RESTPaths.PingPath() ⇒
243           val now = TimeHelpers.nowMillis()
244           val nowFormatted = ISODateTimeFormat.dateTime().print(now)
245           stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
246
247 //        case RESTPaths.ResourcesPath() ⇒
248 //          stringResponseOK("%s\n%s\n%s\n" .format(
249 //            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
250 //            ResourceLocator.ResourceNames.LOGBACK_XML,
251 //            ResourceLocator.ResourceNames.POLICY_YAML),
252 //          TEXT_PLAIN)
253
254         case RESTPaths.UserBalancePath(userID) ⇒
255           // /user/(.+)/balance/?
256           callRouter(GetUserBalanceRequest(userID, millis))
257
258         case RESTPaths.UserStatePath(userId) ⇒
259           // /user/(.+)/state/?
260           callRouter(GetUserStateRequest(userId, millis))
261
262         case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
263           resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
264
265         case RESTPaths.ResourcesLogbackXMLPath() ⇒
266           resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
267
268         case RESTPaths.ResourcesPolicyYAMLPath() ⇒
269           resourceInfoResponse(ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
270
271         case RESTPaths.ResourceEventPath(id) ⇒
272           eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
273
274         case RESTPaths.IMEventPath(id) ⇒
275           eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
276
277         case _ ⇒
278           statusResponse(NOT_FOUND)
279       }
280     }
281   }
282
283   val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
284   lazy val server = ServerBuilder().
285     codec(Http()).
286     bindTo(new InetSocketAddress(this._port)).
287     name("HttpServer").
288     build(service)
289
290   def start(): Unit = {
291     logger.info("Starting HTTP on port %s".format(this._port))
292     // Just for the side effect
293     assert(server ne null)
294   }
295
296   def stop(): Unit = {
297     logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
298     server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))
299   }
300 }