2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import com.mongodb.util.JSON
39 import gr.grnet.aquarium.computation.state.UserState.{JsonNames ⇒ UserStateJsonNames}
40 import gr.grnet.aquarium.util.json.JsonSupport
41 import collection.mutable.ListBuffer
42 import gr.grnet.aquarium.event.model.im.IMEventModel
43 import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames}
44 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
45 import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
48 import gr.grnet.aquarium.logic.accounting.Policy
50 import org.bson.types.ObjectId
51 import com.ckkloverdos.maybe.Maybe
52 import gr.grnet.aquarium.util._
53 import gr.grnet.aquarium.converter.Conversions
54 import gr.grnet.aquarium.computation.state.UserState
55 import gr.grnet.aquarium.event.model.{ExternalEventModel, PolicyEntry}
56 import gr.grnet.aquarium.computation.BillingMonthInfo
57 import gr.grnet.aquarium.Aquarium
60 * Mongodb implementation of the various aquarium stores.
62 * @author Christos KK Loverdos <loverdos@gmail.com>
63 * @author Georgios Gousios <gousiosg@gmail.com>
70 extends ResourceEventStore
76 override type IMEvent = MongoDBIMEvent
77 override type ResourceEvent = MongoDBResourceEvent
79 private[store] lazy val resourceEvents = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
80 private[store] lazy val userStates = getCollection(MongoDBStore.USER_STATES_COLLECTION)
81 private[store] lazy val imEvents = getCollection(MongoDBStore.IM_EVENTS_COLLECTION)
82 private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION)
83 private[store] lazy val policyEntries = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION)
85 private[this] def getCollection(name: String): DBCollection = {
86 val db = mongo.getDB(database)
87 //logger.debug("Authenticating to mongo")
88 if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
89 throw new StoreException("Could not authenticate user %s".format(username))
91 db.getCollection(name)
94 private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = {
95 if (one.occurredMillis > two.occurredMillis) false
96 else if (one.occurredMillis < two.occurredMillis) true
100 //+ResourceEventStore
101 def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
102 MongoDBResourceEvent.fromOther(event, null)
105 def pingResourceEventStore(): Unit = synchronized {
106 MongoDBStore.ping(mongo)
109 def insertResourceEvent(event: ResourceEventModel) = {
110 val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
111 MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
115 def findResourceEventById(id: String): Option[ResourceEvent] = {
116 MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
119 def findResourceEventsByUserId(userId: String)
120 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
121 val query = new BasicDBObject(ResourceEventNames.userID, userId)
123 MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
126 def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
127 val query = new BasicDBObject()
128 query.put(ResourceEventNames.userID, userId)
129 query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
131 val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
133 val cursor = resourceEvents.find(query).sort(sort)
136 val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
137 while(cursor.hasNext) {
138 buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
140 buffer.toList.sortWith(_sortByTimestampAsc)
146 def findResourceEventHistory(userId: String, resName: String,
147 instid: Option[String], upTo: Long) : List[ResourceEvent] = {
148 val query = new BasicDBObject()
149 query.put(ResourceEventNames.userID, userId)
150 query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
151 query.put(ResourceEventNames.resource, resName)
155 Policy.policy.findResource(resName) match {
156 case Some(y) => query.put(ResourceEventNames.details,
157 new BasicDBObject(y.descriminatorField, instid.get))
163 val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
164 val cursor = resourceEvents.find(query).sort(sort)
167 val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
168 while(cursor.hasNext) {
169 buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
171 buffer.toList.sortWith(_sortByTimestampAsc)
177 def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
178 val query = new BasicDBObject()
179 query.put(ResourceEventNames.userID, userId)
180 query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
181 query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
183 // Sort them by increasing order for occurred time
184 val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
186 MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
189 def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
190 val query = new BasicDBObjectBuilder().
191 add(ResourceEventModel.Names.userID, userID).
192 // received within the period
193 add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$gte", startMillis)).
194 add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$lte", stopMillis)).
195 // occurred outside the period
197 val dbList = new BasicDBList()
198 dbList.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis)))
199 dbList.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis)))
204 resourceEvents.count(query)
207 def findAllRelevantResourceEventsForBillingPeriod(userId: String,
209 stopMillis: Long): List[ResourceEvent] = {
213 //-ResourceEventStore
216 def insertUserState(userState: UserState) = {
217 MongoDBStore.insertUserState(
218 userState.copy(_id = new ObjectId().toString),
220 MongoDBStore.jsonSupportToDBObject
224 def findUserStateByUserID(userID: String): Option[UserState] = {
225 val query = new BasicDBObject(UserStateJsonNames.userID, userID)
226 val cursor = userStates find query
228 MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
231 def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
232 val query = new BasicDBObjectBuilder().
233 add(UserState.JsonNames.userID, userID).
234 add(UserState.JsonNames.isFullBillingMonthState, true).
235 add(UserState.JsonNames.theFullBillingMonth_year, bmi.year).
236 add(UserState.JsonNames.theFullBillingMonth_month, bmi.month).
239 logger.debug("findLatestUserStateForFullMonthBilling(%s, %s) query: %s".format(userID, bmi, query))
241 // Descending order, so that the latest comes first
242 val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1)
244 val cursor = userStates.find(query).sort(sorter)
246 MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState)
251 def createIMEventFromJson(json: String) = {
252 MongoDBStore.createIMEventFromJson(json)
255 def createIMEventFromOther(event: IMEventModel) = {
256 MongoDBStore.createIMEventFromOther(event)
259 def pingIMEventStore(): Unit = {
260 MongoDBStore.ping(mongo)
263 def insertIMEvent(event: IMEventModel): IMEvent = {
264 val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
265 MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
269 def findIMEventById(id: String): Option[IMEvent] = {
270 MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
275 * Find the `CREATE` even for the given user. Note that there must be only one such event.
277 def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
278 val query = new BasicDBObjectBuilder().
279 add(IMEventNames.userID, userID).
280 add(IMEventNames.eventType, IMEventModel.EventTypeNames.create).get()
282 // Normally one such event is allowed ...
283 val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
285 MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
288 def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
289 val query = new BasicDBObject(IMEventNames.userID, userID)
290 val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1))
292 MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject)
296 * Find the very first activation event for a particular user.
299 def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
300 val query = new BasicDBObjectBuilder().
301 add(IMEventNames.userID, userID).
302 add(IMEventNames.isActive, true).get()
304 val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
306 withCloseable(cursor) { cursor ⇒
308 Some(MongoDBIMEvent.fromDBObject(cursor.next()))
316 * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
317 * the given function `f`.
319 * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
321 def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
322 val query = new BasicDBObject(IMEventNames.userID, userID)
323 val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1))
325 withCloseable(cursor) { cursor ⇒
326 while(cursor.hasNext) {
327 val model = MongoDBIMEvent.fromDBObject(cursor.next())
337 def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
338 val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,
339 new BasicDBObject("$gt", after))
340 MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc))
343 def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries)
346 def updatePolicyEntry(policy: PolicyEntry) = {
348 val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id)
349 val policyObject = MongoDBStore.jsonSupportToDBObject(policy)
350 policyEntries.update(query, policyObject, true, false)
353 def findPolicyEntry(id: String) = {
354 MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
360 object MongoDBStore {
362 final val _id = "_id"
366 * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
368 * Resource events are coming from all systems handling billable resources.
370 final val RESOURCE_EVENTS_COLLECTION = "resevents"
373 * Collection holding the snapshots of [[gr.grnet.aquarium.computation.state.UserState]].
375 * [[gr.grnet.aquarium.computation.state.UserState]] is held internally within
376 * [[gr.grnet.aquarium.actor.service.user .UserActor]]s.
378 final val USER_STATES_COLLECTION = "userstates"
381 * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
383 * User events are coming from the IM module (external).
385 final val IM_EVENTS_COLLECTION = "imevents"
388 * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s that could not be parsed to normal objects.
390 * We of course assume at least a valid JSON representation.
392 * User events are coming from the IM module (external).
394 final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
397 * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]].
399 // final val POLICIES_COLLECTION = "policies"
402 * Collection holding [[gr.grnet.aquarium.event.model.PolicyEntry]].
404 final val POLICY_ENTRIES_COLLECTION = "policyEntries"
406 def dbObjectToUserState(dbObj: DBObject): UserState = {
407 UserState.fromJson(JSON.serialize(dbObj))
410 def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
411 withCloseable(cursor) { cursor ⇒
413 Some(f(cursor.next()))
420 def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
421 PolicyEntry.fromJson(JSON.serialize(dbObj))
424 def ping(mongo: Mongo): Unit = synchronized {
425 // This requires a network roundtrip
429 def findBy[A >: Null <: AnyRef](name: String,
431 collection: DBCollection,
432 deserializer: (DBObject) => A) : Option[A] = {
433 val query = new BasicDBObject(name, value)
434 val cursor = collection find query
436 withCloseable(cursor) { cursor ⇒
438 Some(deserializer apply cursor.next)
444 def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
445 (deserializer: (DBObject) => A)
446 (sortWith: Option[(A, A) => Boolean]): List[A] = {
447 val cursor0 = collection find query
448 val cursor = if(orderBy ne null) {
452 } // I really know that docs say that it is the same cursor.
454 if(!cursor.hasNext) {
458 val buff = new ListBuffer[A]()
460 while(cursor.hasNext) {
461 buff += deserializer apply cursor.next
467 case Some(sorter) => buff.toList.sortWith(sorter)
468 case None => buff.toList
473 def storeUserState(userState: UserState, collection: DBCollection) = {
474 storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
477 def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
478 Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
481 def storeAny[A](any: A,
482 collection: DBCollection,
484 idValueProvider: (A) => String,
485 serializer: (A) => DBObject) : RecordID = {
487 val dbObject = serializer apply any
488 val _id = new ObjectId()
489 dbObject.put("_id", _id)
490 val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
491 writeResult.getLastError().throwOnError()
493 RecordID(dbObject.get("_id").toString)
496 // FIXME: consolidate
497 def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
498 val dbObject = serializer apply obj
499 val objectId = obj._id match {
501 val _id = new ObjectId()
502 dbObject.put("_id", _id)
509 dbObject.put(JsonNames._id, objectId)
511 collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
516 def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = {
517 val dbObject = serializer apply obj
518 val objectId = obj._id match {
520 val _id = new ObjectId()
521 dbObject.put("_id", _id)
528 dbObject.put(JsonNames._id, objectId)
530 collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
533 def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
534 Conversions.jsonSupportToDBObject(jsonSupport)
537 final def isLocalIMEvent(event: IMEventModel) = event match {
538 case _: MongoDBIMEvent ⇒ true
542 final def createIMEventFromJson(json: String) = {
543 MongoDBIMEvent.fromJsonString(json)
546 final def createIMEventFromOther(event: IMEventModel) = {
547 MongoDBIMEvent.fromOther(event, null)
550 final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {
551 MongoDBIMEvent.fromJsonBytes(jsonBytes)