48b5a2d595b863e2d7846ae52bc5ed609850b289
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.computation.state.UserState.{JsonNames ⇒ UserStateJsonNames}
40 import gr.grnet.aquarium.util.json.JsonSupport
41 import collection.mutable.ListBuffer
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames}
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
46 import gr.grnet.aquarium.store._
47 import com.mongodb._
48 import org.bson.types.ObjectId
49 import gr.grnet.aquarium.util._
50 import gr.grnet.aquarium.converter.Conversions
51 import gr.grnet.aquarium.computation.state.UserState
52 import gr.grnet.aquarium.event.model.ExternalEventModel
53 import gr.grnet.aquarium.computation.BillingMonthInfo
54 import gr.grnet.aquarium.policy.PolicyModel
55 import gr.grnet.aquarium.{Aquarium, AquariumException}
56 import collection.immutable.SortedMap
57 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
58
59 /**
60  * Mongodb implementation of the various aquarium stores.
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  * @author Georgios Gousios <gousiosg@gmail.com>
64  * @author Prodromos Gerakios <pgerakio@grnet.gr>
65  */
66 class MongoDBStore(
67     val aquarium: Aquarium,
68     val mongo: Mongo,
69     val database: String,
70     val username: String,
71     val password: String)
72   extends ResourceEventStore
73   with UserStateStore
74   with IMEventStore
75   with PolicyStore
76   with Loggable {
77
78   override type IMEvent = MongoDBIMEvent
79   override type ResourceEvent = MongoDBResourceEvent
80   override type Policy = MongoDBPolicy
81
82   private[store] lazy val resourceEvents = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
83   private[store] lazy val userStates = getCollection(MongoDBStore.USER_STATES_COLLECTION)
84   private[store] lazy val imEvents = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
85   private[store] lazy val policies = getCollection(MongoDBStore.POLICY_COLLECTION)
86
87   private[this] def getCollection(name: String): DBCollection = {
88     val db = mongo.getDB(database)
89     //logger.debug("Authenticating to mongo")
90     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
91       throw new AquariumException("Could not authenticate user %s".format(username))
92     }
93     db.getCollection(name)
94   }
95
96   //+ResourceEventStore
97   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
98     MongoDBResourceEvent.fromOther(event, null)
99   }
100
101   def pingResourceEventStore(): Unit = synchronized {
102     MongoDBStore.ping(mongo)
103   }
104
105   def insertResourceEvent(event: ResourceEventModel) = {
106     val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
107     MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
108     localEvent
109   }
110
111   def findResourceEventByID(id: String): Option[ResourceEvent] = {
112     MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
113   }
114
115   def findResourceEventsByUserID(userId: String)
116                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
117     val query = new BasicDBObject(ResourceEventNames.userID, userId)
118
119     MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
120   }
121
122   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
123     val query = new BasicDBObjectBuilder().
124       add(ResourceEventModel.Names.userID, userID).
125       // received within the period
126       add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$gte", startMillis)).
127       add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$lte", stopMillis)).
128       // occurred outside the period
129       add("$or", {
130         val dbList = new BasicDBList()
131         dbList.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis)))
132         dbList.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis)))
133         dbList
134       }).
135       get()
136
137     resourceEvents.count(query)
138   }
139
140   def foreachResourceEventOccurredInPeriod(
141       userID: String,
142       startMillis: Long,
143       stopMillis: Long
144   )(f: ResourceEvent ⇒ Unit): Unit = {
145
146     val query = new BasicDBObjectBuilder().
147       add(ResourceEventModel.Names.userID, userID).
148       add(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gte", startMillis)).
149       add(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lte", stopMillis)).
150       get()
151
152     val sorter = new BasicDBObject(ResourceEventModel.Names.occurredMillis, 1)
153     val cursor = resourceEvents.find(query).sort(sorter)
154
155     withCloseable(cursor) { cursor ⇒
156       while(cursor.hasNext) {
157         val nextDBObject = cursor.next()
158         val nextEvent = MongoDBResourceEvent.fromDBObject(nextDBObject)
159
160         f(nextEvent)
161       }
162     }
163   }
164   //-ResourceEventStore
165
166   //+ UserStateStore
167   def insertUserState(userState: UserState) = {
168     MongoDBStore.insertObject(
169       userState.copy(_id = new ObjectId().toString),
170       userStates,
171       MongoDBStore.jsonSupportToDBObject
172     )
173   }
174
175   def findUserStateByUserID(userID: String): Option[UserState] = {
176     val query = new BasicDBObject(UserStateJsonNames.userID, userID)
177     val cursor = userStates find query
178
179     MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
180   }
181
182   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
183     val query = new BasicDBObjectBuilder().
184       add(UserState.JsonNames.userID, userID).
185       add(UserState.JsonNames.isFullBillingMonthState, true).
186       add(UserState.JsonNames.theFullBillingMonth_year, bmi.year).
187       add(UserState.JsonNames.theFullBillingMonth_month, bmi.month).
188       get()
189
190     // Descending order, so that the latest comes first
191     val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1)
192
193     val cursor = userStates.find(query).sort(sorter)
194
195     MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
196   }
197   //- UserStateStore
198
199   //+IMEventStore
200   def createIMEventFromJson(json: String) = {
201     MongoDBStore.createIMEventFromJson(json)
202   }
203
204   def createIMEventFromOther(event: IMEventModel) = {
205     MongoDBStore.createIMEventFromOther(event)
206   }
207
208   def pingIMEventStore(): Unit = {
209     MongoDBStore.ping(mongo)
210   }
211
212   def insertIMEvent(event: IMEventModel): IMEvent = {
213     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
214     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
215     localEvent
216   }
217
218   def findIMEventByID(id: String): Option[IMEvent] = {
219     MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
220   }
221
222
223   /**
224    * Find the `CREATE` even for the given user. Note that there must be only one such event.
225    */
226   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
227     val query = new BasicDBObjectBuilder().
228       add(IMEventNames.userID, userID).
229       add(IMEventNames.eventType, IMEventModel.EventTypeNames.create).get()
230
231     // Normally one such event is allowed ...
232     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
233
234     MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
235   }
236
237   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
238     val query = new BasicDBObject(IMEventNames.userID, userID)
239     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
240
241     MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
242   }
243
244   /**
245    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
246    * the given function `f`.
247    *
248    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
249    */
250   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
251     val query = new BasicDBObject(IMEventNames.userID, userID)
252     val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
253
254     withCloseable(cursor) { cursor ⇒
255       while(cursor.hasNext) {
256         val model = MongoDBIMEvent.fromDBObject(cursor.next())
257         f(model)
258       }
259     }
260   }
261   //-IMEventStore
262
263
264
265   //+PolicyStore
266   def loadPoliciesAfter(after: Long): List[Policy] = {
267     // FIXME implement
268     throw new UnsupportedOperationException
269   }
270
271
272   def findPolicyByID(id: String) = {
273     // FIXME implement
274     throw new UnsupportedOperationException
275   }
276
277   /**
278    * Store an accounting policy.
279    */
280   def insertPolicy(policy: PolicyModel): Policy = {
281     val dbPolicy = MongoDBPolicy.fromOther(policy, new ObjectId().toStringMongod)
282     MongoDBStore.insertObject(dbPolicy, policies, MongoDBStore.jsonSupportToDBObject)
283   }
284
285   def loadValidPolicyAt(atMillis: Long): Option[Policy] = {
286     throw new UnsupportedOperationException
287   }
288
289   def loadAndSortPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, Policy] = {
290     throw new UnsupportedOperationException
291   }
292   //-PolicyStore
293 }
294
295 object MongoDBStore {
296   object JsonNames {
297     final val _id = "_id"
298   }
299
300   /**
301    * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
302    *
303    * Resource events are coming from all systems handling billable resources.
304    */
305   final val RESOURCE_EVENTS_COLLECTION = "resevents"
306
307   /**
308    * Collection holding the snapshots of [[gr.grnet.aquarium.computation.state.UserState]].
309    *
310    * [[gr.grnet.aquarium.computation.state.UserState]] is held internally within
311    * [[gr.grnet.aquarium.actor.service.user .UserActor]]s.
312    */
313   final val USER_STATES_COLLECTION = "userstates"
314
315   /**
316    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
317    *
318    * User events are coming from the IM module (external).
319    */
320   final val IM_EVENTS_COLLECTION = "imevents"
321
322   /**
323    * Collection holding [[gr.grnet.aquarium.policy.PolicyModel]]s.
324    */
325   final val POLICY_COLLECTION = "policies"
326
327   def dbObjectToUserState(dbObj: DBObject): UserState = {
328     UserState.fromJson(JSON.serialize(dbObj))
329   }
330
331   def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
332     withCloseable(cursor) { cursor ⇒
333       if(cursor.hasNext) {
334         Some(f(cursor.next()))
335       } else {
336         None
337       }
338     }
339   }
340
341   def ping(mongo: Mongo): Unit = synchronized {
342     // This requires a network roundtrip
343     mongo.isLocked
344   }
345
346   def findBy[A >: Null <: AnyRef](name: String,
347                                   value: String,
348                                   collection: DBCollection,
349                                   deserializer: (DBObject) => A) : Option[A] = {
350     val query = new BasicDBObject(name, value)
351     val cursor = collection find query
352
353     withCloseable(cursor) { cursor ⇒
354       if(cursor.hasNext)
355         Some(deserializer apply cursor.next)
356       else
357         None
358     }
359   }
360
361   def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
362                                   (deserializer: (DBObject) => A)
363                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
364     val cursor0 = collection find query
365     val cursor = if(orderBy ne null) {
366       cursor0 sort orderBy
367     } else {
368       cursor0
369     } // I really know that docs say that it is the same cursor.
370
371     if(!cursor.hasNext) {
372       cursor.close()
373       Nil
374     } else {
375       val buff = new ListBuffer[A]()
376
377       while(cursor.hasNext) {
378         buff += deserializer apply cursor.next
379       }
380
381       cursor.close()
382
383       sortWith match {
384         case Some(sorter) => buff.toList.sortWith(sorter)
385         case None => buff.toList
386       }
387     }
388   }
389
390   def storeUserState(userState: UserState, collection: DBCollection) = {
391     storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
392   }
393
394   def storeAny[A](any: A,
395                   collection: DBCollection,
396                   idName: String,
397                   idValueProvider: (A) => String,
398                   serializer: (A) => DBObject) : RecordID = {
399
400     val dbObject = serializer apply any
401     val _id = new ObjectId()
402     dbObject.put("_id", _id)
403     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
404     writeResult.getLastError().throwOnError()
405
406     RecordID(dbObject.get("_id").toString)
407   }
408
409   def insertObject[A <: AnyRef](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) : A = {
410     collection.insert(serializer apply obj, WriteConcern.JOURNAL_SAFE)
411     obj
412   }
413
414   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
415     Conversions.jsonSupportToDBObject(jsonSupport)
416   }
417
418   final def isLocalIMEvent(event: IMEventModel) = event match {
419     case _: MongoDBIMEvent ⇒ true
420     case _ ⇒ false
421   }
422
423   final def createIMEventFromJson(json: String) = {
424     MongoDBIMEvent.fromJsonString(json)
425   }
426
427   final def createIMEventFromOther(event: IMEventModel) = {
428     MongoDBIMEvent.fromOther(event, null)
429   }
430
431   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
432     MongoDBIMEvent.fromJsonBytes(jsonBytes)
433   }
434 }