<type>zip</type>
<scope>provided</scope>
</dependency>
+
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.9.1</artifactId>
<version>2.4-M5</version>
</dependency>
+
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json-ext_2.9.1</artifactId>
<version>2.4-M5</version>
</dependency>
+
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.9</version>
</dependency>
+
<dependency>
<groupId>com.kenai.crontab-parser</groupId>
<artifactId>crontab-parser</artifactId>
<version>1.0.1</version>
</dependency>
+
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.5.0</version>
</dependency>
+
<dependency>
<groupId>com.ckkloverdos</groupId>
<artifactId>streamresource_2.9.1</artifactId>
package gr.grnet.aquarium
-import actor.ActorProvider
+import actor.{DispatcherRole, ActorProvider}
import com.ckkloverdos.resource._
import com.ckkloverdos.sys.SysProp
import com.ckkloverdos.props.Props
import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
+import processor.actor.ConfigureDispatcher
import rest.RESTService
/**
def defaultClassLoader = Thread.currentThread().getContextClassLoader
def startServices(): Unit = {
- _actorProvider.start()
_restService.start()
+ _actorProvider.start()
+
+ _actorProvider.actorForRole(DispatcherRole) ! ConfigureDispatcher(this)
}
def stopServices(): Unit = {
}
object SimpleLocalActorProvider {
- final val KnownRoles = List(DispatcherRole, ResourceProcessorRole, RESTRole)
+ // Always set Dispatcher at the end.
+ final val KnownRoles = List(
+ ResourceProcessorRole,
+ RESTRole,
+ DispatcherRole)
lazy val ActorClassByRole: Map[ActorRole, Class[_ <: AquariumActor]] =
KnownRoles map { role ⇒
package gr.grnet.aquarium.processor.actor
-import gr.grnet.aquarium.logic.events.ResourceEvent
-import akka.actor.ActorRef
-import gr.grnet.aquarium.actor.{DispatcherRole, AquariumActor}
+import gr.grnet.aquarium.actor.{ActorProvider, DispatcherRole, AquariumActor}
+import gr.grnet.aquarium.util.Loggable
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-class DispatcherActor extends AquariumActor {
- def role = DispatcherRole
+class DispatcherActor extends AquariumActor with Loggable {
+ private[this] var _actorProvider: ActorProvider = _
- def resourceEventProcessor: ActorRef =
- AquariumActorFactory.get.makeResourceEventProcessor
+ def role = DispatcherRole
protected def receive = {
- case message: ResourceEvent =>
- // Dispatch to resource processor
- resourceEventProcessor forward message
+ case ConfigureDispatcher(masterConf) ⇒
+ this._actorProvider = masterConf.actorProvider
+ logger.info("Received actorProvider = %s".format(this._actorProvider))
+
+ case UserBalanceRequest(userId) ⇒
+ self reply UserBalanceResponse(userId, 1000.0)
+
}
}
\ No newline at end of file
package gr.grnet.aquarium.processor.actor
+import gr.grnet.aquarium.MasterConf
+
/**
* This is the base class of the messages the Dispatcher understands.
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-sealed trait DispatcherMessage
+sealed trait DispatcherMessage {
+ def bodyToJson: String = "{}"
+ def isError: Boolean = false
+}
+
+case class ConfigureDispatcher(masterConf: MasterConf) extends DispatcherMessage
+
+case class UserBalanceRequest(userId: String) extends DispatcherMessage
+case class UserBalanceResponse(userId: String, balance: Double) extends DispatcherMessage
-/// REST-related messages
-case class RESTRequest(method: String, uri: String, headers: Map[String, String], body: Array[Byte]) extends DispatcherMessage
-case class RESTResponse(status: Int, headers: Map[String, String], body: Array[Byte]) extends DispatcherMessage
\ No newline at end of file
import net.liftweb.json.{JsonAST, Printer}
import gr.grnet.aquarium.MasterConf
import akka.actor.{ActorRef, Actor}
-import gr.grnet.aquarium.processor.actor.{RESTResponse, RESTRequest}
import gr.grnet.aquarium.actor.{RESTRole, AquariumActor, DispatcherRole}
+import RESTPaths.{UserBalance}
+import gr.grnet.aquarium.processor.actor.{UserBalanceRequest, DispatcherMessage}
/**
* Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
}
}
- case RequestContext(HttpRequest(post@POST, "/events", headers, body, protocol), _, responder) ⇒
- // POST events here.
- val masterConf = MasterConf.MasterConf
- val actorProvider = masterConf.actorProvider
- val dispatcher = actorProvider.actorForRole(DispatcherRole)
- val headersMap = headers map { h => (h.name, h.value) } toMap
- val futureResponse = dispatcher ask RESTRequest("POST", "/events", headersMap, body)
-
- futureResponse onComplete { fr ⇒
- fr.value match {
- case None ⇒
- // TODO: Will this ever happen??
- case Some(Left(throwable)) ⇒
- // TODO: Log something here and give back some more detailed info
- responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
- case Some(Right(actualResponse)) ⇒
- actualResponse match {
- case RESTResponse(status, headers, body) ⇒
- responder complete {
- HttpResponse(
- status,
- headers map { case (k, v) => HttpHeader(k, v)} toList,
- body
- )
- }
- case unknownResponse ⇒
- // TODO: Log something here and give back some more detailed info
- responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
- }
- }
+ case RequestContext(HttpRequest(GET, uri, headers, body, protocol), _, responder) ⇒
+ uri match {
+ case UserBalance(userId) ⇒
+ callDispatcher(UserBalanceRequest(userId), responder)
+ case _ ⇒
+ responder.complete(stringResponse(404, "Unknown resource!", "text/plain"))
}
case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
}
}
+
+ def callDispatcher(message: DispatcherMessage, responder: RequestResponder): Unit = {
+ val masterConf = MasterConf.MasterConf
+ val actorProvider = masterConf.actorProvider
+ val dispatcher = actorProvider.actorForRole(DispatcherRole)
+ val futureResponse = dispatcher ask message
+
+ futureResponse onComplete { future ⇒
+ future.value match {
+ case None ⇒
+ // TODO: Will this ever happen??
+ case Some(Left(error)) ⇒
+ logger.error("Error serving %s: %s".format(message, error))
+ responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+ case Some(Right(actualResponse)) ⇒
+ actualResponse match {
+ case dispatcherResponse: DispatcherMessage if(!dispatcherResponse.isError) ⇒
+ responder.complete(HttpResponse(status = 200, body = dispatcherResponse.bodyToJson.getBytes("UTF-8"), headers = HttpHeader("Content-type", "application/json;charset=utf-8") :: Nil))
+ case dispatcherResponse: DispatcherMessage ⇒
+ logger.error("Error serving %s: Dispatcher response is: %s".format(message, actualResponse))
+ responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+ case _ ⇒
+ logger.error("Error serving %s: Dispatcher response is: %s".format(message, actualResponse))
+ responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+ }
+ }
+ }
+ }
////////////// helpers //////////////
val defaultHeaders = List(HttpHeader("Content-Type", "text/plain"))
--- /dev/null
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.rest.actor
+
+/**
+ * Paths recognized and served by the REST API.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+object RESTPaths {
+ final val UserBalance = "/user/(.+)/balance".r
+}
\ No newline at end of file
override def parallelExecution = false
val repo0 = "aquarium nexus" at "http://aquarium.dev.grnet.gr:8081/nexus/content/groups/public"
- //val repo1 = "java.net.maven2" at "http://download.java.net/maven/2/"
- //val repo2 = "EclipseLink Repo" at "http://download.eclipse.org/rt/eclipselink/maven.repo"
- //val repo3 = "jboss" at "http://repository.jboss.org/nexus/content/groups/public/"
- //val repo4 = "sonatype" at "http://oss.sonatype.org/content/groups/public/"
- //val repo5 = "jcrontab" at "http://kenai.com/projects/crontab-parser/sources/maven-repo/content/"
- // val repo6 = "typsafe" at "http://repo.typesafe.com/typesafe/releases/"
- // val repo7 = "akka" at "http://akka.io/repository/"
- // repo8 = "twitter" at "http://maven.twttr.com"
+ val repo1 = "java.net.maven2" at "http://download.java.net/maven/2/"
+ val repo2 = "EclipseLink Repo" at "http://download.eclipse.org/rt/eclipselink/maven.repo"
+ val repo3 = "jboss" at "http://repository.jboss.org/nexus/content/groups/public/"
+ val repo4 = "sonatype" at "http://oss.sonatype.org/content/groups/public/"
+ val repo5 = "jcrontab" at "http://kenai.com/projects/crontab-parser/sources/maven-repo/content/"
+ val repo6 = "typsafe" at "http://repo.typesafe.com/typesafe/releases/"
+ val repo7 = "akka" at "http://akka.io/repository/"
+ val repo8 = "twitter" at "http://maven.twttr.com"
+ val repo9 = "tools-snapshots" at "http://scala-tools.org/repo-snapshots"
val lib_slf4j = "org.slf4j" % "slf4j-api" % "1.6.1" withSources()
val lib_h2 = "com.h2database" % "h2" % "1.3.160" withSources()
val lib_javaxrs = "javax.ws.rs" % "jsr311-api" % "1.1.1" withSources()
val lib_spray_can = "cc.spray.can" % "spray-can" % "0.9.2-SNAPSHOT" withSources()
+ val lib_spray_server= "cc.spray.can" % "spray-server" % "0.9.0-SNAPSHOT" withSources()
val lib_converter = "com.ckkloverdos" % "converter_2.9.1" % "0.3.0" withSources()
val lib_streamresource = "com.ckkloverdos" % "streamresource_2.9.1" % "0.2.0" withSources()