import cc.spray.can.HttpMethods.{GET, POST}
import cc.spray.can._
-import akka.actor.Actor
import gr.grnet.aquarium.util.Loggable
import net.liftweb.json.JsonAST.JValue
import net.liftweb.json.{JsonAST, Printer}
+import gr.grnet.aquarium.MasterConf
+import gr.grnet.aquarium.actor.DispatcherRole
+import akka.actor.{ActorRef, Actor}
+import gr.grnet.aquarium.processor.actor.{RESTResponse, RESTRequest}
/**
* Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
}
protected def receive = {
- case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) =>
+ case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) ⇒
responder.complete(stringResponse200("{pong: %s}".format(System.currentTimeMillis())))
- case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) => {
+ case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) ⇒ {
(serverActor ? GetStats).mapTo[Stats].onComplete {
future =>
future.value.get match {
}
}
- case RequestContext(HttpRequest(POST, "/events", _, _, _), _, responder) =>
+ case RequestContext(HttpRequest(post@POST, "/events", headers, body, protocol), _, responder) ⇒
// POST events here.
- // FIXME: implement
- responder.complete(stringResponse200("{pong: %s}".format(System.currentTimeMillis())))
+ val masterConf = MasterConf.MasterConf
+ val actorProvider = masterConf.actorProvider
+ val dispatcher = actorProvider.actorForRole(DispatcherRole)
+ val headersMap = headers map { h => (h.name, h.value) } toMap
+ val futureResponse = dispatcher ask RESTRequest("POST", "/events", headersMap, body)
+
+ futureResponse onComplete { fr ⇒
+ fr.value match {
+ case None ⇒
+ // TODO: Will this ever happen??
+ case Some(Left(throwable)) ⇒
+ // TODO: Log something here and give back some more detailed info
+ responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+ case Some(Right(actualResponse)) ⇒
+ actualResponse match {
+ case RESTResponse(status, headers, body) ⇒
+ responder complete {
+ HttpResponse(
+ status,
+ headers map { case (k, v) => HttpHeader(k, v)} toList,
+ body
+ )
+ }
+ case unknownResponse ⇒
+ // TODO: Log something here and give back some more detailed info
+ responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
+ }
+ }
+ }
- case RequestContext(HttpRequest(_, _, _, _, _), _, responder) =>
+ case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
responder.complete(stringResponse(404, "Unknown resource!", "text/plain"))
- case Timeout(method, uri, _, _, _, complete) => complete {
+ case Timeout(method, uri, _, _, _, complete) ⇒ complete {
HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
}
}