Return the number of resource events processed
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.mongodb
37
38 import collection.immutable
39 import com.mongodb._
40 import gr.grnet.aquarium.computation.BillingMonthInfo
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
43 import gr.grnet.aquarium.message.MessageConstants
44 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
45 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.util._
48 import gr.grnet.aquarium.util.json.JsonSupport
49 import gr.grnet.aquarium.{Aquarium, AquariumException}
50 import org.apache.avro.specific.SpecificRecord
51 import org.bson.types.ObjectId
52
53 /**
54  * Mongodb implementation of the various aquarium stores.
55  *
56  * @author Christos KK Loverdos <loverdos@gmail.com>
57  * @author Georgios Gousios <gousiosg@gmail.com>
58  * @author Prodromos Gerakios <pgerakio@grnet.gr>
59  */
60 class MongoDBStore(
61     val aquarium: Aquarium,
62     val mongo: Mongo,
63     val database: String,
64     val username: String,
65     val password: String)
66   extends ResourceEventStore
67   with UserStateStore
68   with IMEventStore
69   with PolicyStore
70   with Loggable {
71
72   private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection)
73   private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection)
74   private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection)
75   private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection)
76
77   private[this] def doAuthenticate(db: DB) {
78     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
79       throw new AquariumException("Could not authenticate user %s".format(username))
80     }
81   }
82
83   private[this] def getCollection(name: String): DBCollection = {
84     val db = mongo.getDB(database)
85     doAuthenticate(db)
86     db.getCollection(name)
87   }
88
89   //+ResourceEventStore
90   def pingResourceEventStore(): Unit = synchronized {
91     getCollection(MongoDBStore.ResourceEventCollection)
92     MongoDBStore.ping(mongo)
93   }
94
95   def insertResourceEvent(event: ResourceEventMsg) = {
96     val mongoID = new ObjectId()
97     event.setInStoreID(mongoID.toStringMongod)
98
99     val dbObject = new BasicDBObjectBuilder().
100       add(MongoDBStore.JsonNames._id, mongoID).
101       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
102       add(MongoDBStore.JsonNames.userID, event.getUserID).
103       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
104       add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
105     get()
106
107     MongoDBStore.insertDBObject(dbObject, resourceEvents)
108     event
109   }
110
111   def findResourceEventByID(id: String): Option[ResourceEventMsg] = {
112     val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id)
113     for {
114       dbObject ← dbObjectOpt
115       payload = dbObject.get(MongoDBStore.JsonNames.payload)
116       msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg)
117     } yield msg
118   }
119
120   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
121     val query = new BasicDBObjectBuilder().
122       add(MongoDBStore.JsonNames.userID, userID).
123       // received within the period
124       add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)).
125       add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)).
126       // occurred outside the period
127       add("$or", {
128         val dbList = new BasicDBList()
129         dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis)))
130         dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis)))
131         dbList
132       }).
133       get()
134
135     resourceEvents.count(query)
136   }
137
138   def foreachResourceEventOccurredInPeriod(
139       userID: String,
140       startMillis: Long,
141       stopMillis: Long
142   )(f: ResourceEventMsg ⇒ Unit): Long = {
143     var _counter= 0L
144     val query = new BasicDBObjectBuilder().
145       add(MongoDBStore.JsonNames.userID, userID).
146       add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
147       add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)).
148       get()
149
150     val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)
151     val cursor = resourceEvents.find(query).sort(sorter)
152
153     withCloseable(cursor) { cursor ⇒
154       while(cursor.hasNext) {
155         val nextDBObject = cursor.next()
156         val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
157         val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg)
158
159         f(nextEvent)
160         _counter += 1
161       }
162     }
163
164     _counter
165   }
166   //-ResourceEventStore
167
168   //+ UserStateStore
169   def findUserStateByUserID(userID: String) = {
170     val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID)
171     for {
172       dbObject <- dbObjectOpt
173       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
174       msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg)
175     } yield {
176       msg
177     }
178   }
179
180   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
181     val query = new BasicDBObjectBuilder().
182       add(MongoDBStore.JsonNames.userID, userID).
183       add(MongoDBStore.JsonNames.isForFullMonth, true).
184       add(MongoDBStore.JsonNames.billingYear, bmi.year).
185       add(MongoDBStore.JsonNames.billingMonth, bmi.month).
186       get()
187
188     // Descending order, so that the latest comes first
189     val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
190
191     val cursor = userStates.find(query).sort(sorter)
192
193     withCloseable(cursor) { cursor ⇒
194       MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
195     }
196   }
197
198   def findLatestUserState(userID: String) = {
199     val query = new BasicDBObjectBuilder().
200       add(MongoDBStore.JsonNames.userID, userID).
201       get()
202
203     // Descending order, so that the latest comes first
204     val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
205
206     val cursor = userStates.find(query).sort(sorter)
207
208     withCloseable(cursor) { cursor ⇒
209       MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
210     }
211   }
212
213   /**
214    * Stores a user state.
215    */
216   def insertUserState(event: UserStateMsg)= {
217     val mongoID = new ObjectId()
218     event.setInStoreID(mongoID.toStringMongod)
219
220     val dbObject = new BasicDBObjectBuilder().
221       add(MongoDBStore.JsonNames._id, mongoID).
222       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
223       add(MongoDBStore.JsonNames.userID, event.getUserID).
224       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
225       add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth).
226       add(MongoDBStore.JsonNames.billingYear, event.getBillingYear).
227       add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth).
228       add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay).
229     get()
230
231     MongoDBStore.insertDBObject(dbObject, userStates)
232     event
233   }
234   //- UserStateStore
235
236   //+IMEventStore
237   def pingIMEventStore(): Unit = {
238     getCollection(MongoDBStore.IMEventCollection)
239     MongoDBStore.ping(mongo)
240   }
241
242   def insertIMEvent(event: IMEventMsg) = {
243     val mongoID = new ObjectId()
244     event.setInStoreID(mongoID.toStringMongod)
245
246     val dbObject = new BasicDBObjectBuilder().
247       add(MongoDBStore.JsonNames._id, mongoID).
248       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
249       add(MongoDBStore.JsonNames.userID, event.getUserID).
250       add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase).
251       add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
252       add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
253     get()
254
255     MongoDBStore.insertDBObject(dbObject, imEvents)
256     event
257   }
258
259   def findIMEventByID(id: String) = {
260     val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id)
261     for {
262       dbObject ← dbObjectOpt
263       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
264       msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
265     } yield {
266       msg
267     }
268   }
269
270
271   /**
272    * Find the `CREATE` even for the given user. Note that there must be only one such event.
273    */
274   def findCreateIMEventByUserID(userID: String) = {
275     val query = new BasicDBObjectBuilder().
276       add(MongoDBStore.JsonNames.userID, userID).
277       add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get()
278
279     // Normally one such event is allowed ...
280     val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
281
282     val dbObjectOpt = withCloseable(cursor) { cursor ⇒
283       if(cursor.hasNext) {
284         Some(cursor.next())
285       } else {
286         None
287       }
288     }
289
290     for {
291       dbObject <- dbObjectOpt
292       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
293       msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
294     } yield {
295       msg
296     }
297   }
298
299   /**
300    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
301    * the given function `f`.
302    *
303    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
304    */
305   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
306     val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID)
307     val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
308
309     var _shouldContinue = true
310     withCloseable(cursor) { cursor ⇒
311       while(_shouldContinue && cursor.hasNext) {
312         val dbObject = cursor.next()
313         val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
314         val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
315
316         _shouldContinue = f(msg)
317       }
318     }
319
320     _shouldContinue
321   }
322   //-IMEventStore
323
324   //+PolicyStore
325   def foreachPolicy[U](f: PolicyMsg ⇒ U) {
326     val cursor = policies.find()
327     withCloseable(cursor) { cursor ⇒
328       while(cursor.hasNext) {
329         val dbObject = cursor.next()
330         val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
331         val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg)
332         f(policy)
333       }
334     }
335   }
336
337   def insertPolicy(policy: PolicyMsg): PolicyMsg = {
338     val mongoID = new ObjectId()
339     policy.setInStoreID(mongoID.toStringMongod)
340     val dbObject = new BasicDBObjectBuilder().
341       add(MongoDBStore.JsonNames._id, mongoID).
342       add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis).
343       add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis).
344       add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)).
345     get()
346
347     MongoDBStore.insertDBObject(dbObject, policies)
348     policy
349   }
350
351   def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = {
352     // FIXME Inefficient
353     var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
354     foreachPolicy(_policies += _)
355     _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption
356   }
357
358   def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = {
359     // FIXME Inefficient
360     var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
361     foreachPolicy(_policies += _)
362
363     immutable.SortedMap(_policies.
364       from(MessageFactory.newDummyPolicyMsgAt(fromMillis)).
365       to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq.
366       map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _*
367     )
368   }
369   //-PolicyStore
370 }
371
372 object MongoDBStore {
373   final val JsonNames = gr.grnet.aquarium.util.json.JsonNames
374
375   final val ResourceEventCollection = "resevents"
376
377   final val UserStateCollection = "userstates"
378
379   final val UserAgreementHistoryCollection = "useragreementhistory"
380
381   final val IMEventCollection = "imevents"
382
383   final val PolicyCollection = "policies"
384
385   def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
386     withCloseable(cursor) { cursor ⇒
387       if(cursor.hasNext) {
388         Some(f(cursor.next()))
389       } else {
390         None
391       }
392     }
393   }
394
395   def ping(mongo: Mongo): Unit = synchronized {
396     // This requires a network roundtrip
397     mongo.isLocked
398   }
399
400   def findOneByAttribute(
401       collection: DBCollection,
402       attributeName: String,
403       attributeValue: String,
404       sortByOpt: Option[DBObject] = None
405   ): Option[DBObject] =  {
406     val query = new BasicDBObject(attributeName, attributeValue)
407     val cursor = sortByOpt match {
408       case None         ⇒ collection find query
409       case Some(sortBy) ⇒ collection find query sort sortBy
410     }
411     withCloseable(cursor) { cursor ⇒
412       if(cursor.hasNext) Some(cursor.next()) else None
413     }
414   }
415
416   def insertDBObject(dbObj: DBObject, collection: DBCollection) {
417     collection.insert(dbObj, WriteConcern.JOURNAL_SAFE)
418   }
419
420   def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = {
421     for {
422       dbObject <- if(cursor.hasNext) Some(cursor.next()) else None
423       payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
424       msg = AvroHelpers.specificRecordOfBytes(payload, fresh)
425     } yield {
426       msg
427     }
428   }
429
430   def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
431     StdConverters.AllConverters.convertEx[DBObject](jsonSupport)
432   }
433 }