2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{Executors, TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message._
55 import com.ckkloverdos.resource.StreamResource
56 import gr.grnet.aquarium.event.model.ExternalEventModel
57 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
58 import akka.dispatch.{Future ⇒ AFuture}
59 import gr.grnet.aquarium.util.json.JsonHelpers
61 import com.ckkloverdos.maybe.Failed
62 import gr.grnet.aquarium.actor.message.GetUserStateRequest
63 import com.ckkloverdos.maybe.Just
64 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
65 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
66 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
70 * @author Christos KK Loverdos <loverdos@gmail.com>
73 class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
74 final val TEXT_PLAIN = "text/plain"
75 final val APPLICATION_JSON = "application/json"
77 @volatile private[this] var _port: Int = _
78 @volatile private[this] var _shutdownTimeoutMillis: Long = _
79 @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
81 def propertyPrefix = Some(RESTService.Prefix)
84 * Configure this instance with the provided properties.
86 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
88 def configure(props: Props) {
89 this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
90 this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
91 this._userActorFutureTimeoutMillis = 5000L
93 logger.debug("HTTP port is %s".format(this._port))
96 def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
97 val response = new TDefaultHttpResponse(HTTP_1_1, status)
98 response.setContent(copiedBuffer(body, UTF_8))
99 response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
101 TFuture.value(response)
104 def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
105 stringResponse(OK, body, contentType)
108 def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
109 stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
112 def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
113 val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
115 resource.stringContent.toMaybeEither match {
117 stringResponseOK(fmt(body), contentType)
124 def eventInfoResponse[E <: ExternalEventModel](
126 getter: String ⇒ Option[E]
127 ): TFuture[THttpResponse] = {
128 getter(eventID) match {
130 stringResponseOK(event.toJsonString, APPLICATION_JSON)
133 statusResponse(NOT_FOUND)
137 final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
138 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
139 service(request) handle {
141 logger.error("While serving %s".format(request), error)
142 val status = INTERNAL_SERVER_ERROR
143 val errorResponse = new TDefaultHttpResponse(HTTP_1_1, status)
144 errorResponse.setContent(copiedBuffer(status.getReasonPhrase, UTF_8))
151 final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
152 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
153 if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
154 val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
155 aquarium.adminCookie match {
156 case Some(`headerValue`) ⇒
160 statusResponse(FORBIDDEN)
163 statusResponse(FORBIDDEN)
171 final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
172 def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
173 // We want to asynchronously route the message via akka and get the whole computation as a
175 val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
176 val promise = new TPromise[UserActorResponseMessage[_]]()
178 val actualWork = akka.pattern.ask(actorRef, request)(
179 ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
180 asInstanceOf[AFuture[UserActorResponseMessage[_]]]
182 actualWork.onComplete {
183 case Left(throwable) ⇒
184 promise.setException(throwable)
187 promise.setValue(value)
194 final case class MainService() extends Service[THttpRequest, THttpResponse] {
196 final val actorRouterService = UserActorService()
198 def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
199 actorRouterService(requestMessage).transform { tryResponse ⇒
201 case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
202 logger.debug("{}", responseMessage)
203 logger.debug("{}", responseMessage.responseToJsonString)
204 val statusCode = responseMessage.suggestedHTTPStatus
205 val status = THttpResponseStatus.valueOf(statusCode)
207 responseMessage.response match {
208 case Left(errorMessage) ⇒
209 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
215 stringResponse(status, errorMessage, TEXT_PLAIN)
218 stringResponse(status, responseMessage.responseToJsonString, APPLICATION_JSON)
221 case TThrow(throwable) ⇒
227 // FIXME make efficient; this partial function thing is crap for serving requests
228 def apply(request: THttpRequest): TFuture[THttpResponse] = {
229 val millis = TimeHelpers.nowMillis()
230 val uri = request.getUri
231 val method = request.getMethod
232 logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
234 type URIPF = PartialFunction[String, TFuture[THttpResponse]]
235 def pong(ok:Boolean) = {
236 val now = TimeHelpers.nowMillis()
237 val nowFormatted = ISODateTimeFormat.dateTime().print(now)
238 val reply = if(ok) "PONG" else "DOWN"
239 stringResponseOK("%s\n%s\n%s".format(reply,now, nowFormatted), TEXT_PLAIN)
241 val PingHandler: URIPF = {
242 case RESTPaths.AquariumPingPath() ⇒
244 case RESTPaths.RabbitMQPingPath() ⇒
245 pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)
246 case RESTPaths.IMStorePingPath() ⇒
247 pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)
248 case RESTPaths.RCStorePingPath() ⇒
249 pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
252 val UserActorCacheHandler: URIPF = {
253 case RESTPaths.UserActorCacheStatsPath() ⇒
254 val cacheSize = aquarium.akkaService.cacheSize
255 val stats = aquarium.akkaService.cacheStats
257 val map = new java.util.LinkedHashMap[String, Any]
258 map.put("cacheSize", cacheSize)
259 map.put("requestCount", stats.requestCount())
260 map.put("hitCount", stats.hitCount())
261 map.put("hitRate", stats.hitRate())
262 map.put("missCount", stats.missCount())
263 map.put("missRate", stats.missRate())
264 map.put("evictionCount", stats.evictionCount())
265 map.put("loadCount", stats.loadCount())
266 map.put("loadSuccessCount", stats.loadSuccessCount())
267 map.put("loadExceptionCount", stats.loadExceptionCount())
268 map.put("loadExceptionRate", stats.loadExceptionRate())
269 map.put("totalLoadTime", stats.totalLoadTime())
270 map.put("averageLoadPenalty", stats.averageLoadPenalty())
272 val json = JsonHelpers.jsonStringOfJavaMap(map)
273 stringResponseOK(json, APPLICATION_JSON)
275 case RESTPaths.UserActorCacheContentsPath() ⇒
276 val buffer = new scala.collection.mutable.ArrayBuffer[String]()
277 aquarium.akkaService.foreachCachedUserID(buffer.append(_))
278 val output = buffer.sorted.mkString("\n")
279 stringResponseOK(output, TEXT_PLAIN)
281 case RESTPaths.UserActorCacheCountPath() ⇒
282 stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
285 val ConfHandler: URIPF = {
286 case RESTPaths.ResourcesPath() ⇒
287 stringResponseOK("%s\n%s\n%s\n" .format(
288 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
289 ResourceLocator.ResourceNames.LOGBACK_XML,
290 ResourceLocator.ResourceNames.POLICY_JSON),
293 case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
294 resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
296 case RESTPaths.ResourcesLogbackXMLPath() ⇒
297 resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
299 case RESTPaths.ResourcesPolicyJSONPath() ⇒
300 resourceInfoResponse(ResourceLocator.Resources.PolicyJSONResource, TEXT_PLAIN)
303 val EventsHandler: URIPF = {
304 case RESTPaths.ResourceEventPath(id) ⇒
305 eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
307 case RESTPaths.IMEventPath(id) ⇒
308 eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
311 val UserHandler: URIPF = {
312 case RESTPaths.UserBalancePath(userID) ⇒
313 // /user/(.+)/balance/?
314 callUserActor(GetUserBalanceRequest(userID, millis))
316 case RESTPaths.UserStatePath(userID) ⇒
317 // /user/(.+)/state/?
318 callUserActor(GetUserStateRequest(userID, millis))
320 case RESTPaths.UserWalletPath(userID) ⇒
321 // /user/(.+)/wallet/?
322 callUserActor(GetUserWalletRequest(userID, millis))
324 case RESTPaths.UserBillPath(userID,st1,st2) ⇒
327 val t = Timeslot(t1,if(t2==0)Long.MaxValue else t2)
328 callUserActor(GetUserBillRequest(userID,t,millis))
331 val DefaultHandler: URIPF = {
333 statusResponse(NOT_FOUND)
336 val AllHandlers = List(
338 UserActorCacheHandler,
345 val combinedHandler = AllHandlers.reduceLeft(_ orElse _)
351 val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
352 lazy val server = ServerBuilder().
354 bindTo(new InetSocketAddress(this._port)).
358 def start(): Unit = {
359 logger.info("Starting HTTP on port %s".format(this._port))
360 // Just for the side effect
361 assert(server ne null)
365 logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
366 server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))