2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import collection.immutable
40 import gr.grnet.aquarium.computation.BillingMonthInfo
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
43 import gr.grnet.aquarium.message.MessageConstants
44 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
45 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.util._
48 import gr.grnet.aquarium.util.Once
49 import gr.grnet.aquarium.util.json.JsonSupport
50 import gr.grnet.aquarium.{Aquarium, AquariumException}
51 import org.apache.avro.specific.SpecificRecord
52 import org.bson.types.ObjectId
55 * Mongodb implementation of the various aquarium stores.
57 * @author Christos KK Loverdos <loverdos@gmail.com>
58 * @author Georgios Gousios <gousiosg@gmail.com>
59 * @author Prodromos Gerakios <pgerakio@grnet.gr>
62 val aquarium: Aquarium,
67 extends ResourceEventStore
73 private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection)
74 private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection)
75 private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection)
76 private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection)
78 private[store] lazy val indicesMap = {
79 val resev= new BasicDBObjectBuilder().
80 add(MongoDBStore.JsonNames.id,1).
81 add(MongoDBStore.JsonNames.userID,1).
82 add(MongoDBStore.JsonNames.occurredMillis,1).
83 add(MongoDBStore.JsonNames.receivedMillis,1).get
84 val imev = new BasicDBObjectBuilder().
85 add(MongoDBStore.JsonNames.userID,1).
86 add(MongoDBStore.JsonNames.eventType,"").
87 add(MongoDBStore.JsonNames.occurredMillis,1).get
88 val policy = new BasicDBObjectBuilder().
89 add("validFromMillis",1).
90 add("validToMillis",1).get
91 val user = new BasicDBObjectBuilder().
92 add( "occurredMillis",1).
93 add("isFullBillingMonth",false).
95 add("billingMonth",1).
96 add("billingMonthDay",1).get
97 Map(MongoDBStore.ResourceEventCollection -> resev,
98 MongoDBStore.IMEventCollection-> imev,
99 MongoDBStore.PolicyCollection-> policy,
100 MongoDBStore.UserStateCollection-> user
103 private[this] val once = new Once()
105 private[this] def doAuthenticate(db: DB) {
106 if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
107 throw new AquariumException("Could not authenticate user %s".format(username))
111 private[this] def getCollection(name: String): DBCollection = {
112 val db = mongo.getDB(database)
114 once.run { /* this code is thread-safe and will run exactly once*/
115 indicesMap.foreach { case (collection,obj) =>
116 mongo.getDB(database).getCollection(collection).createIndex(obj)
119 db.getCollection(name)
122 //+ResourceEventStore
123 def pingResourceEventStore(): Unit = synchronized {
124 getCollection(MongoDBStore.ResourceEventCollection)
125 MongoDBStore.ping(mongo)
128 def insertResourceEvent(event: ResourceEventMsg) = {
129 val mongoID = new ObjectId()
130 event.setInStoreID(mongoID.toStringMongod)
132 val dbObject = new BasicDBObjectBuilder().
133 add(MongoDBStore.JsonNames._id, mongoID).
134 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
135 add(MongoDBStore.JsonNames.userID, event.getUserID).
136 add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
137 add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
140 MongoDBStore.insertDBObject(dbObject, resourceEvents)
144 def findResourceEventByID(id: String): Option[ResourceEventMsg] = {
145 val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id)
147 dbObject ← dbObjectOpt
148 payload = dbObject.get(MongoDBStore.JsonNames.payload)
149 msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg)
153 def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
154 val query = new BasicDBObjectBuilder().
155 add(MongoDBStore.JsonNames.userID, userID).
156 // received within the period
157 add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)).
158 add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)).
159 // occurred outside the period
161 val dbList = new BasicDBList()
162 dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis)))
163 dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis)))
168 resourceEvents.count(query)
171 def foreachResourceEventOccurredInPeriod(
175 )(f: ResourceEventMsg ⇒ Unit): Long = {
177 val query = new BasicDBObjectBuilder().
178 add(MongoDBStore.JsonNames.userID, userID).
179 add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
180 add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)).
183 val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)
184 val cursor = resourceEvents.find(query).sort(sorter)
186 withCloseable(cursor) { cursor ⇒
187 while(cursor.hasNext) {
188 val nextDBObject = cursor.next()
189 val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
190 val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg)
199 //-ResourceEventStore
202 def findUserStateByUserID(userID: String) = {
203 val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID)
205 dbObject <- dbObjectOpt
206 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
207 msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg)
213 def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
214 val query = new BasicDBObjectBuilder().
215 add(MongoDBStore.JsonNames.userID, userID).
216 add(MongoDBStore.JsonNames.isForFullMonth, true).
217 add(MongoDBStore.JsonNames.billingYear, bmi.year).
218 add(MongoDBStore.JsonNames.billingMonth, bmi.month).
221 // Descending order, so that the latest comes first
222 val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
224 val cursor = userStates.find(query).sort(sorter)
226 withCloseable(cursor) { cursor ⇒
227 MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
231 def findLatestUserState(userID: String) = {
232 val query = new BasicDBObjectBuilder().
233 add(MongoDBStore.JsonNames.userID, userID).
236 // Descending order, so that the latest comes first
237 val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
239 val cursor = userStates.find(query).sort(sorter)
241 withCloseable(cursor) { cursor ⇒
242 MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
247 * Stores a user state.
249 def insertUserState(event: UserStateMsg)= {
250 val mongoID = new ObjectId()
251 event.setInStoreID(mongoID.toStringMongod)
253 val dbObject = new BasicDBObjectBuilder().
254 add(MongoDBStore.JsonNames._id, mongoID).
255 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
256 add(MongoDBStore.JsonNames.userID, event.getUserID).
257 add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
258 add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth).
259 add(MongoDBStore.JsonNames.billingYear, event.getBillingYear).
260 add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth).
261 add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay).
264 MongoDBStore.insertDBObject(dbObject, userStates)
270 def pingIMEventStore(): Unit = {
271 getCollection(MongoDBStore.IMEventCollection)
272 MongoDBStore.ping(mongo)
275 def insertIMEvent(event: IMEventMsg) = {
276 val mongoID = new ObjectId()
277 event.setInStoreID(mongoID.toStringMongod)
279 val dbObject = new BasicDBObjectBuilder().
280 add(MongoDBStore.JsonNames._id, mongoID).
281 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
282 add(MongoDBStore.JsonNames.userID, event.getUserID).
283 add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase).
284 add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
285 add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
288 MongoDBStore.insertDBObject(dbObject, imEvents)
292 def findIMEventByID(id: String) = {
293 val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id)
295 dbObject ← dbObjectOpt
296 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
297 msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
305 * Find the `CREATE` even for the given user. Note that there must be only one such event.
307 def findCreateIMEventByUserID(userID: String) = {
308 val query = new BasicDBObjectBuilder().
309 add(MongoDBStore.JsonNames.userID, userID).
310 add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get()
312 // Normally one such event is allowed ...
313 val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
315 val dbObjectOpt = withCloseable(cursor) { cursor ⇒
324 dbObject <- dbObjectOpt
325 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
326 msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
333 * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
334 * the given function `f`.
336 * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
338 def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
339 val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID)
340 val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
342 var _shouldContinue = true
343 withCloseable(cursor) { cursor ⇒
344 while(_shouldContinue && cursor.hasNext) {
345 val dbObject = cursor.next()
346 val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
347 val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
349 _shouldContinue = f(msg)
358 def foreachPolicy[U](f: PolicyMsg ⇒ U) {
359 val cursor = policies.find()
360 withCloseable(cursor) { cursor ⇒
361 while(cursor.hasNext) {
362 val dbObject = cursor.next()
363 val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
364 val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg)
370 def insertPolicy(policy: PolicyMsg): PolicyMsg = {
371 val mongoID = new ObjectId()
372 policy.setInStoreID(mongoID.toStringMongod)
373 val dbObject = new BasicDBObjectBuilder().
374 add(MongoDBStore.JsonNames._id, mongoID).
375 add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis).
376 add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis).
377 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)).
380 MongoDBStore.insertDBObject(dbObject, policies)
384 def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = {
386 var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
387 foreachPolicy(_policies += _)
388 _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption
391 def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = {
393 var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
394 foreachPolicy(_policies += _)
396 immutable.SortedMap(_policies.
397 from(MessageFactory.newDummyPolicyMsgAt(fromMillis)).
398 to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq.
399 map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _*
405 object MongoDBStore {
406 final val JsonNames = gr.grnet.aquarium.util.json.JsonNames
408 final val collections = List("resevents","userstates","imevents","policies")
410 final val ResourceEventCollection = collections(0)
412 final val UserStateCollection = collections(1)
414 final val IMEventCollection = collections(2)
416 final val PolicyCollection = collections(3)
418 def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
419 withCloseable(cursor) { cursor ⇒
421 Some(f(cursor.next()))
428 def ping(mongo: Mongo): Unit = synchronized {
429 // This requires a network roundtrip
433 def findOneByAttribute(
434 collection: DBCollection,
435 attributeName: String,
436 attributeValue: String,
437 sortByOpt: Option[DBObject] = None
438 ): Option[DBObject] = {
439 val query = new BasicDBObject(attributeName, attributeValue)
440 val cursor = sortByOpt match {
441 case None ⇒ collection find query
442 case Some(sortBy) ⇒ collection find query sort sortBy
444 withCloseable(cursor) { cursor ⇒
445 if(cursor.hasNext) Some(cursor.next()) else None
449 def insertDBObject(dbObj: DBObject, collection: DBCollection) {
450 collection.insert(dbObj, WriteConcern.JOURNAL_SAFE)
453 def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = {
455 dbObject <- if(cursor.hasNext) Some(cursor.next()) else None
456 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
457 msg = AvroHelpers.specificRecordOfBytes(payload, fresh)
463 def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
464 StdConverters.AllConverters.convertEx[DBObject](jsonSupport)