2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import gr.grnet.aquarium.util._
39 import gr.grnet.aquarium.{ResourceLocator, Aquarium, Configurable, AquariumAwareSkeleton}
40 import com.ckkloverdos.props.Props
41 import com.twitter.finagle.{Service, SimpleFilter}
42 import org.jboss.netty.handler.codec.http.{HttpResponseStatus ⇒ THttpResponseStatus, DefaultHttpResponse ⇒ TDefaultHttpResponse, HttpResponse ⇒ THttpResponse, HttpRequest ⇒ THttpRequest}
43 import com.twitter.util.{Future ⇒ TFuture, Promise ⇒ TPromise, Return ⇒ TReturn, Throw ⇒ TThrow, Duration}
44 import com.twitter.finagle.builder.ServerBuilder
45 import com.twitter.finagle.http.Http
46 import org.jboss.netty.handler.codec.http.HttpResponseStatus._
47 import org.jboss.netty.handler.codec.http.HttpVersion._
48 import org.jboss.netty.buffer.ChannelBuffers._
49 import org.jboss.netty.util.CharsetUtil._
50 import java.net.InetSocketAddress
51 import java.util.concurrent.{TimeUnit}
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import org.joda.time.format.ISODateTimeFormat
54 import gr.grnet.aquarium.actor.message._
55 import com.ckkloverdos.resource.StreamResource
56 import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
57 import akka.dispatch.{Future ⇒ AFuture}
58 import gr.grnet.aquarium.util.json.JsonHelpers
60 import com.ckkloverdos.maybe.Failed
61 import gr.grnet.aquarium.actor.message.GetUserStateRequest
62 import com.ckkloverdos.maybe.Just
63 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
64 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
65 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
66 import org.apache.avro.specific.SpecificRecord
67 import gr.grnet.aquarium.message.avro.AvroHelpers
71 * @author Christos KK Loverdos <loverdos@gmail.com>
74 class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Configurable with Loggable {
75 final val TEXT_PLAIN = "text/plain"
76 final val APPLICATION_JSON = "application/json"
78 @volatile private[this] var _port: Int = _
79 @volatile private[this] var _shutdownTimeoutMillis: Long = _
80 @volatile private[this] var _userActorFutureTimeoutMillis: Long = _
82 def propertyPrefix = Some(RESTService.Prefix)
85 * Configure this instance with the provided properties.
87 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
89 def configure(props: Props) {
90 this._port = props.getIntEx(Aquarium.EnvKeys.restPort.name)
91 this._shutdownTimeoutMillis = props.getLongEx(Aquarium.EnvKeys.restShutdownTimeoutMillis.name)
92 this._userActorFutureTimeoutMillis = 5000L
94 logger.debug("HTTP port is %s".format(this._port))
97 def stringResponse(status: THttpResponseStatus, body: String, contentType: String) = {
98 val response = new TDefaultHttpResponse(HTTP_1_1, status)
99 response.setContent(copiedBuffer(body, UTF_8))
100 response.setHeader("Content-type", "%s;charset=utf-8".format(contentType))
102 TFuture.value(response)
105 def stringResponseOK(body: String, contentType: String): TFuture[THttpResponse] = {
106 stringResponse(OK, body, contentType)
109 def statusResponse(status: THttpResponseStatus): TFuture[THttpResponse] = {
110 stringResponse(status, status.getReasonPhrase, TEXT_PLAIN)
113 def resourceInfoResponse(resource: StreamResource, contentType: String): TFuture[THttpResponse] = {
114 val fmt = (body: String) ⇒ "%s\n\n%s".format(resource.url, body)
116 resource.stringContent.toMaybeEither match {
118 stringResponseOK(fmt(body), contentType)
125 def eventInfoResponse[R <: SpecificRecord](
127 getter: String ⇒ Option[R]
128 ): TFuture[THttpResponse] = {
129 getter(eventID) match {
131 stringResponseOK(AvroHelpers.jsonStringOfSpecificRecord(event), APPLICATION_JSON)
134 statusResponse(NOT_FOUND)
138 final case class ExceptionHandler() extends SimpleFilter[THttpRequest, THttpResponse] {
139 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
140 service(request) handle {
142 logger.error("While serving %s".format(request), error)
143 val status = INTERNAL_SERVER_ERROR
144 val errorResponse = new TDefaultHttpResponse(HTTP_1_1, status)
145 errorResponse.setContent(copiedBuffer(status.getReasonPhrase, UTF_8))
152 final case class AdminChecker() extends SimpleFilter[THttpRequest, THttpResponse] {
153 def apply(request: THttpRequest, service: Service[THttpRequest, THttpResponse]): TFuture[THttpResponse] = {
154 if(request.getUri.startsWith(RESTPaths.AdminPrefix)) {
155 val headerValue = request.getHeader(Aquarium.HTTP.RESTAdminHeaderName)
156 aquarium.adminCookie match {
157 case Some(`headerValue`) ⇒
161 statusResponse(FORBIDDEN)
164 statusResponse(FORBIDDEN)
172 final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
173 def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
174 // We want to asynchronously route the message via akka and get the whole computation as a
176 val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
177 val promise = new TPromise[UserActorResponseMessage[_]]()
179 val actualWork = akka.pattern.ask(actorRef, request)(
180 ATimeout(ADuration(_userActorFutureTimeoutMillis, TimeUnit.MILLISECONDS))).
181 asInstanceOf[AFuture[UserActorResponseMessage[_]]]
183 actualWork.onComplete {
184 case Left(throwable) ⇒
185 promise.setException(throwable)
188 promise.setValue(value)
195 final case class MainService() extends Service[THttpRequest, THttpResponse] {
197 final val actorRouterService = UserActorService()
199 def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
200 actorRouterService(requestMessage).transform { tryResponse ⇒
202 case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
203 logger.debug("{}", responseMessage)
204 logger.debug("{}", responseMessage.responseToJsonString)
205 val statusCode = responseMessage.suggestedHTTPStatus
206 val status = THttpResponseStatus.valueOf(statusCode)
208 responseMessage.response match {
209 case Left(errorMessage) ⇒
210 logger.error("Error %s '%s' serving %s. Internal response: %s".format(
216 stringResponse(status, errorMessage, TEXT_PLAIN)
219 stringResponse(status, responseMessage.responseToJsonString, APPLICATION_JSON)
222 case TThrow(throwable) ⇒
228 // FIXME make efficient; this partial function thing is crap for serving requests
229 def apply(request: THttpRequest): TFuture[THttpResponse] = {
230 val millis = TimeHelpers.nowMillis()
231 val uri = request.getUri
232 val method = request.getMethod
233 logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
235 type URIPF = PartialFunction[String, TFuture[THttpResponse]]
236 def pong(ok:Boolean) = {
237 val now = TimeHelpers.nowMillis()
238 val nowFormatted = ISODateTimeFormat.dateTime().print(now)
239 val reply = if(ok) "PONG" else "DOWN"
240 stringResponseOK("%s\n%s\n%s".format(reply,now, nowFormatted), TEXT_PLAIN)
242 val PingHandler: URIPF = {
243 case RESTPaths.AquariumPingPath() ⇒
245 case RESTPaths.RabbitMQPingPath() ⇒
246 pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)
247 case RESTPaths.IMStorePingPath() ⇒
248 pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)
249 case RESTPaths.RCStorePingPath() ⇒
250 pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
253 val UserActorCacheHandler: URIPF = {
254 case RESTPaths.UserActorCacheStatsPath() ⇒
255 val cacheSize = aquarium.akkaService.cacheSize
256 val stats = aquarium.akkaService.cacheStats
258 val map = new java.util.LinkedHashMap[String, Any]
259 map.put("cacheSize", cacheSize)
260 map.put("requestCount", stats.requestCount())
261 map.put("hitCount", stats.hitCount())
262 map.put("hitRate", stats.hitRate())
263 map.put("missCount", stats.missCount())
264 map.put("missRate", stats.missRate())
265 map.put("evictionCount", stats.evictionCount())
266 map.put("loadCount", stats.loadCount())
267 map.put("loadSuccessCount", stats.loadSuccessCount())
268 map.put("loadExceptionCount", stats.loadExceptionCount())
269 map.put("loadExceptionRate", stats.loadExceptionRate())
270 map.put("totalLoadTime", stats.totalLoadTime())
271 map.put("averageLoadPenalty", stats.averageLoadPenalty())
273 val json = JsonHelpers.jsonStringOfJavaMap(map)
274 stringResponseOK(json, APPLICATION_JSON)
276 case RESTPaths.UserActorCacheContentsPath() ⇒
277 val buffer = new scala.collection.mutable.ArrayBuffer[String]()
278 aquarium.akkaService.foreachCachedUserID(buffer.append(_))
279 val output = buffer.sorted.mkString("\n")
280 stringResponseOK(output, TEXT_PLAIN)
282 case RESTPaths.UserActorCacheCountPath() ⇒
283 stringResponseOK(aquarium.akkaService.cacheSize.toString, TEXT_PLAIN)
286 val ConfHandler: URIPF = {
287 case RESTPaths.ResourcesPath() ⇒
288 stringResponseOK("%s\n%s\n%s\n" .format(
289 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
290 ResourceLocator.ResourceNames.LOGBACK_XML,
291 ResourceLocator.ResourceNames.POLICY_JSON),
294 case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
295 resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
297 case RESTPaths.ResourcesLogbackXMLPath() ⇒
298 resourceInfoResponse(ResourceLocator.Resources.LogbackXMLResource, TEXT_PLAIN)
300 case RESTPaths.ResourcesPolicyJSONPath() ⇒
301 resourceInfoResponse(ResourceLocator.Resources.PolicyJSONResource, TEXT_PLAIN)
304 val EventsHandler: URIPF = {
305 case RESTPaths.ResourceEventPath(id) ⇒
306 eventInfoResponse(id, aquarium.resourceEventStore.findResourceEventByID)
308 case RESTPaths.IMEventPath(id) ⇒
309 eventInfoResponse(id, aquarium.imEventStore.findIMEventByID)
312 val UserHandler: URIPF = {
313 case RESTPaths.UserBalancePath(userID) ⇒
314 // /user/(.+)/balance/?
315 callUserActor(GetUserBalanceRequest(userID, millis))
317 case RESTPaths.UserStatePath(userID) ⇒
318 // /user/(.+)/state/?
319 callUserActor(GetUserStateRequest(userID, millis))
321 case RESTPaths.UserWalletPath(userID) ⇒
322 // /user/(.+)/wallet/?
323 callUserActor(GetUserWalletRequest(userID, millis))
325 case RESTPaths.UserBillPath(userID,st1,st2) ⇒
328 val t = Timeslot(t1,if(t2==0)Long.MaxValue else t2)
329 callUserActor(GetUserBillRequest(userID,t,millis))
332 val DefaultHandler: URIPF = {
334 statusResponse(NOT_FOUND)
337 val AllHandlers = List(
339 UserActorCacheHandler,
346 val combinedHandler = AllHandlers.reduceLeft(_ orElse _)
352 val service = ExceptionHandler() andThen AdminChecker() andThen MainService()
353 lazy val server = ServerBuilder().
355 bindTo(new InetSocketAddress(this._port)).
359 def start(): Unit = {
360 logger.info("Starting HTTP on port %s".format(this._port))
361 // Just for the side effect
362 assert(server ne null)
366 logger.info("Stopping HTTP on port %s, waiting for at most %s ms".format(this._port, this._shutdownTimeoutMillis))
367 server.close(Duration(this._shutdownTimeoutMillis, TimeUnit.MILLISECONDS))