2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, FuturePool ⇒ TFuturePool, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest, UserActorResponseMessage}
55 import com.ckkloverdos.resource.StreamResource
56 import com.ckkloverdos.maybe.{Just, Failed}
57 import gr.grnet.aquarium.event.model.ExternalEventModel
58 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
59 import akka.dispatch.{Future ⇒ AFuture}
60 import com.fasterxml.jackson.databind.ObjectMapper
65 * @author Christos KK Loverdos <loverdos@gmail.com>
68 class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
// Content types handed to the string response helpers of this service.
// Plain-text bodies (e.g. status reason phrases).
69 final val TEXT_PLAIN = "text/plain"
// JSON bodies (e.g. serialized events).
70 final val APPLICATION_JSON = "application/json"
// Settings assigned in configure(); until then each holds its type's default value.
// TCP port the HTTP server is bound to.
72 @volatile private[this] var _port: Int = _
// Upper bound, in milliseconds, handed to server.close() when shutting down.
73 @volatile private[this] var _shutdownTimeoutMillis: Long = _
// Timeout, in milliseconds, of the ask sent to a user actor.
74 @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
/** Prefix of the configuration keys that are handed to this service. */
def propertyPrefix = {
  val prefix = RESTService.Prefix
  Some(prefix)
}
79 * Configure this instance with the provided properties.
81 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
/**
 * Reads the REST settings out of `props` and stores them in this instance.
 *
 * The port and the shutdown timeout come from the properties; the timeout for
 * user actor requests is fixed at five seconds.
 *
 * @param props the properties to read the settings from
 */
def configure(props: Props): Unit = {
  val envKeys = Aquarium.EnvKeys

  _port = props.getIntEx(envKeys.restPort.name)
  _shutdownTimeoutMillis = props.getLongEx(envKeys.restShutdownTimeoutMillis.name)
  _userActorFutureTimeoutMillis = 5000L

  logger.debug("HTTP port is %s".format(_port))
}
/**
 * Wraps a textual body in an already completed HTTP/1.1 response.
 *
 * @param status      the HTTP status of the response
 * @param body        the text to send, encoded as UTF-8
 * @param contentType the MIME type; ";charset=utf-8" is appended to it
 * @return a future that already holds the response
 */
def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
  val httpResponse = new TDefaultHttpResponse(HTTP_1_1, status)

  val payload = copiedBuffer(body, UTF_8)
  httpResponse.setContent(payload)

  val contentTypeHeader = "%s;charset=utf-8".format(contentType)
  httpResponse.setHeader("Content-type", contentTypeHeader)

  TFuture.value(httpResponse)
}
/**
 * Shortcut for a textual response with status 200 (OK).
 *
 * @param body        the text to send
 * @param contentType the MIME type of the body
 */
def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] =
  stringResponse(status = OK, body = body, contentType = contentType)
/**
 * Answers with the given status, using the status' own reason phrase as a plain-text body.
 *
 * @param status the HTTP status to report
 */
def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
  val reasonPhrase = status.getReasonPhrase
  stringResponse(status, reasonPhrase, TEXT_PLAIN)
}
/**
 * Answers with the content of a stream resource, preceded by the resource URL.
 *
 * NOTE(review): the `case` patterns of the match (and whatever happens when the
 * content cannot be read) are not visible in this excerpt - confirm against the full file.
 */
