2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import cc.spray.can.HttpMethods.GET
42 import gr.grnet.aquarium.util.Loggable
43 import gr.grnet.aquarium.util.shortInfoOf
44 import akka.actor.Actor
45 import gr.grnet.aquarium.actor.{RESTRole, RoleableActor, RouterRole}
47 import gr.grnet.aquarium.util.date.TimeHelpers
48 import org.joda.time.format.ISODateTimeFormat
49 import gr.grnet.aquarium.actor.message.{RouterResponseMessage, GetUserStateRequest, RouterRequestMessage, GetUserBalanceRequest}
50 import gr.grnet.aquarium.{ResourceLocator, Aquarium}
51 import com.ckkloverdos.resource.StreamResource
52 import com.ckkloverdos.maybe.Failed
53 import java.net.InetAddress
54 import gr.grnet.aquarium.event.model.ExternalEventModel
57 * Spray-based REST service. This is the outer-world's interface to Aquarium functionality.
59 * @author Christos KK Loverdos <loverdos@gmail.com>.
61 class RESTActor private(_id: String) extends RoleableActor with Loggable {
62 def this() = this("spray-root-service")
66 final val TEXT_PLAIN = "text/plain"
67 final val APPLICATION_JSON = "application/json"
69 private def stringResponse(status: Int, stringBody: String, contentType: String): HttpResponse = {
72 HttpHeader("Content-type", "%s;charset=utf-8".format(contentType)) :: Nil,
73 stringBody.getBytes("UTF-8")
77 private def resourceInfoResponse(
79 responder: RequestResponder,
80 resource: StreamResource,
84 val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
85 val res = resource.mapString(body ⇒ responder.complete(stringResponse(200, fmt(body), contentType)))
89 logger.error("While serving %s".format(uri), e)
90 responder.complete(stringResponse(501, "Internal Server Error: %s".format(shortInfoOf(e)), TEXT_PLAIN))
97 private def eventInfoResponse[E <: ExternalEventModel](
99 responder: RequestResponder,
100 getter: String ⇒ Option[E],
104 val toSend = getter.apply(eventID) match {
106 (200, event.toJsonString, APPLICATION_JSON)
109 (404, "Event not found", TEXT_PLAIN)
112 responder.complete(stringResponse(toSend._1, toSend._2, toSend._3))
117 responder: RequestResponder,
118 headers: List[HttpHeader],
119 remoteAddress: InetAddress
120 )( f: RequestResponder ⇒ Unit): Unit = {
122 aquarium.adminCookie match {
123 case Some(adminCookie) ⇒
124 headers.find(_.name.toLowerCase == Aquarium.HTTP.RESTAdminHeaderNameLowerCase) match {
125 case Some(cookieHeader) if(cookieHeader.value == adminCookie) ⇒
129 logger.error("While serving %s".format(uri), e)
130 responder.complete(stringResponse(501, "Internal Server Error: %s".format(shortInfoOf(e)), TEXT_PLAIN))
133 case Some(cookieHeader) ⇒
134 logger.warn("Admin request %s with bad cookie '%s' from %s".format(uri, cookieHeader.value, remoteAddress))
135 responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
138 logger.warn("Admin request %s with no cookie from %s".format(uri, remoteAddress))
139 responder.complete(stringResponse(401, "Unauthorized!", TEXT_PLAIN))
143 responder.complete(stringResponse(403, "Forbidden!", TEXT_PLAIN))
147 private def stringResponse200(stringBody: String, contentType: String): HttpResponse = {
148 stringResponse(200, stringBody, contentType)
151 protected def receive = {
152 case RequestContext(HttpRequest(GET, "/ping", _, _, _), _, responder) ⇒
153 val now = TimeHelpers.nowMillis()
154 val nowFormatted = ISODateTimeFormat.dateTime().print(now)
155 responder.complete(stringResponse200("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN))
157 case RequestContext(HttpRequest(GET, "/stats", _, _, _), _, responder) ⇒ {
158 (serverActor ? GetStats).mapTo[Stats].onComplete {
160 future.value.get match {
161 case Right(stats) => responder.complete {
163 "Uptime : " + (stats.uptime / 1000.0) + " sec\n" +
164 "Requests dispatched : " + stats.requestsDispatched + '\n' +
165 "Requests timed out : " + stats.requestsTimedOut + '\n' +
166 "Requests open : " + stats.requestsOpen + '\n' +
167 "Open connections : " + stats.connectionsOpen + '\n',
171 case Left(ex) => responder.complete(stringResponse(500, "Couldn't get server stats due to " + ex, TEXT_PLAIN))
176 case RequestContext(HttpRequest(GET, uri, headers, body, protocol), remoteAddress, responder) ⇒
177 def withAdminCookieHelper(f: RequestResponder ⇒ Unit): Unit = {
178 withAdminCookie(uri, responder, headers, remoteAddress)(f)
181 //+ Main business logic REST URIs are matched here
182 val millis = TimeHelpers.nowMillis()
184 case UserBalancePath(userID) ⇒
185 // /user/(.+)/balance/?
186 callRouter(GetUserBalanceRequest(userID, millis), responder)
188 case UserStatePath(userId) ⇒
189 // /user/(.+)/state/?
190 callRouter(GetUserStateRequest(userId, millis), responder)
192 // case AdminPingAllPath() ⇒
193 // withAdminCookieHelper { responder ⇒
194 // callRouter(PingAllRequest(), responder)
197 case ResourcesPath() ⇒
198 withAdminCookieHelper { responder ⇒
200 stringResponse200("%s\n%s\n%s\n" .format(
201 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
202 ResourceLocator.ResourceNames.LOGBACK_XML,
203 ResourceLocator.ResourceNames.POLICY_YAML),
209 case ResourcesAquariumPropertiesPath() ⇒
210 withAdminCookieHelper { responder ⇒
211 resourceInfoResponse(uri, responder, ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
214 case ResourcesLogbackXMLPath() ⇒
215 withAdminCookieHelper { responder ⇒
216 resourceInfoResponse(uri, responder, ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
219 case ResourcesPolicyYAMLPath() ⇒
220 withAdminCookieHelper { responder ⇒
221 resourceInfoResponse(uri, responder, ResourceLocator.Resources.PolicyYAMLResource, TEXT_PLAIN)
224 case ResourceEventPath(id) ⇒
225 withAdminCookieHelper { responder ⇒
226 eventInfoResponse(uri, responder, aquarium.resourceEventStore.findResourceEventByID, id)
229 case IMEventPath(id) ⇒
230 withAdminCookieHelper { responder ⇒
231 eventInfoResponse(uri, responder, aquarium.imEventStore.findIMEventById, id)
235 responder.complete(stringResponse(404, "Unknown resource!", TEXT_PLAIN))
237 //- Main business logic REST URIs are matched here
239 case RequestContext(HttpRequest(_, _, _, _, _), _, responder) ⇒
240 responder.complete(stringResponse(404, "Unknown resource!", TEXT_PLAIN))
242 case Timeout(method, uri, _, _, _, complete) ⇒ complete {
243 HttpResponse(status = 500).withBody("The " + method + " request to '" + uri + "' has timed out...")
248 def callRouter(message: RouterRequestMessage, responder: RequestResponder): Unit = {
249 val actorProvider = aquarium.actorProvider
250 val router = actorProvider.actorForRole(RouterRole)
251 val futureResponse = router ask message
253 futureResponse onComplete {
257 // TODO: Will this ever happen??
258 logger.warn("Future did not complete for %s".format(message))
260 responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
262 case Some(Left(error)) ⇒
264 logger.error("Error %s serving %s: %s".format(statusCode, message, error))
265 responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
267 case Some(Right(actualResponse)) ⇒
268 actualResponse match {
269 case routerResponse: RouterResponseMessage[_] ⇒
270 routerResponse.response match {
271 case Left(errorMessage) ⇒
272 val statusCode = routerResponse.suggestedHTTPStatus
274 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
280 responder.complete(stringResponse(statusCode, errorMessage, TEXT_PLAIN))
282 case Right(response) ⇒
285 routerResponse.suggestedHTTPStatus,
286 body = routerResponse.responseToJsonString.getBytes("UTF-8"),
287 headers = HttpHeader("Content-type", APPLICATION_JSON+";charset=utf-8") ::
293 logger.error("Error %s serving %s: Response is: %s".format(statusCode, message, actualResponse))
294 responder.complete(stringResponse(statusCode, "Internal Server Error", TEXT_PLAIN))
300 ////////////// helpers //////////////
302 final val defaultHeaders = List(HttpHeader("Content-Type", TEXT_PLAIN))
304 lazy val serverActor = Actor.registry.actorsFor("spray-can-server").head