Add ping functionality to two of the stores
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.computation.UserState.{JsonNames ⇒ UserStateJsonNames}
40 import gr.grnet.aquarium.util.json.JsonSupport
41 import collection.mutable.ListBuffer
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames}
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.event.model.WalletEntry.{JsonNames ⇒ WalletJsonNames}
48 import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
49 import java.util.Date
50 import gr.grnet.aquarium.logic.accounting.Policy
51 import com.mongodb._
52 import org.bson.types.ObjectId
53 import com.ckkloverdos.maybe.Maybe
54 import gr.grnet.aquarium.util._
55 import gr.grnet.aquarium.converter.Conversions
56 import gr.grnet.aquarium.computation.UserState
57 import gr.grnet.aquarium.event.model.{ExternalEventModel, WalletEntry, PolicyEntry}
58
59 /**
60  * Mongodb implementation of the various aquarium stores.
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  * @author Georgios Gousios <gousiosg@gmail.com>
64  */
65 class MongoDBStore(
66     val mongo: Mongo,
67     val database: String,
68     val username: String,
69     val password: String)
70   extends ResourceEventStore
71   with UserStateStore
72   with WalletEntryStore
73   with IMEventStore
74   with PolicyStore
75   with Loggable {
76
77   override type IMEvent = MongoDBIMEvent
78   override type ResourceEvent = MongoDBResourceEvent
79
80   private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
81   private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
82   private[store] lazy val imEvents         = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
83   private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
84   private[store] lazy val walletEntries    = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
85   private[store] lazy val policyEntries    = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
86
87   private[this] def getCollection(name: String): DBCollection = {
88     val db = mongo.getDB(database)
89     //logger.debug("Authenticating to mongo")
90     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
91       throw new StoreException("Could not authenticate user %s".format(username))
92     }
93     db.getCollection(name)
94   }
95
96   private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = {
97     if (one.occurredMillis > two.occurredMillis) false
98     else if (one.occurredMillis < two.occurredMillis) true
99     else true
100   }
101
102   //+ResourceEventStore
103   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
104     MongoDBResourceEvent.fromOther(event, null)
105   }
106
107   def pingResourceEventStore(): Unit = {
108     MongoDBStore.ping(mongo)
109   }
110
111   def insertResourceEvent(event: ResourceEventModel) = {
112     val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
113     MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
114     localEvent
115   }
116
117   def findResourceEventById(id: String): Option[ResourceEvent] = {
118     MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
119   }
120
121   def findResourceEventsByUserId(userId: String)
122                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
123     val query = new BasicDBObject(ResourceEventNames.userID, userId)
124
125     MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
126   }
127
128   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
129     val query = new BasicDBObject()
130     query.put(ResourceEventNames.userID, userId)
131     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
132     
133     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
134
135     val cursor = resourceEvents.find(query).sort(sort)
136
137     try {
138       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
139       while(cursor.hasNext) {
140         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
141       }
142       buffer.toList.sortWith(_sortByTimestampAsc)
143     } finally {
144       cursor.close()
145     }
146   }
147
148   def findResourceEventHistory(userId: String, resName: String,
149                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
150     val query = new BasicDBObject()
151     query.put(ResourceEventNames.userID, userId)
152     query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
153     query.put(ResourceEventNames.resource, resName)
154
155     instid match {
156       case Some(id) =>
157         Policy.policy.findResource(resName) match {
158           case Some(y) => query.put(ResourceEventNames.details,
159             new BasicDBObject(y.descriminatorField, instid.get))
160           case None =>
161         }
162       case None =>
163     }
164
165     val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
166     val cursor = resourceEvents.find(query).sort(sort)
167
168     try {
169       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
170       while(cursor.hasNext) {
171         buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
172       }
173       buffer.toList.sortWith(_sortByTimestampAsc)
174     } finally {
175       cursor.close()
176     }
177   }
178
179   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
180     val query = new BasicDBObject()
181     query.put(ResourceEventNames.userID, userId)
182     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
183     query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
184
185     // Sort them by increasing order for occurred time
186     val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
187
188     MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
189   }
190   
191   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
192     // FIXME: Implement
193     0L
194   }
195
196   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
197                                                     startMillis: Long,
198                                                     stopMillis: Long): List[ResourceEvent] = {
199     // FIXME: Implement
200     Nil
201   }
202   //-ResourceEventStore
203
204   //+ UserStateStore
205   def insertUserState(userState: UserState) = {
206     MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject)
207   }
208
209   def findUserStateByUserID(userID: String): Option[UserState] = {
210     val query = new BasicDBObject(UserStateJsonNames.userID, userID)
211     val cursor = userStates find query
212
213     withCloseable(cursor) { cursor ⇒
214       if(cursor.hasNext)
215         Some(MongoDBStore.dbObjectToUserState(cursor.next()))
216       else
217         None
218     }
219   }
220
221
222   def findLatestUserStateByUserID(userID: String) = {
223     // FIXME: implement
224     null
225   }
226
227   def findLatestUserStateForEndOfBillingMonth(userId: String,
228                                               yearOfBillingMonth: Int,
229                                               billingMonth: Int): Option[UserState] = {
230     None // FIXME: implement
231   }
232
233   def deleteUserState(userId: String) = {
234     val query = new BasicDBObject(UserStateJsonNames.userID, userId)
235     userStates.findAndRemove(query)
236   }
237   //- UserStateStore
238
239   //+WalletEntryStore
240   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
241     Maybe {
242       MongoDBStore.storeAny[WalletEntry](
243         entry,
244         walletEntries,
245         WalletJsonNames.id,
246         (e) => e.id,
247         MongoDBStore.jsonSupportToDBObject)
248     }
249   }
250
251   def findWalletEntryById(id: String): Maybe[WalletEntry] = {
252     MongoDBStore.findBy(WalletJsonNames.id, id, walletEntries, MongoDBStore.dbObjectToWalletEntry): Maybe[WalletEntry]
253   }
254
255   def findUserWalletEntries(userId: String) = {
256     // TODO: optimize
257     findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
258   }
259
260   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
261     val q = new BasicDBObject()
262     // TODO: Is this the correct way for an AND query?
263     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
264     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime))
265     q.put(WalletJsonNames.userId, userId)
266
267     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
268   }
269
270   def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
271     val q = new BasicDBObject()
272     q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
273     q.put(WalletJsonNames.userId, userId)
274
275     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
276   }
277
278   def findLatestUserWalletEntries(userId: String) = {
279     Maybe {
280       val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order
281       val cursor = walletEntries.find().sort(orderBy)
282
283       try {
284         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
285         if(cursor.hasNext) {
286           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
287           buffer += walletEntry
288
289           var _previousOccurredMillis = walletEntry.occurredMillis
290           var _ok = true
291
292           while(cursor.hasNext && _ok) {
293             val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
294             var currentOccurredMillis = walletEntry.occurredMillis
295             _ok = currentOccurredMillis == _previousOccurredMillis
296             
297             if(_ok) {
298               buffer += walletEntry
299             }
300           }
301
302           buffer.toList
303         } else {
304           null
305         }
306       } finally {
307         cursor.close()
308       }
309     }
310   }
311
312   def findPreviousEntry(userId: String, resource: String,
313                         instanceId: String,
314                         finalized: Option[Boolean]): List[WalletEntry] = {
315     val q = new BasicDBObject()
316     q.put(WalletJsonNames.userId, userId)
317     q.put(WalletJsonNames.resource, resource)
318     q.put(WalletJsonNames.instanceId, instanceId)
319     finalized match {
320       case Some(x) => q.put(WalletJsonNames.finalized, x)
321       case None =>
322     }
323
324     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
325   }
326   //-WalletEntryStore
327
328   //+IMEventStore
329   def createIMEventFromJson(json: String) = {
330     MongoDBStore.createIMEventFromJson(json)
331   }
332
333   def createIMEventFromOther(event: IMEventModel) = {
334     MongoDBStore.createIMEventFromOther(event)
335   }
336
337   def pingIMEventStore(): Unit = {
338     MongoDBStore.ping(mongo)
339   }
340
341   def insertIMEvent(event: IMEventModel): IMEvent = {
342     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
343     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
344     localEvent
345   }
346
347   def findIMEventById(id: String): Option[IMEvent] = {
348     MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
349   }
350   //-IMEventStore
351
352   //+PolicyStore
353   def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
354     val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
355       new BasicDBObject("$gt", after))
356     MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
357   }
358
359   def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
360
361
362   def updatePolicyEntry(policy: PolicyEntry) = {
363     //Find the entry
364     val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
365     val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
366     policyEntries.update(query, policyObject, true, false)
367   }
368   
369   def findPolicyEntry(id: String) = {
370     MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
371   }
372
373   //-PolicyStore
374 }
375
376 object MongoDBStore {
377   object JsonNames {
378     final val _id = "_id"
379   }
380
381   /**
382    * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
383    *
384    * Resource events are coming from all systems handling billable resources.
385    */
386   final val RESOURCE_EVENTS_COLLECTION = "resevents"
387
388   /**
389    * Collection holding the snapshots of [[gr.grnet.aquarium.computation.UserState]].
390    *
391    * [[gr.grnet.aquarium.computation.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user.UserActor]]s.
392    */
393   final val USER_STATES_COLLECTION = "userstates"
394
395   /**
396    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
397    *
398    * User events are coming from the IM module (external).
399    */
400   final val IM_EVENTS_COLLECTION = "imevents"
401
402   /**
403    * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s that could not be parsed to normal objects.
404    *
405    * We of course assume at least a valid JSON representation.
406    *
407    * User events are coming from the IM module (external).
408    */
409   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
410
411   /**
412    * Collection holding [[gr.grnet.aquarium.event.model.WalletEntry]].
413    *
414    * Wallet entries are generated internally in Aquarium.
415    */
416   final val WALLET_ENTRIES_COLLECTION = "wallets"
417
418   /**
419    * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
420    */
421 //  final val POLICIES_COLLECTION = "policies"
422
423   /**
424    * Collection holding [[gr.grnet.aquarium.event.model.PolicyEntry]].
425    */
426   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
427
428   def dbObjectToUserState(dbObj: DBObject): UserState = {
429     UserState.fromJson(JSON.serialize(dbObj))
430   }
431
432   def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
433     WalletEntry.fromJson(JSON.serialize(dbObj))
434   }
435
436   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
437     PolicyEntry.fromJson(JSON.serialize(dbObj))
438   }
439
440   def ping(mongo: Mongo): Unit = {
441     // This requires a network roundtrip
442     mongo.isLocked
443   }
444
445   def findBy[A >: Null <: AnyRef](name: String,
446                                   value: String,
447                                   collection: DBCollection,
448                                   deserializer: (DBObject) => A) : Option[A] = {
449     val query = new BasicDBObject(name, value)
450     val cursor = collection find query
451
452     withCloseable(cursor) { cursor ⇒
453       if(cursor.hasNext)
454         Some(deserializer apply cursor.next)
455       else
456         None
457     }
458   }
459
460   def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
461                                   (deserializer: (DBObject) => A)
462                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
463     val cursor0 = collection find query
464     val cursor = if(orderBy ne null) {
465       cursor0 sort orderBy
466     } else {
467       cursor0
468     } // I really know that docs say that it is the same cursor.
469
470     if(!cursor.hasNext) {
471       cursor.close()
472       Nil
473     } else {
474       val buff = new ListBuffer[A]()
475
476       while(cursor.hasNext) {
477         buff += deserializer apply cursor.next
478       }
479
480       cursor.close()
481
482       sortWith match {
483         case Some(sorter) => buff.toList.sortWith(sorter)
484         case None => buff.toList
485       }
486     }
487   }
488
489   def storeUserState(userState: UserState, collection: DBCollection) = {
490     storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
491   }
492   
493   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
494     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
495   }
496
497   def storeAny[A](any: A,
498                   collection: DBCollection,
499                   idName: String,
500                   idValueProvider: (A) => String,
501                   serializer: (A) => DBObject) : RecordID = {
502
503     val dbObject = serializer apply any
504     val _id = new ObjectId()
505     dbObject.put("_id", _id)
506     val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
507     writeResult.getLastError().throwOnError()
508
509     RecordID(dbObject.get("_id").toString)
510   }
511
512   // FIXME: consolidate
513   def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
514     val dbObject = serializer apply obj
515     val objectId = obj._id  match {
516       case null ⇒
517         val _id = new ObjectId()
518         dbObject.put("_id", _id)
519         _id
520
521       case _id ⇒
522         _id
523     }
524
525     dbObject.put(JsonNames._id, objectId)
526
527     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
528
529     obj
530   }
531
532   def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = {
533     val dbObject = serializer apply obj
534     val objectId = obj._id  match {
535       case null ⇒
536         val _id = new ObjectId()
537         dbObject.put("_id", _id)
538         _id
539
540       case _id ⇒
541         _id
542     }
543
544     dbObject.put(JsonNames._id, objectId)
545
546     collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
547   }
548
549   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
550     Conversions.jsonSupportToDBObject(jsonSupport)
551   }
552
553   final def isLocalIMEvent(event: IMEventModel) = event match {
554     case _: MongoDBIMEvent ⇒ true
555     case _ ⇒ false
556   }
557
558   final def createIMEventFromJson(json: String) = {
559     MongoDBIMEvent.fromJsonString(json)
560   }
561
562   final def createIMEventFromOther(event: IMEventModel) = {
563     MongoDBIMEvent.fromOther(event, null)
564   }
565
566   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
567     MongoDBIMEvent.fromJsonBytes(jsonBytes)
568   }
569 }