Work in progres towards handling UserRequestGetBalance end-to-end.
[aquarium] / logic / src / main / scala / gr / grnet / aquarium / rest / actor / RESTActor.scala
1 package gr.grnet.aquarium.rest.actor
2
3 /*
4  * Copyright 2011 GRNET S.A. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or
7  * without modification, are permitted provided that the following
8  * conditions are met:
9  *
10  *   1. Redistributions of source code must retain the above
11  *      copyright notice, this list of conditions and the following
12  *      disclaimer.
13  *
14  *   2. Redistributions in binary form must reproduce the above
15  *      copyright notice, this list of conditions and the following
16  *      disclaimer in the documentation and/or other materials
17  *      provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  *
32  * The views and conclusions contained in the software and
33  * documentation are those of the authors and should not be
34  * interpreted as representing official policies, either expressed
35  * or implied, of GRNET S.A.
36  */
37
38 import cc.spray.can.HttpMethods.{GET, POST}
39 import cc.spray.can._
40 import gr.grnet.aquarium.util.Loggable
41 import net.liftweb.json.JsonAST.JValue
42 import net.liftweb.json.{JsonAST, Printer}
43 import gr.grnet.aquarium.MasterConf
44 import akka.actor.{ActorRef, Actor}
45 import gr.grnet.aquarium.actor.{RESTRole, AquariumActor, DispatcherRole}
46 import RESTPaths.{UserBalance}
47 import gr.grnet.aquarium.processor.actor.{UserRequestGetBalance, DispatcherMessage}
48
49 /**
50  * Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
51  *
52  * @author Christos KK Loverdos <loverdos@gmail.com>.
53  */
54 class RESTActor(_id: String) extends AquariumActor with Loggable {
55   def this() = this("spray-root-service")
56
57   self.id = _id
58
59   private def jsonResponse200(body: JValue, pretty: Boolean = false): HttpResponse = {
60     val stringBody = Printer.pretty(JsonAST.render(body))
61     stringResponse200(stringBody, "application/json")
62   }
63   
64   private def stringResponse(status: Int, stringBody: String, contentType: String = "application/json"): HttpResponse = {
65     HttpResponse(
66       status,
67       HttpHeader("Content-type", "%s;charset=utf-8".format(contentType)) :: Nil,
68       stringBody.getBytes("UTF-8")
69     )
70   }
71
72   private def stringResponse200(stringBody: String, contentType: String = "application/json"): HttpResponse = {
73     stringResponse(200, stringBody, contentType)
74   }
75
76   protected def receive = {
77     case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) ⇒
78       responder.complete(stringResponse200("{\"pong\": %s}".format(System.currentTimeMillis())))
79
80     case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) ⇒ {
81       (serverActor ? GetStats).mapTo[Stats].onComplete { future =>
82           future.value.get match {
83             case Right(stats) => responder.complete {
84               stringResponse200 (
85                 "Uptime              : " + (stats.uptime / 1000.0) + " sec\n" +
86                   "Requests dispatched : " + stats.requestsDispatched + '\n' +
87                   "Requests timed out  : " + stats.requestsTimedOut + '\n' +
88                   "Requests open       : " + stats.requestsOpen + '\n' +
89                   "Open connections    : " + stats.connectionsOpen + '\n'
90               )
91             }
92             case Left(ex) => responder.complete(stringResponse(500, "Couldn't get server stats due to " + ex, "text/plain"))
93           }
94       }
95     }
96
97     case RequestContext(HttpRequest(GET, uri, headers, body, protocol), _, responder) ⇒
98       uri match {
99         case UserBalance(userId) ⇒
100           callDispatcher(UserRequestGetBalance(userId, System.currentTimeMillis()), responder)
101         case _ ⇒
102           responder.complete(stringResponse(404, "Unknown resource!", "text/plain"))
103       }
104
105     case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
106       responder.complete(stringResponse(404, "Unknown resource!", "text/plain"))
107
108     case Timeout(method, uri, _, _, _, complete) ⇒ complete {
109       HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
110     }
111   }
112
113
114   def callDispatcher(message: DispatcherMessage, responder: RequestResponder): Unit = {
115     val masterConf = MasterConf.MasterConf
116     val actorProvider = masterConf.actorProvider
117     val dispatcher = actorProvider.actorForRole(DispatcherRole)
118     val futureResponse = dispatcher ask message
119
120     futureResponse onComplete { future ⇒
121         future.value match {
122           case None ⇒
123           // TODO: Will this ever happen??
124           case Some(Left(error)) ⇒
125             logger.error("Error serving %s: %s".format(message, error))
126             responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
127           case Some(Right(actualResponse)) ⇒
128             actualResponse match {
129               case dispatcherResponse: DispatcherMessage if(!dispatcherResponse.isError) ⇒
130                 responder.complete(HttpResponse(status = 200, body = dispatcherResponse.bodyToJson.getBytes("UTF-8"), headers = HttpHeader("Content-type", "application/json;charset=utf-8") :: Nil))
131               case dispatcherResponse: DispatcherMessage ⇒
132                 logger.error("Error serving %s: Dispatcher response is: %s".format(message, actualResponse))
133                 responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
134               case _ ⇒
135                 logger.error("Error serving %s: Dispatcher response is: %s".format(message, actualResponse))
136                 responder.complete(stringResponse(500, "Internal Server Error", "text/plain"))
137             }
138         }
139     }
140   }
141   ////////////// helpers //////////////
142
143   val defaultHeaders = List(HttpHeader("Content-Type", "text/plain"))
144
145   lazy val serverActor = Actor.registry.actorsFor("spray-can-server").head
146
147   def role = RESTRole
148 }