</repository>
<repository>
- <name>Spray</name>
- <id>spray</id>
- <url>http://repo.spray.cc/</url>
+ <name>Twitter</name>
+ <id>twitter</id>
+ <url>http://maven.twttr.com/</url>
</repository>
</repositories>
</dependency>-->
<dependency>
- <groupId>cc.spray</groupId>
- <artifactId>spray-can</artifactId>
- <version>0.9.3</version>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-core_2.9.1</artifactId>
+ <version>4.0.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-http_2.9.1</artifactId>
+ <version>4.0.2</version>
</dependency>
<dependency>
# Actor subsystem
actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService
# Class that initializes the REST service
-rest.service.class=gr.grnet.aquarium.service.RESTActorService
+rest.service.class=gr.grnet.aquarium.service.FinagleRESTService
+rest.shutdown.timeout.millis=2000
# Store subsystem
store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
# Override the user store (if present, it will not be given by the store provider above)
<logger name="com.ckkloverdos" level="WARN"/>
- <logger name="cc.spray.can" level="INFO"/>
-
<logger name="gr.grnet" level="DEBUG"/>
<root level="DEBUG">
private[this] val _isStopping = new AtomicBoolean(false)
+ override def toString = "%s/v%s".format(getClass.getName, version)
+
def isStopping() = _isStopping.get()
@inline
*/
final val restPort = IntKey("rest.port")
+ final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
+
/**
* A cookie used in every administrative REST API call, so that Aquarium knows it comes from
* an authorised client.
import gr.grnet.aquarium.service.event.AquariumCreatedEvent
import com.google.common.eventbus.Subscribe
+import gr.grnet.aquarium.util.Loggable
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-trait AquariumAwareSkeleton extends AquariumAware {
- @volatile protected var _aquarium: Aquarium = null
+trait AquariumAwareSkeleton extends AquariumAware { this: Loggable ⇒
+ @volatile private var _aquarium: Aquarium = null
- final def aquarium = _aquarium
+ final protected def aquarium = _aquarium
@Subscribe
def awareOfAquariumEx(event: AquariumCreatedEvent) = {
this._aquarium = event.aquarium
+ logger.debug("Aware of Aquarium: %s".format(this._aquarium))
}
}
propValue.toInt
}
+ checkPropsOverride(EnvKeys.restShutdownTimeoutMillis) { (envKey, propValue) ⇒
+ propValue.toLong
+ }
+
checkOptionalPropsOverride(EnvKeys.adminCookie) { (envKey, propValue) ⇒
Some(propValue)
}
aquarium.start()
val ms1 = TimeHelpers.nowMillis()
- logStarted(ms0, ms1, "Aquarium [%s]", aquarium.version)
+ logStarted(ms0, ms1, "%s", aquarium.toString)
logSeparator()
} catch {
case e: Throwable ⇒
import service.router.RouterActor
import service.pinger.PingerActor
-import service.rest.RESTActor
import service.user.{UserActor}
-import cc.spray.can.{Timeout, RequestContext}
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
import gr.grnet.aquarium.actor.message.admin.PingAllRequest
import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
*
* A role also dictates which configuration messages the respective actor handles.
*/
-sealed abstract class ActorRole(val role: String,
- val isCacheable: Boolean,
- val actorType: Class[_ <: RoleableActor],
- val handledServiceMessages: Set[Class[_]],
- val handledConfigurationMessages: Set[Class[_ <: ActorConfigurationMessage]] = Set()) {
+sealed abstract class ActorRole(
+ val role: String,
+ val isCacheable: Boolean,
+ val actorType: Class[_ <: RoleableActor],
+ val handledServiceMessages: Set[Class[_]],
+ val handledConfigurationMessages: Set[Class[_ <: ActorConfigurationMessage]] = Set()
+) {
val knownMessageTypes = handledServiceMessages ++ handledConfigurationMessages
classOf[AquariumPropertiesLoaded]))
/**
- * REST request handler.
- */
-case object RESTRole
- extends ActorRole("RESTRole",
- true,
- classOf[RESTActor],
- Set(classOf[RequestContext],
- classOf[Timeout]))
-
-/**
* User-oriented business logic handler role.
*/
case object UserActorRole
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor
-package service
-package rest
-
-import cc.spray.can.HttpMethods.GET
-import cc.spray.can._
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.util.shortInfoOf
-import akka.actor.Actor
-import gr.grnet.aquarium.actor.{RESTRole, RoleableActor, RouterRole}
-import RESTPaths._
-import gr.grnet.aquarium.util.date.TimeHelpers
-import org.joda.time.format.ISODateTimeFormat
-import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, GetUserBalanceRequest}
-import gr.grnet.aquarium.{ResourceLocator, Aquarium}
-import com.ckkloverdos.resource.StreamResource
-import com.ckkloverdos.maybe.Failed
-import java.net.InetAddress
-import gr.grnet.aquarium.event.model.ExternalEventModel
-
-/**
- * Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RESTActor private(_id: String) extends RoleableActor with Loggable {
- def this() = this("spray-root-service")
-
- self.id = _id
-
- final val TEXT_PLAIN = "text/plain"
- final val APPLICATION_JSON = "application/json"
-
- private def stringResponse(status: Int, stringBody: String, contentType: String): HttpResponse = {
- HttpResponse(
- status,
- HttpHeader("Content-type", "%s;charset=utf-8".format(contentType)) :: Nil,
- stringBody.getBytes("UTF-8")
- )
- }
-
- private def resourceInfoResponse(
- uri: String,
- responder: RequestResponder,
- resource: StreamResource,
- contentType: String
- ): Unit = {
-
- val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
- val res = resource.mapString(body ⇒ responder.complete(stringResponse(200, fmt(body), contentType)))
-
- res match {
- case Failed(e) ⇒
- logger.error("While serving %s".format(uri), e)
- responder.complete(stringResponse(501, "Internal Server Error: %s".format(shortInfoOf(e)), TEXT_PLAIN))
-
- case _ ⇒
-
- }
- }
-
- private def eventInfoResponse[E <: ExternalEventModel](
- uri: String,
- responder: RequestResponder,
- getter: String ⇒ Option[E],
- eventID: String
- ): Unit = {
-
- val toSend = getter.apply(eventID) match {
- case Some(event) ⇒
- (200, event.toJsonString, APPLICATION_JSON)
-
- case None ⇒
- (404, "Event not found", TEXT_PLAIN)
- }
-
- responder.complete(stringResponse(toSend._1, toSend._2, toSend._3))
- }
-
- def withAdminCookie(
- uri: String,
- responder: RequestResponder,
- headers: List[HttpHeader],
- remoteAddress: InetAddress
- )( f: RequestResponder ⇒ Unit): Unit = {
-
- aquarium.adminCookie match {
- case Some(adminCookie) ⇒
- headers.find(_.name.toLowerCase == Aquarium.HTTP.RESTAdminHeaderNameLowerCase) match {
- case Some(cookieHeader) if(cookieHeader.value == adminCookie) ⇒
- try f(responder)
- catch {
- case e: Throwable ⇒
- logger.error("While serving %s".format(uri), e)
- responder.complete(stringResponse(501, "Internal Server Error: %s".format(shortInfoOf(e)), TEXT_PLAIN))
- }
-
- case Some(cookieHeader) ⇒
- logger.warn("Admin request %s with bad cookie '%s' from %s".format(uri, cookieHeader.value, remoteAddress))
- responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
-
- case None ⇒
- logger.warn("Admin request %s with no cookie from %s".format(uri, remoteAddress))
- responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
- }
-
- case None ⇒
- responder.complete(stringResponse(403, "Forbidden!", TEXT_PLAIN))
- }
- }
-
- private def stringResponse200(stringBody: String, contentType: String): HttpResponse = {
- stringResponse(200, stringBody, contentType)
- }
-
- protected def receive = {
- case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) ⇒
- val now = TimeHelpers.nowMillis()
- val nowFormatted = ISODateTimeFormat.dateTime().print(now)
- responder.complete(stringResponse200("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN))
-
- case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) ⇒ {
- (serverActor ? GetStats).mapTo[Stats].onComplete {
- future =>
- future.value.get match {
- case Right(stats) => responder.complete {
- stringResponse200(
- "Uptime : " + (stats.uptime / 1000.0) + " sec\n" +
- "Requests dispatched : " + stats.requestsDispatched + '\n' +
- "Requests timed out : " + stats.requestsTimedOut + '\n' +
- "Requests open : " + stats.requestsOpen + '\n' +
- "Open connections : " + stats.connectionsOpen + '\n',
- TEXT_PLAIN
- )
- }
- case Left(ex) => responder.complete(stringResponse(500, "Couldn't get server stats due to " + ex, TEXT_PLAIN))
- }
- }
- }
-
- case RequestContext(HttpRequest(GET, uri, headers, body, protocol), remoteAddress, responder) ⇒
- def withAdminCookieHelper(f: RequestResponder ⇒ Unit): Unit = {
- withAdminCookie(uri, responder, headers, remoteAddress)(f)
- }
-
- //+ Main business logic REST URIs are matched here
- val millis = TimeHelpers.nowMillis()
- uri match {
- case UserBalancePath(userID) ⇒
- // /user/(.+)/balance/?
- callRouter(GetUserBalanceRequest(userID, millis), responder)
-
- case UserStatePath(userId) ⇒
- // /user/(.+)/state/?
- callRouter(GetUserStateRequest(userId, millis), responder)
-
-// case AdminPingAllPath() ⇒
-// withAdminCookieHelper { responder ⇒
-// callRouter(PingAllRequest(), responder)
-// }
-
- case ResourcesPath() ⇒
- withAdminCookieHelper { responder ⇒
- responder.complete(
- stringResponse200("%s\n%s\n%s\n" .format(
- ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
- ResourceLocator.ResourceNames.LOGBACK_XML,
- ResourceLocator.ResourceNames.POLICY_YAML),
- TEXT_PLAIN
- )
- )
- }
-
- case ResourcesAquariumPropertiesPath() ⇒
- withAdminCookieHelper { responder ⇒
- resourceInfoResponse(uri, responder, ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
- }
-
- case ResourcesLogbackXMLPath() ⇒
- withAdminCookieHelper { responder ⇒
- resourceInfoResponse(uri, responder, ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
- }
-
- case ResourcesPolicyYAMLPath() ⇒
- withAdminCookieHelper { responder ⇒
- resourceInfoResponse(uri, responder, ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
- }
-
- case ResourceEventPath(id) ⇒
- withAdminCookieHelper { responder ⇒
- eventInfoResponse(uri, responder, aquarium.resourceEventStore.findResourceEventByID, id)
- }
-
- case IMEventPath(id) ⇒
- withAdminCookieHelper { responder ⇒
- eventInfoResponse(uri, responder, aquarium.imEventStore.findIMEventById, id)
- }
-
- case _ ⇒
- responder.complete(stringResponse(404, "Unknown resource!", TEXT_PLAIN))
- }
- //- Main business logic REST URIs are matched here
-
- case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
- responder.complete(stringResponse(404, "Unknown resource!", TEXT_PLAIN))
-
- case Timeout(method, uri, _, _, _, complete) ⇒ complete {
- HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
- }
- }
-
- private[this]
- def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = {
- val actorProvider = aquarium.actorProvider
- val router = actorProvider.actorForRole(RouterRole)
- val futureResponse = router ask message
-
- futureResponse onComplete {
- future ⇒
- future.value match {
- case None ⇒
- // TODO: Will this ever happen??
- logger.warn("Future did not complete for %s".format(message))
- val statusCode = 500
- responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
-
- case Some(Left(error)) ⇒
- val statusCode = 500
- logger.error("Error %s serving %s: %s".format(statusCode, message, error))
- responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
-
- case Some(Right(actualResponse)) ⇒
- actualResponse match {
- case routerResponse: RouterResponseMessage[_] ⇒
- routerResponse.response match {
- case Left(errorMessage) ⇒
- val statusCode = routerResponse.suggestedHTTPStatus
-
- logger.error("Error %s '%s' serving %s. Internal response: %s".format(
- statusCode,
- errorMessage,
- message,
- actualResponse))
-
- responder.complete(stringResponse(statusCode, errorMessage, TEXT_PLAIN))
-
- case Right(response) ⇒
- responder.complete(
- HttpResponse(
- routerResponse.suggestedHTTPStatus,
- body = routerResponse.responseToJsonString.getBytes("UTF-8"),
- headers = HttpHeader("Content-type", APPLICATION_JSON+";charset=utf-8") ::
- Nil))
- }
-
- case _ ⇒
- val statusCode = 500
- logger.error("Error %s serving %s: Response is: %s".format(statusCode, message, actualResponse))
- responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
- }
- }
- }
- }
-
- ////////////// helpers //////////////
-
- final val defaultHeaders = List(HttpHeader("Content-Type", TEXT_PLAIN))
-
- lazy val serverActor = Actor.registry.actorsFor("spray-can-server").head
-
- def role = RESTRole
-}
\ No newline at end of file
val imEventDebugString = imEvent.toDebugString
- store.findIMEventById(id) match {
+ store.findIMEventByID(id) match {
case Some(_) ⇒
// Reject the duplicate
logger.debug("Rejecting duplicate ID for %s".format(imEventDebugString))
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.service
+
+import gr.grnet.aquarium.util._
+import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
+import com.ckkloverdos.props.Props
+import com.twitter.finagle.{Service, SimpleFilter}
+import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
+import com.twitter.util.{Future ⇒ TFuture, FuturePool ⇒ TFuturePool, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
+import com.twitter.finagle.builder.ServerBuilder
+import com.twitter.finagle.http.Http
+import org.jboss.netty.handler.codec.http.HttpResponseStatus._
+import org.jboss.netty.handler.codec.http.HttpVersion._
+import org.jboss.netty.buffer.ChannelBuffers._
+import org.jboss.netty.util.CharsetUtil._
+import java.net.InetSocketAddress
+import java.util.concurrent.{Executors, TimeUnit}
+import gr.grnet.aquarium.util.date.TimeHelpers
+import org.joda.time.format.ISODateTimeFormat
+import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest, RouterResponseMessage, RouterRequestMessage}
+import gr.grnet.aquarium.actor.RouterRole
+import com.ckkloverdos.resource.StreamResource
+import com.ckkloverdos.maybe.{Just, Failed}
+import gr.grnet.aquarium.event.model.ExternalEventModel
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
+ final val TEXT_PLAIN = "text/plain"
+ final val APPLICATION_JSON = "application/json"
+
+ @volatile private[this] var _port: Int = 8080
+ @volatile private[this] var _shutdownTimeoutMillis: Long = 2000
+ @volatile private[this] var _threadPoolSize: Int = 4
+ @volatile private[this] var _threadPool: TFuturePool = _
+
+ def propertyPrefix = Some(RESTService.Prefix)
+
+ /**
+ * Configure this instance with the provided properties.
+ *
+ * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+ */
+ def configure(props: Props) {
+ // Both keys are read with the *Ex accessors.
+ // NOTE(review): presumably these throw when the key is missing — confirm against Props.
+ this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
+ this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
+
+ // NOTE(review): this pool is created on every configure() call but is never read anywhere
+ // in this class, and the underlying executor is not shut down in stop().
+ this._threadPool = TFuturePool(Executors.newFixedThreadPool(this._threadPoolSize))
+
+ logger.debug("HTTP port is %s".format(this._port))
+ }
+
+ def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
+ val response = new TDefaultHttpResponse(HTTP_1_1, status)
+ response.setContent(copiedBuffer(body, UTF_8))
+ response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
+
+ TFuture.value(response)
+ }
+
+ def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
+ stringResponse(OK, body, contentType)
+ }
+
+ def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
+ stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
+ }
+
+ def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
+ val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
+
+ resource.stringContent.toMaybeEither match {
+ case Just(body) ⇒
+ stringResponseOK(fmt(body), contentType)
+
+ case Failed(e) ⇒
+ throw e
+ }
+ }
+
+  /**
+   * Looks up an event via `getter` and renders it as an HTTP response.
+   *
+   * @param eventID the ID handed to `getter`
+   * @param getter  lookup function returning the event, if any
+   * @return a 200 response carrying the event as JSON when the event is found,
+   *         otherwise a plain 404 status response
+   */
+  def eventInfoResponse[E <: ExternalEventModel](
+      eventID: String,
+      getter: String ⇒ Option[E]
+  ): TFuture[THttpResponse] = {
+    val maybeEvent = getter(eventID)
+    val maybeResponse = maybeEvent.map { found ⇒
+      stringResponseOK(found.toJsonString, APPLICATION_JSON)
+    }
+
+    maybeResponse.getOrElse(statusResponse(NOT_FOUND))
+  }
+
+ // Filter that turns a failed response future of the wrapped service into an HTTP error response.
+ // The failure is logged together with the request; IllegalArgumentException is mapped to
+ // FORBIDDEN and everything else to INTERNAL_SERVER_ERROR.
+ // NOTE(review): `handle` is applied to the returned future; confirm that exceptions thrown
+ // synchronously by `service(request)` are also covered.
+ final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
+ def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
+ service(request) handle {
+ case error ⇒
+ logger.error("While serving %s".format(request), error)
+ val statusCode = error match {
+ case _: IllegalArgumentException ⇒
+ FORBIDDEN
+ case _ ⇒
+ INTERNAL_SERVER_ERROR
+ }
+
+ val errorResponse = new TDefaultHttpResponse(HTTP_1_1, statusCode)
+ // NOTE(review): the full stack trace is sent to the HTTP client as the response body and
+ // no Content-type header is set — confirm this exposure is intended.
+ errorResponse.setContent(copiedBuffer(error.getStackTraceString, UTF_8))
+
+ errorResponse
+ }
+ }
+ }
+
+  /**
+   * Filter that guards every URI starting with `RESTPaths.AdminPrefix`.
+   *
+   * For such URIs the value of the `Aquarium.HTTP.RESTAdminHeaderName` request header is
+   * compared with the configured admin cookie:
+   *
+   *  - cookie configured and equal to the header value: the request is passed on,
+   *  - cookie configured but the header is missing or different: `401 Unauthorized`,
+   *  - no cookie configured: `403 Forbidden`.
+   *
+   * All other URIs are passed on unchecked.
+   */
+  final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
+    def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
+      if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
+        val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
+        aquarium.adminCookie match {
+          case Some(`headerValue`) ⇒
+            service(request)
+
+          case Some(_) ⇒
+            // Without this case a wrong or missing header falls through the match and
+            // raises a MatchError instead of producing a proper HTTP status.
+            logger.warn("Admin request %s with bad or missing cookie".format(request.getUri))
+            statusResponse(UNAUTHORIZED)
+
+          case None ⇒
+            statusResponse(FORBIDDEN)
+        }
+      } else {
+        service(request)
+      }
+    }
+  }
+
+ final case class MainService() extends Service[THttpRequest, THttpResponse] {
+    /**
+     * Bridges the akka-based router to a twitter future.
+     *
+     * The message is sent to the actor registered for `RouterRole`; the returned promise is
+     * completed from the akka future's completion callback. Every outcome of the akka future
+     * completes the promise, so the HTTP request cannot be left without an answer:
+     *
+     *  - a failure is propagated as the promise's exception,
+     *  - a `RouterResponseMessage` becomes the promise's value,
+     *  - any other reply, or no value at all, fails the promise with a descriptive exception.
+     */
+    final class ActorRouterService extends Service[RouterRequestMessage, RouterResponseMessage[_]] {
+      def apply(message: RouterRequestMessage): TFuture[RouterResponseMessage[_]] = {
+        // We want to asynchronously route the message via akka and get the whole computation as a
+        // twitter future.
+        val actorProvider = aquarium.actorProvider
+        val router = actorProvider.actorForRole(RouterRole)
+        val promise = new TPromise[RouterResponseMessage[_]]()
+
+        val actualWork = router.ask(message)
+
+        actualWork onComplete { akkaFuture ⇒
+          akkaFuture.value match {
+            case Some(Left(throwable)) ⇒
+              promise.setException(throwable)
+
+            case Some(Right(routerResponse: RouterResponseMessage[_])) ⇒
+              promise.setValue(routerResponse)
+
+            case Some(Right(other)) ⇒
+              // A blind cast here would throw inside the callback and leave the promise
+              // forever incomplete; fail it explicitly instead.
+              promise.setException(new Exception("Unexpected response %s for %s".format(other, message)))
+
+            case None ⇒
+              promise.setException(new Exception("Got no response for %s".format(message)))
+          }
+        }
+
+        promise
+      }
+    }
+
+ final val actorRouterService = new ActorRouterService
+
+ def callRouter(requestMessage: RouterRequestMessage): TFuture[THttpResponse] = {
+ actorRouterService(requestMessage).transform { tryResponse ⇒
+ tryResponse match {
+ case TReturn(responseMessage: RouterResponseMessage[_]) ⇒
+ val statusCode = responseMessage.suggestedHTTPStatus
+ val status = THttpResponseStatus.valueOf(statusCode)
+
+ responseMessage.response match {
+ case Left(errorMessage) ⇒
+ logger.error("Error %s '%s' serving %s. Internal response: %s".format(
+ statusCode,
+ errorMessage,
+ requestMessage,
+ responseMessage))
+
+ stringResponse(status, errorMessage, TEXT_PLAIN)
+
+ case Right(_) ⇒
+ stringResponse(status, responseMessage.toJsonString, APPLICATION_JSON)
+ }
+
+ case TThrow(throwable) ⇒
+ val status = INTERNAL_SERVER_ERROR
+ logger.error("Error %s serving %s: %s".format(
+ status.getReasonPhrase,
+ requestMessage,
+ gr.grnet.aquarium.util.shortInfoOf(throwable)
+ ))
+
+ statusResponse(status)
+ }
+ }
+ }
+
+ def apply(request: THttpRequest): TFuture[THttpResponse] = {
+ val millis = TimeHelpers.nowMillis()
+ val uri = request.getUri
+ val method = request.getMethod
+ logger.debug("%s %s".format(method, uri))
+
+ uri match {
+ case RESTPaths.PingPath() ⇒
+ val now = TimeHelpers.nowMillis()
+ val nowFormatted = ISODateTimeFormat.dateTime().print(now)
+ stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
+
+// case RESTPaths.ResourcesPath() ⇒
+// stringResponseOK("%s\n%s\n%s\n" .format(
+// ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
+// ResourceLocator.ResourceNames.LOGBACK_XML,
+// ResourceLocator.ResourceNames.POLICY_YAML),
+// TEXT_PLAIN)
+
+ case RESTPaths.UserBalancePath(userID) ⇒
+ // /user/(.+)/balance/?
+ callRouter(GetUserBalanceRequest(userID, millis))
+
+ case RESTPaths.UserStatePath(userId) ⇒
+ // /user/(.+)/state/?
+ callRouter(GetUserStateRequest(userId, millis))
+
+ case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
+ resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
+
+ case RESTPaths.ResourcesLogbackXMLPath() ⇒
+ resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
+
+ case RESTPaths.ResourcesPolicyYAMLPath() ⇒
+ resourceInfoResponse(ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
+
+ case RESTPaths.ResourceEventPath(id) ⇒
+ eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
+
+ case RESTPaths.IMEventPath(id) ⇒
+ eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
+
+ case _ ⇒
+ statusResponse(NOT_FOUND)
+ }
+ }
+ }
+
+ val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
+ lazy val server = ServerBuilder().
+ codec(Http()).
+ bindTo(new InetSocketAddress(this._port)).
+ name("HttpServer").
+ build(service)
+
+ def start(): Unit = {
+ logger.info("Starting HTTP on port %s".format(this._port))
+ // Just for the side effect
+ assert(server ne null)
+ }
+
+ def stop(): Unit = {
+ logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
+ server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))
+ }
+}
* or implied, of GRNET S.A.
*/
-package gr.grnet.aquarium.actor
-package service
-package rest
+package gr.grnet.aquarium.service
import gr.grnet.aquarium.ResourceLocator
-
/**
* Paths recognized and served by the REST API.
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
object RESTPaths {
- final val ResourcesPath = "/resources/?".r
+ final val PingPath = "/ping".r
+
+ final val AdminPrefix = "/admin"
private def fixREDot(s: String) = s.replaceAll("""\.""", """\\.""")
- private def toResourcesPath(name: String) = "/resources/%s".format(fixREDot(name)).r
- private def toEventPath(name: String) = "/%s/([^/]+)/?".format(name).r
+ private def toResourcesPath(name: String) = AdminPrefix + "/resources/%s".format(fixREDot(name))
+ private def toEventPath(name: String) = AdminPrefix + "/%s/([^/]+)/?".format(name)
+
+ final val ResourcesPath = (AdminPrefix + "/resources/?").r
- final val ResourcesAquariumPropertiesPath = toResourcesPath(ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES)
+ final val ResourcesAquariumPropertiesPath = toResourcesPath(ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES).r
- final val ResourcesLogbackXMLPath = toResourcesPath(ResourceLocator.ResourceNames.LOGBACK_XML)
+ final val ResourcesLogbackXMLPath = toResourcesPath(ResourceLocator.ResourceNames.LOGBACK_XML).r
- final val ResourcesPolicyYAMLPath = toResourcesPath(ResourceLocator.ResourceNames.POLICY_YAML)
+ final val ResourcesPolicyYAMLPath = toResourcesPath(ResourceLocator.ResourceNames.POLICY_YAML).r
- final val ResourceEventPath = toEventPath("rcevent")
+ final val ResourceEventPath = toEventPath("rcevent").r
- final val IMEventPath = toEventPath("imevent")
+ final val IMEventPath = toEventPath("imevent").r
/**
* Use this URI path to query for the user balance. The parenthesized regular expression part
* Use this URI path to query for the user state.
*/
final val UserStatePath = "/user/([^/]+)/state/?".r
-
- /**
- * Use this administrative URI path to ping all services used by Aquarium.
- */
- final val AdminPingAllPath = "/admin/ping/all/?".r
}
\ No newline at end of file
package gr.grnet.aquarium.service
-import gr.grnet.aquarium.actor.RESTRole
-import _root_.akka.actor._
-import cc.spray.can.{ServerConfig, HttpClient, HttpServer}
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.{Configurable, AquariumAwareSkeleton, Aquarium}
-import com.ckkloverdos.props.Props
-
/**
- * REST service based on Actors and Spray.
*
- * @author Christos KK Loverdos <loverdos@gmail.com>.
+ * @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class RESTActorService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
- private[this] var _port: Int = 8080
- private[this] var _restActor: ActorRef = _
- private[this] var _serverActor: ActorRef = _
- private[this] var _clientActor: ActorRef = _
-
-
- def propertyPrefix = Some(RESTActorService.Prefix)
-
- /**
- * Configure this instance with the provided properties.
- *
- * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
- */
- def configure(props: Props) {
- this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
- logger.debug("HTTP port is %s".format(this._port))
- }
-
- def start(): Unit = {
- logger.info("Starting HTTP on port %s".format(this._port))
- this._restActor = aquarium.actorProvider.actorForRole(RESTRole)
- // Start Spray subsystem
- this._serverActor = Actor.actorOf(new HttpServer(ServerConfig(port = this._port))).start()
- this._clientActor = Actor.actorOf(new HttpClient()).start()
- }
-
- def stop(): Unit = {
- logger.info("Stopping HTTP on port %s".format(this._port))
-
- this._serverActor.stop()
- this._clientActor.stop()
- }
-}
-
-object RESTActorService {
+object RESTService {
final val Prefix = "rest"
-}
\ No newline at end of file
+}
// Always set Router at the end.
// We could definitely use some automatic dependency sorting here (topological sorting anyone?)
final val RolesToBeStarted = List(
- // ResourceProcessorRole,
- RESTRole,
PingerRole,
RouterRole)
def insertIMEvent(event: IMEventModel): IMEvent
/**
- * Find a user event by event ID
+ * Find an event by its ID
*/
- def findIMEventById(id: String): Option[IMEvent]
+ def findIMEventByID(id: String): Option[IMEvent]
/**
* Find the `CREATE` even for the given user. Note that there must be only one such event.
localEvent
}
- def findIMEventById(id: String) = imEventById.get(id)
+ def findIMEventByID(id: String) = imEventById.get(id)
/**
localEvent
}
- def findIMEventById(id: String): Option[IMEvent] = {
+ def findIMEventByID(id: String): Option[IMEvent] = {
MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
}
# Actor subsystem
actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService
# Class that initializes the REST service
-rest.service.class=gr.grnet.aquarium.service.RESTActorService
+rest.service.class=gr.grnet.aquarium.service.FinagleRESTService
+rest.shutdown.timeout.millis=2000
# Store subsystem
store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
# Override the user store (if present, it will not be given by the store provider above)
<logger name="com.ckkloverdos" level="DEBUG"/>
- <logger name="cc.spray.can" level="DEBUG"/>
-
<logger name="gr.grnet" level="DEBUG"/>
<root level="DEBUG">
}
private[this] def _testPropertyTrue(name: String): Boolean = {
- // A property is true if it is given without a value (-Dtest.enable.spray) or it is given
- // with a value that corresponds to true (-Dtest.enable.spray=true)
+ // A property is true if it is given without a value (-Dtest.enable.store) or it is given
+ // with a value that corresponds to true (-Dtest.enable.store=true)
SysProp(name).value.map(_checkValue(_, true)).getOr(false)
}
def EnableRabbitMQTests = isPropertyEnabled(PropertyNames.TestEnableRabbitMQ)
def EnableStoreTests = isPropertyEnabled(PropertyNames.TestEnableStore)
def EnablePerfTests = isPropertyEnabled(PropertyNames.TestEnablePerf)
- def EnableSprayTests = isPropertyEnabled(PropertyNames.TestEnableSpray)
def propertyValue(name: String) = SysProp(name).rawValue
}
\ No newline at end of file
val TestEnableRabbitMQ = "test.enable.rabbitmq"
val TestEnableStore = "test.enable.store"
val TestEnablePerf = "test.enable.perf"
- val TestEnableSpray = "test.enable.spray"
val TestEnableAll = "test.enable.all"
-
- // Define which store implementation to use. Overrides
- // values in aquarium.properties.
- val TestStore = "test.store"
}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.rest.actor
-
-import org.junit.Test
-import org.junit.Assert._
-import org.junit.Assume.assumeTrue
-
-import cc.spray.can.HttpMethods.{GET}
-import cc.spray.can.HttpClient._
-import cc.spray.can.HttpClient.{HttpDialog ⇒ SprayHttpDialog}
-import cc.spray.can.{HttpResponse, HttpHeader, HttpRequest}
-import gr.grnet.aquarium.util.makeString
-import gr.grnet.aquarium.converter.StdConverters
-import net.liftweb.json.JsonAST.{JValue, JInt}
-import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, AquariumException, LogicTestsAssumptions, Aquarium}
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RESTActorTest {
- @Test
- def testPing: Unit = {
- assumeTrue(LogicTestsAssumptions.EnableSprayTests)
-
- // Initialize configuration subsystem
- val aquarium = new AquariumBuilder(ResourceLocator.AquariumProperties).build()
- aquarium.start()
-
- val port = aquarium.restPort
- val dialog = SprayHttpDialog("localhost", port)
-
- val pingReq = HttpRequest(method = GET, uri = "/ping", headers = HttpHeader("Content-Type", "text/plain; charset=UTF-8")::Nil)
- dialog.send(pingReq).end onComplete { futureResp ⇒
- futureResp.value match {
- case Some(Right(HttpResponse(status, _, bytesBody, _))) ⇒
- assertTrue("Status 200 OK", status == 200)
- val stringBody = makeString(bytesBody)
- val jValue = StdConverters.AllConverters.convertEx[JValue](stringBody)
- println("!! Got stringBody = %s".format(stringBody))
- // Note that the response is in JSON format, so must parse it
- println("!! ==> jValue = %s".format(jValue))
- val pongValue = jValue \ "pong"
- println("!! ==> pongValue = %s".format(pongValue))
- assertTrue("pong Int in response", pongValue.isInstanceOf[JInt])
- case Some(Left(error)) ⇒
- fail("Got error: %s".format(error.getMessage))
- case None ⇒
- fail("Got nothing")
- }
- }
-
- aquarium.stopAfterMillis(1000)
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.rest.akka.service
-
-import org.junit.Test
-import cc.spray.can.HttpClient._
-
-import cc.spray.can.HttpMethods.GET
-import gr.grnet.aquarium.util.Loggable
-import org.junit.Assume._
-import gr.grnet.aquarium.LogicTestsAssumptions
-import cc.spray.can._
-import akka.actor.Actor
-import org.slf4j.LoggerFactory
-import akka.config.{Config ⇒ AkkaConfig}
-
-class SprayPingService(_id: String = "spray-root-service") extends Actor {
- private[this] val logger = LoggerFactory.getLogger(getClass)
-
- self.id = _id
-
- protected def receive = {
- case RequestContext(HttpRequest(GET, "/", _, _, _), _, responder) =>
- responder.complete(index)
-
- case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) =>
- responder.complete(response("PONG!"))
-
- case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) => {
- (serverActor ? GetStats).mapTo[Stats].onComplete {
- future =>
- future.value.get match {
- case Right(stats) => responder.complete {
- response {
- "Uptime : " + (stats.uptime / 1000.0) + " sec\n" +
- "Requests dispatched : " + stats.requestsDispatched + '\n' +
- "Requests timed out : " + stats.requestsTimedOut + '\n' +
- "Requests open : " + stats.requestsOpen + '\n' +
- "Open connections : " + stats.connectionsOpen + '\n'
- }
- }
- case Left(ex) => responder.complete(response("Couldn't get server stats due to " + ex, status = 500))
- }
- }
- }
-
- case RequestContext(HttpRequest(_, _, _, _, _), _, responder) =>
- responder.complete(response("Unknown resource!", 404))
-
- case Timeout(method, uri, _, _, _, complete) => complete {
- HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
- }
- }
-
- ////////////// helpers //////////////
-
- val defaultHeaders = List(HttpHeader("Content-Type", "text/plain"))
-
- lazy val serverActor = Actor.registry.actorsFor("spray-can-server").head
-
- def response(msg: String, status: Int = 200) = HttpResponse(status, defaultHeaders, msg.getBytes("ISO-8859-1"))
-
- lazy val index = HttpResponse(
- headers = List(HttpHeader("Content-Type", "text/html")),
- body =
- <html>
- <body>
- <h1>Say hello to
- <i>spray-can</i>
- !</h1>
- <p>Defined resources:</p>
- <ul>
- <li>
- <a href="/ping">/ping</a>
- </li>
- <li>
- <a href="/stats">/stats</a>
- </li>
- </ul>
- </body>
- </html>.toString.getBytes("UTF-8")
- )
-}
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class SprayPingServiceTest extends Loggable {
- final val Port = 8888
-
- @Test
- def testPing: Unit = {
- assumeTrue(LogicTestsAssumptions.EnableSprayTests)
-
- val service = Actor.actorOf(new SprayPingService("spray-root-service")).start()
- val server = Actor.actorOf(new HttpServer(ServerConfig(port = Port))).start()
- val client = Actor.actorOf(new HttpClient(ClientConfig())).start()
-
- val dialog = HttpDialog("localhost", Port)
- val result = dialog.send(HttpRequest(method = GET, uri = "/ping", headers = HttpHeader("Content-Type", "text/plain; charset=UTF-8")::Nil)).end
- result onComplete { future =>
- future.value match {
- case Some(Right(response)) =>
- logger.info("Response class : %s".format(response.getClass))
- logger.info("Response status : %s".format(response.status))
- logger.info("Response headers: %s".format(response.headers.map(hh => (hh.name, hh.value)).mkString(", ")))
- logger.info("Response body : %s".format(response.bodyAsString))
- case other =>
- logger.error("Error: %s".format(other))
- }
-
- server.stop()
- client.stop()
- service.stop()
- }
-
- Thread sleep 100
- }
-}
\ No newline at end of file