2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, FuturePool ⇒ TFuturePool, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message._
55 import com.ckkloverdos.resource.StreamResource
56 import com.ckkloverdos.maybe.{Just, Failed}
57 import gr.grnet.aquarium.event.model.ExternalEventModel
58 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
59 import akka.dispatch.{Future ⇒ AFuture}
60 import com.fasterxml.jackson.databind.ObjectMapper
64 import com.ckkloverdos.maybe.Failed
65 import gr.grnet.aquarium.actor.message.GetUserStateRequest
67 import com.ckkloverdos.maybe.Just
68 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
69 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
70 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
74 * @author Christos KK Loverdos <loverdos@gmail.com>
77 class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
78 final val TEXT_PLAIN = "text/plain"
79 final val APPLICATION_JSON = "application/json"
81 @volatile private[this] var _port: Int = _
82 @volatile private[this] var _shutdownTimeoutMillis: Long = _
83 @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
85 def propertyPrefix = Some(RESTService.Prefix)
88 * Configure this instance with the provided properties.
90 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
92 def configure(props: Props) {
93 this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
94 this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
95 this._userActorFutureTimeoutMillis = 5000L
97 logger.debug("HTTP port is %s".format(this._port))
100 def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
101 val response = new TDefaultHttpResponse(HTTP_1_1, status)
102 response.setContent(copiedBuffer(body, UTF_8))
103 response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
105 TFuture.value(response)
108 def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
109 stringResponse(OK, body, contentType)
112 def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
113 stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
116 def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
117 val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
119 resource.stringContent.toMaybeEither match {
121 stringResponseOK(fmt(body), contentType)
128 def eventInfoResponse[E <: ExternalEventModel](
130 getter: String ⇒ Option[E]
131 ): TFuture[THttpResponse] = {
132 getter(eventID) match {
134 stringResponseOK(event.toJsonString, APPLICATION_JSON)
137 statusResponse(NOT_FOUND)
141 final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
142 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
143 service(request) handle {
145 logger.error("While serving %s".format(request), error)
146 val status = INTERNAL_SERVER_ERROR
147 val errorResponse = new TDefaultHttpResponse(HTTP_1_1, status)
148 errorResponse.setContent(copiedBuffer(status.getReasonPhrase, UTF_8))
155 final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
156 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
157 if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
158 val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
159 aquarium.adminCookie match {
160 case Some(`headerValue`) ⇒
164 statusResponse(FORBIDDEN)
167 statusResponse(FORBIDDEN)
175 final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
176 def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
177 // We want to asynchronously route the message via akka and get the whole computation as a
179 val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
180 val promise = new TPromise[UserActorResponseMessage[_]]()
182 val actualWork = akka.pattern.ask(actorRef, request)(
183 ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
184 asInstanceOf[AFuture[UserActorResponseMessage[_]]]
186 actualWork.onComplete {
187 case Left(throwable) ⇒
188 promise.setException(throwable)
191 promise.setValue(value)
198 final case class MainService() extends Service[THttpRequest, THttpResponse] {
200 final val actorRouterService = UserActorService()
202 def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
203 actorRouterService(requestMessage).transform { tryResponse ⇒
205 case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
206 logger.debug("{}", responseMessage)
207 logger.debug("{}", responseMessage.responseToJsonString)
208 val statusCode = responseMessage.suggestedHTTPStatus
209 val status = THttpResponseStatus.valueOf(statusCode)
211 responseMessage.response match {
212 case Left(errorMessage) ⇒
213 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
219 stringResponse(status, errorMessage, TEXT_PLAIN)
222 stringResponse(status, responseMessage.responseToJsonString, APPLICATION_JSON)
225 case TThrow(throwable) ⇒
231 // FIXME make efficient; this partial function thing is crap for serving requests
232 def apply(request: THttpRequest): TFuture[THttpResponse] = {
233 val millis = TimeHelpers.nowMillis()
234 val uri = request.getUri
235 val method = request.getMethod
236 logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
238 type URIPF = PartialFunction[String, TFuture[THttpResponse]]
239 def pong(ok:Boolean) = {
240 val now = TimeHelpers.nowMillis()
241 val nowFormatted = ISODateTimeFormat.dateTime().print(now)
242 val reply = if(ok) "PONG" else "DOWN"
243 stringResponseOK("%s\n%s\n%s".format(reply,now, nowFormatted), TEXT_PLAIN)
245 val PingHandler: URIPF = {
246 case RESTPaths.AquariumPingPath() ⇒
248 case RESTPaths.RabbitMQPingPath() ⇒
249 pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)
250 case RESTPaths.IMStorePingPath() ⇒
251 pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)
252 case RESTPaths.RCStorePingPath() ⇒
253 pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
256 val UserActorCacheHandler: URIPF = {
257 case RESTPaths.UserActorCacheStatsPath() ⇒
258 val cacheSize = aquarium.akkaService.cacheSize
259 val stats = aquarium.akkaService.cacheStats
261 val mapper = new ObjectMapper()
262 val writer = mapper.writerWithDefaultPrettyPrinter()
264 val map = new java.util.LinkedHashMap[String, Any]
265 map.put("cacheSize", cacheSize)
266 map.put("requestCount", stats.requestCount())
267 map.put("hitCount", stats.hitCount())
268 map.put("hitRate", stats.hitRate())
269 map.put("missCount", stats.missCount())
270 map.put("missRate", stats.missRate())
271 map.put("evictionCount", stats.evictionCount())
272 map.put("loadCount", stats.loadCount())
273 map.put("loadSuccessCount", stats.loadSuccessCount())
274 map.put("loadExceptionCount", stats.loadExceptionCount())
275 map.put("loadExceptionRate", stats.loadExceptionRate())
276 map.put("totalLoadTime", stats.totalLoadTime())
277 map.put("averageLoadPenalty", stats.averageLoadPenalty())
279 val json = writer.writeValueAsString(map)
280 stringResponseOK(json, APPLICATION_JSON)
283 case RESTPaths.UserActorCacheContentsPath() ⇒
284 val buffer = new scala.collection.mutable.ArrayBuffer[String]()
285 aquarium.akkaService.foreachCachedUserID(buffer.append(_))
286 val output = buffer.sorted.mkString("\n")
287 stringResponseOK(output, TEXT_PLAIN)
289 case RESTPaths.UserActorCacheCountPath() ⇒
290 stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
293 val ConfHandler: URIPF = {
294 case RESTPaths.ResourcesPath() ⇒
295 stringResponseOK("%s\n%s\n%s\n" .format(
296 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
297 ResourceLocator.ResourceNames.LOGBACK_XML,
298 ResourceLocator.ResourceNames.POLICY_JSON),
301 case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
302 resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
304 case RESTPaths.ResourcesLogbackXMLPath() ⇒
305 resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
307 case RESTPaths.ResourcesPolicyJSONPath() ⇒
308 resourceInfoResponse(ResourceLocator.Resources.PolicyJSONResource, TEXT_PLAIN)
311 val EventsHandler: URIPF = {
312 case RESTPaths.ResourceEventPath(id) ⇒
313 eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
315 case RESTPaths.IMEventPath(id) ⇒
316 eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
319 val UserHandler: URIPF = {
320 case RESTPaths.UserBalancePath(userID) ⇒
321 // /user/(.+)/balance/?
322 callUserActor(GetUserBalanceRequest(userID, millis))
324 case RESTPaths.UserStatePath(userID) ⇒
325 // /user/(.+)/state/?
326 callUserActor(GetUserStateRequest(userID, millis))
328 case RESTPaths.UserWalletPath(userID) ⇒
329 // /user/(.+)/wallet/?
330 callUserActor(GetUserWalletRequest(userID, millis))
332 case RESTPaths.UserBillPath(userID,st1,st2) ⇒
335 val t = Timeslot(t1,if(t2==0)Long.MaxValue else t2)
336 callUserActor(GetUserBillRequest(userID,t,millis))
339 val DefaultHandler: URIPF = {
341 statusResponse(NOT_FOUND)
344 val AllHandlers = List(
346 UserActorCacheHandler,
353 val combinedHandler = AllHandlers.reduceLeft(_ orElse _)
359 val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
360 lazy val server = ServerBuilder().
362 bindTo(new InetSocketAddress(this._port)).
366 def start(): Unit = {
367 logger.info("Starting HTTP on port %s".format(this._port))
368 // Just for the side effect
369 assert(server ne null)
373 logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
374 server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))