import gr.grnet.aquarium.event.model.ExternalEventModel
import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
import akka.dispatch.{Future ⇒ AFuture}
+import com.fasterxml.jackson.databind.ObjectMapper
+import java.util
/**
*
final val TEXT_PLAIN = "text/plain"
final val APPLICATION_JSON = "application/json"
- @volatile private[this] var _port: Int = 8080
- @volatile private[this] var _shutdownTimeoutMillis: Long = 2000
- @volatile private[this] var _threadPoolSize: Int = 4
- @volatile private[this] var _threadPool: TFuturePool = _
+ @volatile private[this] var _port: Int = _
+ @volatile private[this] var _shutdownTimeoutMillis: Long = _
+ @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
def propertyPrefix = Some(RESTService.Prefix)
def configure(props: Props) {
this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
-
- this._threadPool = TFuturePool(Executors.newFixedThreadPool(this._threadPoolSize))
+ this._userActorFutureTimeoutMillis = 5000L
logger.debug("HTTP port is %s".format(this._port))
}
}
}
- final case class MainService() extends Service[THttpRequest, THttpResponse] {
- final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
- def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
- // We want to asynchronously route the message via akka and get the whole computation as a
- // twitter future.
- val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
- val promise = new TPromise[UserActorResponseMessage[_]]()
-
- val actualWork = akka.pattern.ask(actorRef, request)(ATimeout(ADuration(500, TimeUnit.MILLISECONDS))).
- asInstanceOf[AFuture[UserActorResponseMessage[_]]]
-
- actualWork.onComplete {
- case Left(throwable) ⇒
- promise.setException(throwable)
-
- case Right(value) ⇒
- promise.setValue(value)
- }
+ final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
+ def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
+ // We want to asynchronously route the message via akka and get the whole computation as a
+ // twitter future.
+ val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
+ val promise = new TPromise[UserActorResponseMessage[_]]()
+
+ val actualWork = akka.pattern.ask(actorRef, request)(
+ ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
+ asInstanceOf[AFuture[UserActorResponseMessage[_]]]
+
+ actualWork.onComplete {
+ case Left(throwable) ⇒
+ promise.setException(throwable)
- promise
+ case Right(value) ⇒
+ promise.setValue(value)
}
+
+ promise
}
+ }
+
+ final case class MainService() extends Service[THttpRequest, THttpResponse] {
final val actorRouterService = UserActorService()
}
case TThrow(throwable) ⇒
- val status = INTERNAL_SERVER_ERROR
- logger.error("Error %s serving %s: %s".format(
- status.getReasonPhrase,
- requestMessage,
- gr.grnet.aquarium.util.shortInfoOf(throwable)
- ))
-
- statusResponse(status)
+ throw throwable
}
}
}
+ // FIXME make efficient; this partial function thing is crap for serving requests
def apply(request: THttpRequest): TFuture[THttpResponse] = {
val millis = TimeHelpers.nowMillis()
val uri = request.getUri
val method = request.getMethod
logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
- uri match {
+ type URIPF = PartialFunction[String, TFuture[THttpResponse]]
+
+ val PingHandler: URIPF = {
case RESTPaths.PingPath() ⇒
val now = TimeHelpers.nowMillis()
val nowFormatted = ISODateTimeFormat.dateTime().print(now)
stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
+ }
-// case RESTPaths.ResourcesPath() ⇒
-// stringResponseOK("%s\n%s\n%s\n" .format(
-// ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
-// ResourceLocator.ResourceNames.LOGBACK_XML,
-// ResourceLocator.ResourceNames.POLICY_YAML),
-// TEXT_PLAIN)
+ val UserActorCacheHandler: URIPF = {
+ case RESTPaths.UserActorCacheStatsPath() ⇒
+ val cacheSize = aquarium.akkaService.cacheSize
+ val stats = aquarium.akkaService.cacheStats
+
+ val mapper = new ObjectMapper()
+ val writer = mapper.writerWithDefaultPrettyPrinter()
+
+ val map = new java.util.LinkedHashMap[String, Any]
+ map.put("cacheSize", cacheSize)
+ map.put("requestCount", stats.requestCount())
+ map.put("hitCount", stats.hitCount())
+ map.put("hitRate", stats.hitRate())
+ map.put("missCount", stats.missCount())
+ map.put("missRate", stats.missRate())
+ map.put("evictionCount", stats.evictionCount())
+ map.put("loadCount", stats.loadCount())
+ map.put("loadSuccessCount", stats.loadSuccessCount())
+ map.put("loadExceptionCount", stats.loadExceptionCount())
+ map.put("loadExceptionRate", stats.loadExceptionRate())
+ map.put("totalLoadTime", stats.totalLoadTime())
+ map.put("averageLoadPenalty", stats.averageLoadPenalty())
+
+ val json = writer.writeValueAsString(map)
+ stringResponseOK(json, APPLICATION_JSON)
+
+
+ case RESTPaths.UserActorCacheContentsPath() ⇒
+ val buffer = new scala.collection.mutable.ArrayBuffer[String]()
+ aquarium.akkaService.foreachCachedUserID(buffer.append(_))
+ val output = buffer.sorted.mkString("\n")
+ stringResponseOK(output, TEXT_PLAIN)
+
+ case RESTPaths.UserActorCacheCountPath() ⇒
+ stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
+ }
- case RESTPaths.UserBalancePath(userID) ⇒
- // /user/(.+)/balance/?
- callUserActor(GetUserBalanceRequest(userID, millis))
-
- case RESTPaths.UserStatePath(userId) ⇒
- // /user/(.+)/state/?
- callUserActor(GetUserStateRequest(userId, millis))
+ val ConfHandler: URIPF = {
+ case RESTPaths.ResourcesPath() ⇒
+ stringResponseOK("%s\n%s\n%s\n" .format(
+ ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
+ ResourceLocator.ResourceNames.LOGBACK_XML,
+ ResourceLocator.ResourceNames.POLICY_YAML),
+ TEXT_PLAIN)
case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
case RESTPaths.ResourcesPolicyYAMLPath() ⇒
resourceInfoResponse(ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
+ }
+ val EventsHandler: URIPF = {
case RESTPaths.ResourceEventPath(id) ⇒
eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
case RESTPaths.IMEventPath(id) ⇒
eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
+ }
+
+ val UserHandler: URIPF = {
+ case RESTPaths.UserBalancePath(userID) ⇒
+ // /user/(.+)/balance/?
+ callUserActor(GetUserBalanceRequest(userID, millis))
+ case RESTPaths.UserStatePath(userId) ⇒
+ // /user/(.+)/state/?
+ callUserActor(GetUserStateRequest(userId, millis))
+ }
+
+ val DefaultHandler: URIPF = {
case _ ⇒
statusResponse(NOT_FOUND)
}
+
+ val AllHandlers = List(
+ PingHandler,
+ UserActorCacheHandler,
+ ConfHandler,
+ EventsHandler,
+ UserHandler,
+ DefaultHandler
+ )
+
+ val combinedHandler = AllHandlers.reduceLeft(_ orElse _)
+
+ combinedHandler(uri)
}
}