User actor code reorganization
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import gr.grnet.aquarium.util.Loggable
39 import com.mongodb.util.JSON
40 import gr.grnet.aquarium.user.UserState
41 import gr.grnet.aquarium.user.UserState.{JsonNames => UserStateJsonNames}
42 import gr.grnet.aquarium.util.displayableObjectInfo
43 import gr.grnet.aquarium.util.json.JsonSupport
44 import collection.mutable.ListBuffer
45 import gr.grnet.aquarium.store._
46 import gr.grnet.aquarium.logic.events.ResourceEvent.{JsonNames => ResourceJsonNames}
47 import gr.grnet.aquarium.logic.events.UserEvent.{JsonNames => UserEventJsonNames}
48 import gr.grnet.aquarium.logic.events.WalletEntry.{JsonNames => WalletJsonNames}
49 import gr.grnet.aquarium.logic.events.PolicyEntry.{JsonNames => PolicyJsonNames}
50 import java.util.Date
51 import gr.grnet.aquarium.logic.accounting.Policy
52 import gr.grnet.aquarium.logic.events._
53 import com.mongodb._
54 import com.ckkloverdos.maybe.{NoVal, Maybe}
55 import gr.grnet.aquarium.logic.accounting.dsl.{DSLResource, Timeslot, DSLPolicy, DSLComplexResource}
56 import org.bson.types.ObjectId
57 import gr.grnet.aquarium.actor.service.user.UserActor
58
59 /**
60  * Mongodb implementation of the various aquarium stores.
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  * @author Georgios Gousios <gousiosg@gmail.com>
64  */
65 class MongoDBStore(
66     val mongo: Mongo,
67     val database: String,
68     val username: String,
69     val password: String)
70   extends ResourceEventStore
71   with UserStateStore
72   with WalletEntryStore
73   with UserEventStore
74   with PolicyStore
75   with Loggable {
76
77   private[store] lazy val resourceEvents     = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
78   private[store] lazy val userStates         = getCollection(MongoDBStore.USER_STATES_COLLECTION)
79   private[store] lazy val userEvents         = getCollection(MongoDBStore.USER_EVENTS_COLLECTION)
80   private[store] lazy val unparsedUserEvents = getCollection(MongoDBStore.UNPARSED_USER_EVENTS_COLLECTION)
81   private[store] lazy val walletEntries      = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
82 //  private[store] lazy val policies       = getCollection(MongoDBStore.POLICIES_COLLECTION)
83   private[store] lazy val policyEntries      = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
84
85   private[this] def getCollection(name: String): DBCollection = {
86     val db = mongo.getDB(database)
87     //logger.debug("Authenticating to mongo")
88     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
89       throw new StoreException("Could not authenticate user %s".format(username))
90     }
91     db.getCollection(name)
92   }
93
94   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
95     if (one.occurredMillis > two.occurredMillis) false
96     else if (one.occurredMillis < two.occurredMillis) true
97     else true
98   }
99
100   private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
101     if (one.occurredMillis < two.occurredMillis) false
102     else if (one.occurredMillis > two.occurredMillis) true
103     else true
104   }
105
106   //+ResourceEventStore
107   def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] =
108     MongoDBStore.storeAquariumEvent(event, resourceEvents)
109
110   def findResourceEventById(id: String): Maybe[ResourceEvent] =
111     MongoDBStore.findById(id, resourceEvents, MongoDBStore.dbObjectToResourceEvent)
112
113   def findResourceEventsByUserId(userId: String)
114                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
115     val query = new BasicDBObject(ResourceJsonNames.userId, userId)
116
117     MongoDBStore.runQuery(query, resourceEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
118   }
119
120   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
121     val query = new BasicDBObject()
122     query.put(ResourceJsonNames.userId, userId)
123     query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", timestamp))
124     
125     val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
126
127     val cursor = resourceEvents.find(query).sort(sort)
128
129     try {
130       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
131       while(cursor.hasNext) {
132         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
133       }
134       buffer.toList.sortWith(_sortByTimestampAsc)
135     } finally {
136       cursor.close()
137     }
138   }
139
140   def findResourceEventHistory(userId: String, resName: String,
141                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
142     val query = new BasicDBObject()
143     query.put(ResourceJsonNames.userId, userId)
144     query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo))
145     query.put(ResourceJsonNames.resource, resName)
146
147     instid match {
148       case Some(id) =>
149         Policy.policy.findResource(resName) match {
150           case Some(y) => query.put(ResourceJsonNames.details,
151             new BasicDBObject(y.descriminatorField, instid.get))
152           case None =>
153         }
154       case None =>
155     }
156
157     val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
158     val cursor = resourceEvents.find(query).sort(sort)
159
160     try {
161       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
162       while(cursor.hasNext) {
163         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
164       }
165       buffer.toList.sortWith(_sortByTimestampAsc)
166     } finally {
167       cursor.close()
168     }
169   }
170
171   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
172     val query = new BasicDBObject()
173     query.put(ResourceJsonNames.userId, userId)
174     query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
175     query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
176
177     // Sort them by increasing order for occurred time
178     val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
179
180     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBStore.dbObjectToResourceEvent)(None)
181   }
182   
183   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = {
184     Maybe {
185       // FIXME: Implement
186       0L
187     }
188   }
189
190   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
191                                                     startMillis: Long,
192                                                     stopMillis: Long): List[ResourceEvent] = {
193     // FIXME: Implement
194     Nil
195   }
196   //-ResourceEventStore
197
198   //+ UserStateStore
199   def storeUserState(userState: UserState): Maybe[RecordID] = {
200     MongoDBStore.storeUserState(userState, userStates)
201   }
202
203   def findUserStateByUserId(userId: String): Maybe[UserState] = {
204     Maybe {
205       val query = new BasicDBObject(UserStateJsonNames.userId, userId)
206       val cursor = userStates find query
207
208       try {
209         if(cursor.hasNext)
210           MongoDBStore.dbObjectToUserState(cursor.next())
211         else
212           null
213       } finally {
214         cursor.close()
215       }
216     }
217   }
218
219   def findLatestUserStateForEndOfBillingMonth(userId: String,
220                                               yearOfBillingMonth: Int,
221                                               billingMonth: Int): Maybe[UserState] = {
222     NoVal // FIXME: implement
223   }
224
225   def deleteUserState(userId: String) = {
226     val query = new BasicDBObject(UserStateJsonNames.userId, userId)
227     userStates.findAndRemove(query)
228   }
229   //- UserStateStore
230
231   //+WalletEntryStore
232   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] =
233     MongoDBStore.storeAquariumEvent(entry, walletEntries)
234
235   def findWalletEntryById(id: String): Maybe[WalletEntry] =
236     MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry)
237
238   def findUserWalletEntries(userId: String) = {
239     // TODO: optimize
240     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
241   }
242
243   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
244     val q = new BasicDBObject()
245     // TODO: Is this the correct way for an AND query?
246     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
247     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime))
248     q.put(WalletJsonNames.userId, userId)
249
250     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
251   }
252
253   def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
254     val q = new BasicDBObject()
255     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
256     q.put(WalletJsonNames.userId, userId)
257
258     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
259   }
260
261   def findLatestUserWalletEntries(userId: String) = {
262     Maybe {
263       val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order
264       val cursor = walletEntries.find().sort(orderBy)
265
266       try {
267         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
268         if(cursor.hasNext) {
269           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
270           buffer += walletEntry
271
272           var _previousOccurredMillis = walletEntry.occurredMillis
273           var _ok = true
274
275           while(cursor.hasNext && _ok) {
276             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
277             var currentOccurredMillis = walletEntry.occurredMillis
278             _ok = currentOccurredMillis == _previousOccurredMillis
279             
280             if(_ok) {
281               buffer += walletEntry
282             }
283           }
284
285           buffer.toList
286         } else {
287           null
288         }
289       } finally {
290         cursor.close()
291       }
292     }
293   }
294
295   def findPreviousEntry(userId: String, resource: String,
296                         instanceId: String,
297                         finalized: Option[Boolean]): List[WalletEntry] = {
298     val q = new BasicDBObject()
299     q.put(WalletJsonNames.userId, userId)
300     q.put(WalletJsonNames.resource, resource)
301     q.put(WalletJsonNames.instanceId, instanceId)
302     finalized match {
303       case Some(x) => q.put(WalletJsonNames.finalized, x)
304       case None =>
305     }
306
307     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
308   }
309   //-WalletEntryStore
310
311   //+UserEventStore
312   def storeUnparsed(json: String): Maybe[RecordID] = {
313     MongoDBStore.storeJustJson(json, unparsedUserEvents)
314   }
315
316   def storeUserEvent(event: UserEvent): Maybe[RecordID] =
317     MongoDBStore.storeAny[UserEvent](event, userEvents, UserEventJsonNames.userID,
318       _.userID, MongoDBStore.jsonSupportToDBObject)
319
320
321   def findUserEventById(id: String): Maybe[UserEvent] =
322     MongoDBStore.findById[UserEvent](id, userEvents, MongoDBStore.dbObjectToUserEvent)
323
324   def findUserEventsByUserId(userId: String): List[UserEvent] = {
325     val query = new BasicDBObject(UserEventJsonNames.userID, userId)
326     MongoDBStore.runQuery(query, userEvents)(MongoDBStore.dbObjectToUserEvent)(Some(_sortByTimestampAsc))
327   }
328   //-UserEventStore
329
330   //+PolicyStore
331   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
332     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
333       new BasicDBObject("$gt", after))
334     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
335   }
336
337   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
338
339
340   def updatePolicyEntry(policy: PolicyEntry) = {
341     //Find the entry
342     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
343     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
344     policyEntries.update(query, policyObject, true, false)
345   }
346   
347   def findPolicyEntry(id: String) =
348     MongoDBStore.findById[PolicyEntry](id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
349
350   //-PolicyStore
351 }
352
353 object MongoDBStore {
354   object JsonNames {
355     final val _id = "_id"
356   }
357
358   /**
359    * Collection holding the [[gr.grnet.aquarium.logic.events.ResourceEvent]]s.
360    *
361    * Resource events are coming from all systems handling billable resources.
362    */
363   final val RESOURCE_EVENTS_COLLECTION = "resevents"
364
365   /**
366    * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]].
367    *
368    * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.user.actor.UserActor]]s.
369    */
370   final val USER_STATES_COLLECTION = "userstates"
371
372   /**
373    * Collection holding [[gr.grnet.aquarium.logic.events.UserEvent]]s.
374    *
375    * User events are coming from the IM module (external).
376    */
377   final val USER_EVENTS_COLLECTION = "userevents"
378
379   /**
380    * Collection holding [[gr.grnet.aquarium.logic.events.UserEvent]]s that could not be parsed to normal objects.
381    *
382    * We of course assume at least a valid JSON representation.
383    *
384    * User events are coming from the IM module (external).
385    */
386   final val UNPARSED_USER_EVENTS_COLLECTION = "unparsed_userevents"
387
388   /**
389    * Collection holding [[gr.grnet.aquarium.logic.events.WalletEntry]].
390    *
391    * Wallet entries are generated internally in Aquarium.
392    */
393   final val WALLET_ENTRIES_COLLECTION = "wallets"
394
395   /**
396    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
397    */
398 //  final val POLICIES_COLLECTION = "policies"
399
400   /**
401    * Collection holding [[gr.grnet.aquarium.logic.events.PolicyEntry]].
402    */
403   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
404
405   /* TODO: Some of the following methods rely on JSON (de-)serialization).
406   * A method based on proper object serialization would be much faster.
407   */
408   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
409     ResourceEvent.fromJson(JSON.serialize(dbObject))
410   }
411
412   def dbObjectToUserState(dbObj: DBObject): UserState = {
413     UserState.fromJson(JSON.serialize(dbObj))
414   }
415
416   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
417     WalletEntry.fromJson(JSON.serialize(dbObj))
418   }
419
420   def dbObjectToUserEvent(dbObj: DBObject): UserEvent = {
421     UserEvent.fromJson(JSON.serialize(dbObj))
422   }
423
424   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
425     PolicyEntry.fromJson(JSON.serialize(dbObj))
426   }
427
428   def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
429     val query = new BasicDBObject(ResourceJsonNames.id, id)
430     val cursor = collection find query
431
432     try {
433       if(cursor.hasNext)
434         deserializer apply cursor.next
435       else
436         null: A // will be transformed to NoVal by the Maybe polymorphic constructor
437     } finally {
438       cursor.close()
439     }
440   }
441
442   def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
443                                   (deserializer: (DBObject) => A)
444                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
445     val cursor0 = collection find query
446     val cursor = if(orderBy ne null) {
447       cursor0 sort orderBy
448     } else {
449       cursor0
450     } // I really know that docs say that it is the same cursor.
451
452     if(!cursor.hasNext) {
453       cursor.close()
454       Nil
455     } else {
456       val buff = new ListBuffer[A]()
457
458       while(cursor.hasNext) {
459         buff += deserializer apply cursor.next
460       }
461
462       cursor.close()
463
464       sortWith match {
465         case Some(sorter) => buff.toList.sortWith(sorter)
466         case None => buff.toList
467       }
468     }
469   }
470
471   def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
472     storeAny[A](event, collection, ResourceJsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
473   }
474
475   def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
476     storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
477   }
478   
479   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
480     storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject)
481   }
482
483   def storeJustJson(json: String, collection: DBCollection): Maybe[RecordID] = {
484     Maybe {
485       val dbObj = jsonStringToDBObject(json)
486       val writeResult = collection insert dbObj
487       writeResult.getLastError().throwOnError()
488       val objectId = dbObj.get("_id").asInstanceOf[ObjectId]
489
490       RecordID(objectId.toString)
491     }
492   }
493
494   def storeAny[A](any: A,
495                   collection: DBCollection,
496                   idName: String,
497                   idValueProvider: (A) => String,
498                   serializer: (A) => DBObject) : Maybe[RecordID] = {
499     import com.ckkloverdos.maybe.effect
500
501     Maybe {
502       val dbObj = serializer apply any
503       val writeResult = collection insert dbObj
504       writeResult.getLastError().throwOnError()
505
506       // Get back to retrieve unique id
507       val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))
508       cursor
509     } flatMap { cursor ⇒
510       effect {
511         if(cursor.hasNext)
512           RecordID(cursor.next().get(JsonNames._id).toString)
513         else
514           throw new StoreException("Could not store %s to %s".format(any, collection))
515       } {} { cursor.close() }
516     }
517   }
518
519   def jsonSupportToDBObject(any: JsonSupport): DBObject = {
520     jsonStringToDBObject(any.toJson)
521   }
522
523   def jsonStringToDBObject(json: String): DBObject = {
524     JSON.parse(json) match {
525       case dbObject: DBObject ⇒
526         dbObject
527       case _ ⇒
528         throw new StoreException("Could not transform %s -> %s".format(json.getClass, classOf[DBObject].getName))
529     }
530   }
531 }