Thought untested, the REST service now sends to Dispatcher.
authorChristos KK Loverdos <loverdos@gmail.com>
Fri, 9 Dec 2011 14:51:31 +0000 (16:51 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Fri, 9 Dec 2011 14:52:04 +0000 (16:52 +0200)
logic/src/main/scala/gr/grnet/aquarium/rest/akka/service/AquariumRESTService.scala

index fa79336..44450e4 100644 (file)
@@ -37,10 +37,13 @@ package gr.grnet.aquarium.rest.akka.service
 
 import cc.spray.can.HttpMethods.{GET, POST}
 import cc.spray.can._
-import akka.actor.Actor
 import gr.grnet.aquarium.util.Loggable
 import net.liftweb.json.JsonAST.JValue
 import net.liftweb.json.{JsonAST, Printer}
+import gr.grnet.aquarium.MasterConf
+import gr.grnet.aquarium.actor.DispatcherRole
+import akka.actor.{ActorRef, Actor}
+import gr.grnet.aquarium.processor.actor.{RESTResponse, RESTRequest}
 
 /**
  * Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
@@ -68,10 +71,10 @@ class AquariumRESTService(_id: String = "spray-root-service", version: String) e
   }
 
   protected def receive = {
-    case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) =>
+    case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) ⇒
       responder.complete(stringResponse200("{pong: %s}".format(System.currentTimeMillis())))
 
-    case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) => {
+    case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) ⇒ {
       (serverActor ? GetStats).mapTo[Stats].onComplete {
         future =>
           future.value.get match {
@@ -89,15 +92,42 @@ class AquariumRESTService(_id: String = "spray-root-service", version: String) e
       }
     }
 
-    case RequestContext(HttpRequest(POST, "/events", _, _, _), _, responder) =>
+    case RequestContext(HttpRequest(post@POST, "/events", headers, body, protocol), _, responder) ⇒
       // POST events here.
-      // FIXME: implement
-      responder.complete(stringResponse200("{pong: %s}".format(System.currentTimeMillis())))
+      val masterConf = MasterConf.MasterConf
+      val actorProvider = masterConf.actorProvider
+      val dispatcher = actorProvider.actorForRole(DispatcherRole)
+      val headersMap = headers map { h => (h.name, h.value) } toMap
+      val futureResponse = dispatcher ask RESTRequest("POST", "/events", headersMap, body)
+
+      futureResponse onComplete { fr ⇒
+        fr.value match {
+          case None ⇒
+            // TODO: Will this ever happen??
+          case Some(Left(throwable)) ⇒
+            // TODO: Log something here and give back some more detailed info
+            responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+          case Some(Right(actualResponse)) ⇒
+            actualResponse match {
+              case RESTResponse(status, headers, body) ⇒
+                responder complete {
+                  HttpResponse(
+                    status,
+                    headers map { case (k, v) => HttpHeader(k, v)} toList,
+                    body
+                  )
+                }
+              case unknownResponse ⇒
+                // TODO: Log something here and give back some more detailed info
+                responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+            }
+        }
+      }
 
-    case RequestContext(HttpRequest(_, _, _, _, _), _, responder) =>
+    case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
       responder.complete(stringResponse(404, "Unknown resource!", "text/plain"))
 
-    case Timeout(method, uri, _, _, _, complete) => complete {
+    case Timeout(method, uri, _, _, _, complete) ⇒ complete {
       HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
     }
   }