More internal info exposed via the protected REST API
author Christos KK Loverdos <loverdos@gmail.com>
Mon, 2 Jul 2012 11:00:37 +0000 (14:00 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Mon, 2 Jul 2012 11:00:37 +0000 (14:00 +0300)
src/main/scala/gr/grnet/aquarium/service/FinagleRESTService.scala
src/main/scala/gr/grnet/aquarium/service/RESTPaths.scala

index eb15ae2..2d59fdb 100644 (file)
@@ -57,6 +57,8 @@ import com.ckkloverdos.maybe.{Just, Failed}
 import gr.grnet.aquarium.event.model.ExternalEventModel
 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
 import akka.dispatch.{Future ⇒ AFuture}
+import com.fasterxml.jackson.databind.ObjectMapper
+import java.util
 
 /**
  *
@@ -67,10 +69,9 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
   final val TEXT_PLAIN       = "text/plain"
   final val APPLICATION_JSON = "application/json"
 
-  @volatile private[this] var _port: Int = 8080
-  @volatile private[this] var _shutdownTimeoutMillis: Long = 2000
-  @volatile private[this] var _threadPoolSize: Int = 4
-  @volatile private[this] var _threadPool: TFuturePool = _
+  @volatile private[this] var _port: Int = _
+  @volatile private[this] var _shutdownTimeoutMillis: Long = _
+  @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
 
   def propertyPrefix = Some(RESTService.Prefix)
 
@@ -82,8 +83,7 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
   def configure(props: Props) {
     this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
     this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
-
-    this._threadPool = TFuturePool(Executors.newFixedThreadPool(this._threadPoolSize))
+    this._userActorFutureTimeoutMillis = 5000L
 
     logger.debug("HTTP port is %s".format(this._port))
   }
@@ -160,28 +160,30 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
     }
   }
 
-  final case class MainService() extends Service[THttpRequest, THttpResponse] {
-    final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
-      def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
-        // We want to asynchronously route the message via akka and get the whole computation as a
-        // twitter future.
-        val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
-        val promise = new TPromise[UserActorResponseMessage[_]]()
-
-        val actualWork = akka.pattern.ask(actorRef, request)(ATimeout(ADuration(500, TimeUnit.MILLISECONDS))).
-          asInstanceOf[AFuture[UserActorResponseMessage[_]]]
-
-        actualWork.onComplete {
-          case Left(throwable) ⇒
-            promise.setException(throwable)
-
-          case Right(value) ⇒
-            promise.setValue(value)
-        }
+  final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
+    def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
+      // We want to asynchronously route the message via akka and get the whole computation as a
+      // twitter future.
+      val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
+      val promise = new TPromise[UserActorResponseMessage[_]]()
+
+      val actualWork = akka.pattern.ask(actorRef, request)(
+          ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
+        asInstanceOf[AFuture[UserActorResponseMessage[_]]]
+
+      actualWork.onComplete {
+        case Left(throwable) ⇒
+          promise.setException(throwable)
 
-        promise
+        case Right(value) ⇒
+          promise.setValue(value)
       }
+
+      promise
     }
+  }
+
+  final case class MainService() extends Service[THttpRequest, THttpResponse] {
 
     final val actorRouterService = UserActorService()
 
@@ -207,44 +209,71 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
             }
 
           case TThrow(throwable) ⇒
-            val status = INTERNAL_SERVER_ERROR
-            logger.error("Error %s serving %s: %s".format(
-              status.getReasonPhrase,
-              requestMessage,
-              gr.grnet.aquarium.util.shortInfoOf(throwable)
-            ))
-
-            statusResponse(status)
+            throw throwable
         }
       }
     }
 
+    // FIXME make efficient; this partial function thing is crap for serving requests
     def apply(request: THttpRequest): TFuture[THttpResponse] = {
       val millis = TimeHelpers.nowMillis()
       val uri = request.getUri
       val method = request.getMethod
       logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
 
-      uri match {
+      type URIPF = PartialFunction[String, TFuture[THttpResponse]]
+
+      val PingHandler: URIPF = {
         case RESTPaths.PingPath() ⇒
           val now = TimeHelpers.nowMillis()
           val nowFormatted = ISODateTimeFormat.dateTime().print(now)
           stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
+      }
 
-//        case RESTPaths.ResourcesPath() ⇒
-//          stringResponseOK("%s\n%s\n%s\n" .format(
-//            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
-//            ResourceLocator.ResourceNames.LOGBACK_XML,
-//            ResourceLocator.ResourceNames.POLICY_YAML),
-//          TEXT_PLAIN)
+      val UserActorCacheHandler: URIPF = {
+        case RESTPaths.UserActorCacheStatsPath() ⇒
+          val cacheSize = aquarium.akkaService.cacheSize
+          val stats = aquarium.akkaService.cacheStats
+
+          val mapper = new ObjectMapper()
+          val writer = mapper.writerWithDefaultPrettyPrinter()
+          
+          val map = new java.util.LinkedHashMap[String, Any]
+          map.put("cacheSize", cacheSize)
+          map.put("requestCount", stats.requestCount())
+          map.put("hitCount", stats.hitCount())
+          map.put("hitRate", stats.hitRate())
+          map.put("missCount", stats.missCount())
+          map.put("missRate", stats.missRate())
+          map.put("evictionCount", stats.evictionCount())
+          map.put("loadCount", stats.loadCount())
+          map.put("loadSuccessCount", stats.loadSuccessCount())
+          map.put("loadExceptionCount", stats.loadExceptionCount())
+          map.put("loadExceptionRate", stats.loadExceptionRate())
+          map.put("totalLoadTime", stats.totalLoadTime())
+          map.put("averageLoadPenalty", stats.averageLoadPenalty())
+
+          val json = writer.writeValueAsString(map)
+          stringResponseOK(json, APPLICATION_JSON)
+
+
+        case RESTPaths.UserActorCacheContentsPath() ⇒
+          val buffer = new scala.collection.mutable.ArrayBuffer[String]()
+          aquarium.akkaService.foreachCachedUserID(buffer.append(_))
+          val output = buffer.sorted.mkString("\n")
+          stringResponseOK(output, TEXT_PLAIN)
+
+        case RESTPaths.UserActorCacheCountPath() ⇒
+          stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
+      }
 
-        case RESTPaths.UserBalancePath(userID) ⇒
-          // /user/(.+)/balance/?
-          callUserActor(GetUserBalanceRequest(userID, millis))
-
-        case RESTPaths.UserStatePath(userId) ⇒
-          // /user/(.+)/state/?
-          callUserActor(GetUserStateRequest(userId, millis))
+      val ConfHandler: URIPF = {
+        case RESTPaths.ResourcesPath() ⇒
+          stringResponseOK("%s\n%s\n%s\n" .format(
+            ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
+            ResourceLocator.ResourceNames.LOGBACK_XML,
+            ResourceLocator.ResourceNames.POLICY_YAML),
+          TEXT_PLAIN)
 
         case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
           resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
@@ -254,16 +283,43 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
 
         case RESTPaths.ResourcesPolicyYAMLPath() ⇒
           resourceInfoResponse(ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
+      }
 
+      val EventsHandler: URIPF = {
         case RESTPaths.ResourceEventPath(id) ⇒
           eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
 
         case RESTPaths.IMEventPath(id) ⇒
           eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
+      }
+
+      val UserHandler: URIPF = {
+        case RESTPaths.UserBalancePath(userID) ⇒
+          // /user/(.+)/balance/?
+          callUserActor(GetUserBalanceRequest(userID, millis))
 
+        case RESTPaths.UserStatePath(userId) ⇒
+          // /user/(.+)/state/?
+          callUserActor(GetUserStateRequest(userId, millis))
+      }
+
+      val DefaultHandler: URIPF = {
         case _ ⇒
           statusResponse(NOT_FOUND)
       }
+
+      val AllHandlers = List(
+        PingHandler,
+        UserActorCacheHandler,
+        ConfHandler,
+        EventsHandler,
+        UserHandler,
+        DefaultHandler
+      )
+
+      val combinedHandler =  AllHandlers.reduceLeft(_ orElse _)
+
+      combinedHandler(uri)
     }
   }
 
index 983e694..00e9b9f 100644 (file)
@@ -73,4 +73,8 @@ object RESTPaths {
    * Use this URI path to query for the user state.
    */
   final val UserStatePath = "/user/([^/]+)/state/?".r
+
+  final val UserActorCacheContentsPath = (AdminPrefix + "/cache/actor/user/contents").r
+  final val UserActorCacheCountPath    = (AdminPrefix + "/cache/actor/user/size").r
+  final val UserActorCacheStatsPath    = (AdminPrefix + "/cache/actor/user/stats").r
 }
\ No newline at end of file