107 def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
// Body layout: resource URL, an empty line, then the given text.
108 val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
110 resource.stringContent.toMaybeEither match {
// 200 (OK) with the formatted body; presumably reached when the content was read successfully.
112 stringResponseOK(fmt(body), contentType)
/**
 * Looks an event up through `getter` and answers with its JSON form.
 *
 * NOTE(review): the declaration of `eventID` and the `case` patterns of the match are
 * not visible in this excerpt; presumably the 404 below is the "no such event" branch - confirm.
 */
119 def eventInfoResponse[E <: ExternalEventModel](
// Function that resolves an event ID to the event, if any.
121 getter: String ⇒ Option[E]
122 ): TFuture[THttpResponse] = {
123 getter(eventID) match {
// The event serialized to JSON, with status 200 (OK).
125 stringResponseOK(event.toJsonString, APPLICATION_JSON)
128 statusResponse(NOT_FOUND)
/**
 * Finagle filter that handles a failure of the wrapped service by logging it and
 * building an HTTP 500 (Internal Server Error) response.
 *
 * NOTE(review): the pattern of the `handle` block and the end of the method are
 * not visible in this excerpt.
 */
132 final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
133 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
// `handle` supplies a replacement result when the future returned by the service fails.
134 service(request) handle {
// Log the failure together with the request that was being served.
136 logger.error("While serving %s".format(request), error)
137 val status = INTERNAL_SERVER_ERROR
138 val errorResponse = new TDefaultHttpResponse(HTTP_1_1, status)
// The body is just the reason phrase of the status, UTF-8 encoded.
139 errorResponse.setContent(copiedBuffer(status.getReasonPhrase, UTF_8))
/**
 * Finagle filter that guards the URIs starting with RESTPaths.AdminPrefix: the value of
 * the admin header of the request is compared with aquarium.adminCookie.
 *
 * NOTE(review): several branches are not visible in this excerpt; presumably requests
 * carrying the right cookie, and requests outside the admin prefix, are passed on to
 * `service` - confirm against the full file.
 */
146 final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
147 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
148 if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
149 val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
150 aquarium.adminCookie match {
// The backticks make this an equality test: it matches only when the configured cookie equals the header value.
151 case Some(`headerValue`) ⇒
155 statusResponse(FORBIDDEN)
158 statusResponse(FORBIDDEN)
/**
 * Finagle service that forwards a request message to the user actor of `request.userID`
 * and exposes the actor's reply through a Twitter promise.
 *
 * NOTE(review): the pattern of the success case and the value returned by `apply` are
 * not visible in this excerpt (presumably the promise) - confirm.
 */
166 final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
167 def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
168 // We want to asynchronously route the message via akka and get the whole computation as a
170 val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
// Completed from the callback of the Akka future below.
171 val promise = new TPromise[UserActorResponseMessage[_]]()
// Ask the actor, giving up after _userActorFutureTimeoutMillis; the cast only changes the static type of the future.
173 val actualWork = akka.pattern.ask(actorRef, request)(
174 ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
175 asInstanceOf[AFuture[UserActorResponseMessage[_]]]
// Bridge Akka -> Twitter: a failure fails the promise, a value fulfils it.
177 actualWork.onComplete {
178 case Left(throwable) ⇒
179 promise.setException(throwable)
182 promise.setValue(value)
/**
 * The service that dispatches a request to a handler according to its URI.
 *
 * NOTE(review): a number of lines (case patterns, closing braces, the end of `apply`)
 * are not visible in this excerpt; the comments below describe only what is shown.
 */
189 final case class MainService() extends Service[THttpRequest, THttpResponse] {
// Service used to reach the user actors.
191 final val actorRouterService = UserActorService()
// Sends a message to a user actor and converts the outcome into an HTTP response.
193 def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
194 actorRouterService(requestMessage).transform { tryResponse ⇒
// The actor replied: the HTTP status is the one suggested by the reply itself.
196 case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
197 val statusCode = responseMessage.suggestedHTTPStatus
198 val status = THttpResponseStatus.valueOf(statusCode)
200 responseMessage.response match {
// Error reply: log it and send the error message as plain text.
201 case Left(errorMessage) ⇒
202 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
208 stringResponse(status, errorMessage, TEXT_PLAIN)
// Otherwise the whole reply message is sent as JSON.
211 stringResponse(status, responseMessage.toJsonString, APPLICATION_JSON)
// The future failed (handling not visible in this excerpt).
214 case TThrow(throwable) ⇒
220 // FIXME make efficient; this partial function thing is crap for serving requests
// Entry point of the service: logs the request line and picks a handler by URI.
221 def apply(request: THttpRequest): TFuture[THttpResponse] = {
// Timestamp taken once per request; it is passed on to the user actor requests below.
222 val millis = TimeHelpers.nowMillis()
223 val uri = request.getUri
224 val method = request.getMethod
225 logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
// A handler is a partial function from the URI to the response.
227 type URIPF = PartialFunction[String, TFuture[THttpResponse]]
// Ping: answers "PONG" followed by the current time in millis and as an ISO date-time.
229 val PingHandler: URIPF = {
230 case RESTPaths.PingPath() ⇒
231 val now = TimeHelpers.nowMillis()
232 val nowFormatted = ISODateTimeFormat.dateTime().print(now)
233 stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
// Information about the user actor cache: statistics, contents and size.
236 val UserActorCacheHandler: URIPF = {
// Statistics: cache size plus the counters of the cache, pretty-printed as JSON.
237 case RESTPaths.UserActorCacheStatsPath() ⇒
238 val cacheSize = aquarium.akkaService.cacheSize
239 val stats = aquarium.akkaService.cacheStats
241 val mapper = new ObjectMapper()
242 val writer = mapper.writerWithDefaultPrettyPrinter()
// LinkedHashMap, so the keys are written in insertion order.
244 val map = new java.util.LinkedHashMap[String, Any]
245 map.put("cacheSize", cacheSize)
246 map.put("requestCount", stats.requestCount())
247 map.put("hitCount", stats.hitCount())
248 map.put("hitRate", stats.hitRate())
249 map.put("missCount", stats.missCount())
250 map.put("missRate", stats.missRate())
251 map.put("evictionCount", stats.evictionCount())
252 map.put("loadCount", stats.loadCount())
253 map.put("loadSuccessCount", stats.loadSuccessCount())
254 map.put("loadExceptionCount", stats.loadExceptionCount())
255 map.put("loadExceptionRate", stats.loadExceptionRate())
256 map.put("totalLoadTime", stats.totalLoadTime())
257 map.put("averageLoadPenalty", stats.averageLoadPenalty())
259 val json = writer.writeValueAsString(map)
260 stringResponseOK(json, APPLICATION_JSON)
// Contents: the cached user IDs, sorted, one per line.
263 case RESTPaths.UserActorCacheContentsPath() ⇒
264 val buffer = new scala.collection.mutable.ArrayBuffer[String]()
265 aquarium.akkaService.foreachCachedUserID(buffer.append(_))
266 val output = buffer.sorted.mkString("\n")
267 stringResponseOK(output, TEXT_PLAIN)
// Count: the cache size as plain text.
269 case RESTPaths.UserActorCacheCountPath() ⇒
270 stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
// Configuration resources: a listing of their names and the content of each one.
273 val ConfHandler: URIPF = {
// Listing: the three resource names, one per line.
274 case RESTPaths.ResourcesPath() ⇒
275 stringResponseOK("%s\n%s\n%s\n" .format(
276 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
277 ResourceLocator.ResourceNames.LOGBACK_XML,
278 ResourceLocator.ResourceNames.POLICY_YAML),
281 case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
282 resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
284 case RESTPaths.ResourcesLogbackXMLPath() ⇒
285 resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
287 case RESTPaths.ResourcesPolicyYAMLPath() ⇒
288 resourceInfoResponse(ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
// Events: the ID extracted from the URI is looked up in the matching event store.
291 val EventsHandler: URIPF = {
292 case RESTPaths.ResourceEventPath(id) ⇒
293 eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
295 case RESTPaths.IMEventPath(id) ⇒
296 eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
// User queries: forwarded to the user actor, stamped with the time taken at the start of apply.
299 val UserHandler: URIPF = {
300 case RESTPaths.UserBalancePath(userID) ⇒
301 // /user/(.+)/balance/?
302 callUserActor(GetUserBalanceRequest(userID, millis))
304 case RESTPaths.UserStatePath(userId) ⇒
305 // /user/(.+)/state/?
306 callUserActor(GetUserStateRequest(userId, millis))
// Fallback: 404 (the case pattern is not visible in this excerpt).
309 val DefaultHandler: URIPF = {
311 statusResponse(NOT_FOUND)
// The handlers in the order in which they are tried (list only partly visible in this excerpt).
314 val AllHandlers = List(
316 UserActorCacheHandler,
// Chain the handlers with orElse, left to right, so the first one defined for the URI answers.
323 val combinedHandler = AllHandlers.reduceLeft(_ orElse _)
// Request pipeline: exception handling, then the admin check, then the main dispatcher.
329 val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
// Lazy, so the builder chain runs only when `server` is first referenced.
// NOTE(review): part of the builder chain is not visible in this excerpt.
330 lazy val server = ServerBuilder().
332 bindTo(new InetSocketAddress(this._port)).
/**
 * Starts the HTTP server by forcing the lazy `server` value.
 *
 * The lazy value is forced in a plain statement and not only inside `assert`:
 * `Predef.assert` is elidable, so with assertions disabled at compile time its
 * argument is never evaluated and the server would silently not be built.
 */
def start(): Unit = {
  logger.info("Starting HTTP on port %s".format(this._port))

  // Referencing the lazy val is what actually builds the server.
  val startedServer = server

  // Sanity check only; the side effect above does not depend on it.
  assert(startedServer ne null)
}
// NOTE(review): the `def` line of the enclosing method is not visible in this excerpt (presumably the shutdown counterpart of start()).
// Announce the shutdown together with the maximum time it may take.
343 logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
// Close the server, passing _shutdownTimeoutMillis as the time limit.
344 server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))