Refactor Aquarium to make it more configurable
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / rest / RESTActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package rest
39
40 import cc.spray.can.HttpMethods.GET
41 import cc.spray.can._
42 import gr.grnet.aquarium.util.Loggable
43 import gr.grnet.aquarium.util.shortInfoOf
44 import akka.actor.Actor
45 import gr.grnet.aquarium.actor.{RESTRole, RoleableActor, RouterRole}
46 import RESTPaths._
47 import gr.grnet.aquarium.util.date.TimeHelpers
48 import org.joda.time.format.ISODateTimeFormat
49 import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, GetUserBalanceRequest}
50 import gr.grnet.aquarium.{ResourceLocator, Aquarium}
51 import com.ckkloverdos.resource.StreamResource
52 import com.ckkloverdos.maybe.Failed
53 import java.net.InetAddress
54 import gr.grnet.aquarium.event.model.ExternalEventModel
55
56 /**
57  * Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
58  *
59  * @author Christos KK Loverdos <loverdos@gmail.com>.
60  */
61 class RESTActor private(_id: String) extends RoleableActor with Loggable {
62   def this() = this("spray-root-service")
63
64   self.id = _id
65
66   final val TEXT_PLAIN       = "text/plain"
67   final val APPLICATION_JSON = "application/json"
68
69   private def stringResponse(status: Int, stringBody: String, contentType: String): HttpResponse = {
70     HttpResponse(
71       status,
72       HttpHeader("Content-type", "%s;charset=utf-8".format(contentType)) :: Nil,
73       stringBody.getBytes("UTF-8")
74     )
75   }
76
77   private def resourceInfoResponse(
78       uri: String,
79       responder: RequestResponder,
80       resource: StreamResource,
81       contentType: String
82   ): Unit = {
83
84     val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
85     val res = resource.mapString(body ⇒ responder.complete(stringResponse(200, fmt(body), contentType)))
86
87     res match {
88       case Failed(e) ⇒
89         logger.error("While serving %s".format(uri), e)
90         responder.complete(stringResponse(501, "Internal Server Error: %s".format(shortInfoOf(e)), TEXT_PLAIN))
91
92       case _ ⇒
93
94     }
95   }
96
97   private def eventInfoResponse[E <: ExternalEventModel](
98       uri: String,
99       responder: RequestResponder,
100       getter: String ⇒ Option[E],
101       eventID: String
102   ): Unit = {
103
104     val toSend = getter.apply(eventID) match {
105      case Some(event) ⇒
106        (200, event.toJsonString, APPLICATION_JSON)
107
108      case None ⇒
109        (404, "Event not found", TEXT_PLAIN)
110    }
111
112    responder.complete(stringResponse(toSend._1, toSend._2, toSend._3))
113   }
114
115   def withAdminCookie(
116       uri: String,
117       responder: RequestResponder,
118       headers: List[HttpHeader],
119       remoteAddress: InetAddress
120   )(  f: RequestResponder ⇒ Unit): Unit = {
121
122     aquarium.adminCookie match {
123       case Some(adminCookie) ⇒
124         headers.find(_.name.toLowerCase == Aquarium.HTTP.RESTAdminHeaderNameLowerCase) match {
125           case Some(cookieHeader) if(cookieHeader.value == adminCookie) ⇒
126             try f(responder)
127             catch {
128               case e: Throwable ⇒
129                 logger.error("While serving %s".format(uri), e)
130                 responder.complete(stringResponse(501, "Internal Server Error: %s".format(shortInfoOf(e)), TEXT_PLAIN))
131             }
132
133           case Some(cookieHeader) ⇒
134             logger.warn("Admin request %s with bad cookie '%s' from %s".format(uri, cookieHeader.value, remoteAddress))
135             responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
136
137           case None ⇒
138             logger.warn("Admin request %s with no cookie from %s".format(uri, remoteAddress))
139             responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
140         }
141
142       case None ⇒
143         responder.complete(stringResponse(403, "Forbidden!", TEXT_PLAIN))
144     }
145   }
146
147   private def stringResponse200(stringBody: String, contentType: String): HttpResponse = {
148     stringResponse(200, stringBody, contentType)
149   }
150
151   protected def receive = {
152     case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) ⇒
153       val now = TimeHelpers.nowMillis()
154       val nowFormatted = ISODateTimeFormat.dateTime().print(now)
155       responder.complete(stringResponse200("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN))
156
157     case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) ⇒ {
158       (serverActor ? GetStats).mapTo[Stats].onComplete {
159         future =>
160           future.value.get match {
161             case Right(stats) => responder.complete {
162               stringResponse200(
163                 "Uptime              : " + (stats.uptime / 1000.0) + " sec\n" +
164                 "Requests dispatched : " + stats.requestsDispatched + '\n' +
165                 "Requests timed out  : " + stats.requestsTimedOut + '\n' +
166                 "Requests open       : " + stats.requestsOpen + '\n' +
167                 "Open connections    : " + stats.connectionsOpen + '\n',
168                 TEXT_PLAIN
169               )
170             }
171             case Left(ex) => responder.complete(stringResponse(500, "Couldn't get server stats due to " + ex, TEXT_PLAIN))
172           }
173       }
174     }
175
176     case RequestContext(HttpRequest(GET, uri, headers, body, protocol), remoteAddress, responder) ⇒
177       def withAdminCookieHelper(f: RequestResponder ⇒ Unit): Unit = {
178         withAdminCookie(uri, responder, headers, remoteAddress)(f)
179       }
180
181       //+ Main business logic REST URIs are matched here
182       val millis = TimeHelpers.nowMillis()
183       uri match {
184         case UserBalancePath(userID) ⇒
185           // /user/(.+)/balance/?
186           callRouter(GetUserBalanceRequest(userID, millis), responder)
187
188         case UserStatePath(userId) ⇒
189           // /user/(.+)/state/?
190           callRouter(GetUserStateRequest(userId, millis), responder)
191
192 //        case AdminPingAllPath() ⇒
193 //          withAdminCookieHelper { responder ⇒
194 //            callRouter(PingAllRequest(), responder)
195 //          }
196
197         case ResourcesPath() ⇒
198           withAdminCookieHelper { responder ⇒
199             responder.complete(
200               stringResponse200("%s\n%s\n%s\n" .format(
201                   ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
202                   ResourceLocator.ResourceNames.LOGBACK_XML,
203                   ResourceLocator.ResourceNames.POLICY_YAML),
204                 TEXT_PLAIN
205               )
206             )
207           }
208
209         case ResourcesAquariumPropertiesPath() ⇒
210           withAdminCookieHelper { responder ⇒
211             resourceInfoResponse(uri, responder, ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
212           }
213
214         case ResourcesLogbackXMLPath() ⇒
215           withAdminCookieHelper { responder ⇒
216             resourceInfoResponse(uri, responder, ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
217           }
218
219         case ResourcesPolicyYAMLPath() ⇒
220           withAdminCookieHelper { responder ⇒
221             resourceInfoResponse(uri, responder, ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
222           }
223
224         case ResourceEventPath(id) ⇒
225           withAdminCookieHelper { responder ⇒
226             eventInfoResponse(uri, responder, aquarium.resourceEventStore.findResourceEventByID, id)
227           }
228
229         case IMEventPath(id) ⇒
230           withAdminCookieHelper { responder ⇒
231             eventInfoResponse(uri, responder, aquarium.imEventStore.findIMEventById, id)
232           }
233
234         case _ ⇒
235           responder.complete(stringResponse(404, "Unknown resource!", TEXT_PLAIN))
236       }
237     //- Main business logic REST URIs are matched here
238
239     case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
240       responder.complete(stringResponse(404, "Unknown resource!", TEXT_PLAIN))
241
242     case Timeout(method, uri, _, _, _, complete) ⇒ complete {
243       HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
244     }
245   }
246
247   private[this]
248   def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = {
249     val actorProvider = aquarium.actorProvider
250     val router = actorProvider.actorForRole(RouterRole)
251     val futureResponse = router ask message
252
253     futureResponse onComplete {
254       future ⇒
255         future.value match {
256           case None ⇒
257             // TODO: Will this ever happen??
258             logger.warn("Future did not complete for %s".format(message))
259             val statusCode = 500
260             responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
261
262           case Some(Left(error)) ⇒
263             val statusCode = 500
264             logger.error("Error %s serving %s: %s".format(statusCode, message, error))
265             responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
266
267           case Some(Right(actualResponse)) ⇒
268             actualResponse match {
269               case routerResponse: RouterResponseMessage[_] ⇒
270                 routerResponse.response match {
271                   case Left(errorMessage) ⇒
272                     val statusCode = routerResponse.suggestedHTTPStatus
273
274                     logger.error("Error %s '%s' serving %s. Internal response: %s".format(
275                       statusCode,
276                       errorMessage,
277                       message,
278                       actualResponse))
279
280                     responder.complete(stringResponse(statusCode, errorMessage, TEXT_PLAIN))
281
282                   case Right(response) ⇒
283                     responder.complete(
284                       HttpResponse(
285                         routerResponse.suggestedHTTPStatus,
286                         body = routerResponse.responseToJsonString.getBytes("UTF-8"),
287                         headers = HttpHeader("Content-type", APPLICATION_JSON+";charset=utf-8") ::
288                           Nil))
289                 }
290
291               case _ ⇒
292                 val statusCode = 500
293                 logger.error("Error %s serving %s: Response is: %s".format(statusCode, message, actualResponse))
294                 responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
295             }
296         }
297     }
298   }
299
300   ////////////// helpers //////////////
301
302   final val defaultHeaders = List(HttpHeader("Content-Type", TEXT_PLAIN))
303
304   lazy val serverActor = Actor.registry.actorsFor("spray-can-server").head
305
306   def role = RESTRole
307 }