2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.mongodb
38 import collection.immutable
40 import gr.grnet.aquarium.computation.BillingMonthInfo
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
43 import gr.grnet.aquarium.message.MessageConstants
44 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
45 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.util._
48 import gr.grnet.aquarium.util.json.JsonSupport
49 import gr.grnet.aquarium.{Aquarium, AquariumException}
50 import org.apache.avro.specific.SpecificRecord
51 import org.bson.types.ObjectId
54 * Mongodb implementation of the various aquarium stores.
56 * @author Christos KK Loverdos <loverdos@gmail.com>
57 * @author Georgios Gousios <gousiosg@gmail.com>
58 * @author Prodromos Gerakios <pgerakio@grnet.gr>
61 val aquarium: Aquarium,
66 extends ResourceEventStore
// Handles to the four MongoDB collections this store works with, one per kind of
// stored record. Each is a `lazy val`, so `getCollection` is only invoked on first
// access. The collection names come from the `MongoDBStore` companion object.
72 private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection)
73 private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection)
74 private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection)
75 private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection)
// Ensures `db` is authenticated with this store's `username` / `password`.
// Authentication is attempted only when `db.isAuthenticated` is false; if the attempt
// then fails, an AquariumException naming the user is thrown.
77 private[this] def doAuthenticate(db: DB) {
78 if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
79 throw new AquariumException("Could not authenticate user %s".format(username))
// Resolves the database called `database` on the `mongo` connection and returns the
// collection with the given `name` from it.
83 private[this] def getCollection(name: String): DBCollection = {
84 val db = mongo.getDB(database)
86 db.getCollection(name)
// Health check for the resource-event store. The body runs inside `synchronized`:
// it looks up the resource-event collection (the result is discarded) and then
// delegates to `MongoDBStore.ping`.
90 def pingResourceEventStore(): Unit = synchronized {
91 getCollection(MongoDBStore.ResourceEventCollection)
92 MongoDBStore.ping(mongo)
// Persists a resource event in the `resourceEvents` collection.
// A fresh ObjectId is generated and written back into the event as its in-store ID
// before the event is serialized, so the stored Avro payload carries that ID too.
95 def insertResourceEvent(event: ResourceEventMsg) = {
96 val mongoID = new ObjectId()
97 event.setInStoreID(mongoID.toStringMongod)
// Besides the serialized payload, the user ID and both timestamps are stored as
// separate attributes; the query methods of this store filter and sort on them.
99 val dbObject = new BasicDBObjectBuilder().
100 add(MongoDBStore.JsonNames._id, mongoID).
101 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
102 add(MongoDBStore.JsonNames.userID, event.getUserID).
103 add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
104 add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
107 MongoDBStore.insertDBObject(dbObject, resourceEvents)
// Looks up a single resource event whose `JsonNames.id` attribute equals `id` and,
// if one is found, decodes its Avro payload into a ResourceEventMsg.
// NOTE(review): insertResourceEvent visibly keys documents on `JsonNames._id` (an
// ObjectId); confirm that a string-valued `JsonNames.id` attribute is also stored.
111 def findResourceEventByID(id: String): Option[ResourceEventMsg] = {
112 val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id)
114 dbObject ← dbObjectOpt
// The payload attribute is read back as raw bytes and deserialized into a new message.
115 payload = dbObject.get(MongoDBStore.JsonNames.payload)
116 msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg)
// Counts the resource events of `userID` matched by a query on `receivedMillis`
// (bounded by `startMillis` / `stopMillis`) and on `occurredMillis` (before the start
// or after the stop), i.e. events received in the period but occurring outside it.
// NOTE(review): both `receivedMillis` bounds are added under the same key. If
// BasicDBObjectBuilder.add behaves like a map put, the `$lte` entry replaces the
// `$gte` one and only the upper bound is applied — confirm, and if so put both
// operators into a single BasicDBObject.
120 def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
121 val query = new BasicDBObjectBuilder().
122 add(MongoDBStore.JsonNames.userID, userID).
123 // received within the period
124 add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)).
125 add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)).
126 // occurred outside the period
// Two alternatives: occurred strictly before the start, or strictly after the stop.
// Presumably attached to the query as an `$or` clause — confirm.
128 val dbList = new BasicDBList()
129 dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis)))
130 dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis)))
135 resourceEvents.count(query)
// Walks the resource events of a user matched by a query on `occurredMillis`
// (bounded by `startMillis` / `stopMillis`), in ascending `occurredMillis` order,
// decoding each stored payload into a ResourceEventMsg. Each decoded event is
// presumably handed to `f` — confirm. The cursor is managed by `withCloseable`.
138 def foreachResourceEventOccurredInPeriod(
142 )(f: ResourceEventMsg ⇒ Unit): Unit = {
// NOTE(review): both `occurredMillis` bounds are added under the same key. If
// BasicDBObjectBuilder.add behaves like a map put, the `$lte` entry replaces the
// `$gte` one and only the upper bound is applied — confirm, and if so put both
// operators into a single BasicDBObject.
144 val query = new BasicDBObjectBuilder().
145 add(MongoDBStore.JsonNames.userID, userID).
146 add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)).
147 add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)).
// Sort direction 1: ascending by occurrence time.
150 val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)
151 val cursor = resourceEvents.find(query).sort(sorter)
153 withCloseable(cursor) { cursor ⇒
154 while(cursor.hasNext) {
155 val nextDBObject = cursor.next()
156 val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
157 val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg)
163 //-ResourceEventStore
// Looks up one user-state document whose `userID` attribute equals `userID` and,
// if found, decodes its Avro payload into a UserStateMsg. No sort order is passed
// to the lookup.
166 def findUserStateByUserID(userID: String) = {
167 val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID)
169 dbObject <- dbObjectOpt
170 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
171 msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg)
// Finds the most recent user state of `userID` that is flagged as a full-billing-month
// state for the year and month carried by `bmi`. "Most recent" means greatest
// `occurredMillis`: results are sorted descending and only the first one is decoded.
177 def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
178 val query = new BasicDBObjectBuilder().
179 add(MongoDBStore.JsonNames.userID, userID).
180 add(MongoDBStore.JsonNames.isFullBillingMonth, true).
181 add(MongoDBStore.JsonNames.billingYear, bmi.year).
182 add(MongoDBStore.JsonNames.billingMonth, bmi.month).
185 // Descending order, so that the latest comes first
186 val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
188 val cursor = userStates.find(query).sort(sorter)
// Decodes the first document of the sorted cursor, if any, into a UserStateMsg.
190 withCloseable(cursor) { cursor ⇒
191 MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
// Finds the most recent user state stored for `userID`, where "most recent" means
// greatest `occurredMillis`: results are sorted descending and only the first one
// is decoded.
195 def findLatestUserState(userID: String) = {
196 val query = new BasicDBObjectBuilder().
197 add(MongoDBStore.JsonNames.userID, userID).
200 // Descending order, so that the latest comes first
201 val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1)
203 val cursor = userStates.find(query).sort(sorter)
// Decodes the first document of the sorted cursor, if any, into a UserStateMsg.
205 withCloseable(cursor) { cursor ⇒
206 MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg)
211 * Stores a user state.
// Persists a user state in the `userStates` collection.
// A fresh ObjectId is generated and written back into the message as its in-store ID
// before serialization, so the stored Avro payload carries that ID too.
213 def insertUserState(event: UserStateMsg)= {
214 val mongoID = new ObjectId()
215 event.setInStoreID(mongoID.toStringMongod)
// The billing attributes are stored next to the payload; the full-month-billing
// lookup in this store filters on isFullBillingMonth, billingYear and billingMonth.
217 val dbObject = new BasicDBObjectBuilder().
218 add(MongoDBStore.JsonNames._id, mongoID).
219 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
220 add(MongoDBStore.JsonNames.userID, event.getUserID).
221 add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
222 add(MongoDBStore.JsonNames.isFullBillingMonth, event.getIsFullBillingMonth).
223 add(MongoDBStore.JsonNames.billingYear, event.getBillingYear).
224 add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth).
225 add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay).
228 MongoDBStore.insertDBObject(dbObject, userStates)
// Health check for the IM-event store: looks up the IM-event collection (the result
// is discarded) and then delegates to `MongoDBStore.ping`. Unlike
// pingResourceEventStore, this method itself is not wrapped in `synchronized`.
234 def pingIMEventStore(): Unit = {
235 getCollection(MongoDBStore.IMEventCollection)
236 MongoDBStore.ping(mongo)
// Persists an IM event in the `imEvents` collection.
// A fresh ObjectId is generated and written back into the event as its in-store ID
// before serialization, so the stored Avro payload carries that ID too.
239 def insertIMEvent(event: IMEventMsg) = {
240 val mongoID = new ObjectId()
241 event.setInStoreID(mongoID.toStringMongod)
243 val dbObject = new BasicDBObjectBuilder().
244 add(MongoDBStore.JsonNames._id, mongoID).
245 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)).
246 add(MongoDBStore.JsonNames.userID, event.getUserID).
// The event type is stored lower-cased, so queries on this attribute must use the
// lower-case form.
247 add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase).
248 add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis).
249 add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis).
252 MongoDBStore.insertDBObject(dbObject, imEvents)
// Looks up a single IM event whose `JsonNames.id` attribute equals `id` and, if one
// is found, decodes its Avro payload into an IMEventMsg.
// NOTE(review): insertIMEvent visibly keys documents on `JsonNames._id` (an ObjectId);
// confirm that a string-valued `JsonNames.id` attribute is also stored.
256 def findIMEventByID(id: String) = {
257 val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id)
259 dbObject ← dbObjectOpt
260 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
261 msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
269 * Find the `CREATE` event for the given user. Note that there must be only one such event.
// Queries the IM events of `userID` whose event type equals the `create` constant,
// ordered by ascending `occurredMillis`, and decodes the selected document's Avro
// payload into an IMEventMsg.
271 def findCreateIMEventByUserID(userID: String) = {
272 val query = new BasicDBObjectBuilder().
273 add(MongoDBStore.JsonNames.userID, userID).
274 add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get()
276 // Normally one such event is allowed ...
// Ascending sort: should several matches exist, the earliest one comes first.
277 val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
// Presumably picks the first document of the cursor, if any — confirm.
279 val dbObjectOpt = withCloseable(cursor) { cursor ⇒
288 dbObject <- dbObjectOpt
289 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
290 msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
297 * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
298 * the given function `f`.
300 * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
// Feeds the IM events of `userID` to `f` in ascending `occurredMillis` order.
// `f` returns a Boolean that controls the scan: iteration stops as soon as `f`
// returns false or the cursor is exhausted. The cursor is managed by `withCloseable`.
302 def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
303 val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID)
304 val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1))
// Loop flag, updated with the result of each call to `f`.
306 var _shouldContinue = true
307 withCloseable(cursor) { cursor ⇒
308 while(_shouldContinue && cursor.hasNext) {
309 val dbObject = cursor.next()
310 val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
311 val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg)
313 _shouldContinue = f(msg)
// Iterates over every document of the `policies` collection (no filter, no explicit
// sort) and decodes each payload into a PolicyMsg. Each decoded policy is presumably
// handed to `f` — confirm. The cursor is managed by `withCloseable`.
322 def foreachPolicy[U](f: PolicyMsg ⇒ U) {
323 val cursor = policies.find()
324 withCloseable(cursor) { cursor ⇒
325 while(cursor.hasNext) {
326 val dbObject = cursor.next()
327 val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
328 val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg)
// Persists a policy in the `policies` collection.
// A fresh ObjectId is generated and written back into the policy as its in-store ID
// before serialization, so the stored Avro payload carries that ID too. The validity
// bounds are stored as separate attributes next to the payload.
334 def insertPolicy(policy: PolicyMsg): PolicyMsg = {
335 val mongoID = new ObjectId()
336 policy.setInStoreID(mongoID.toStringMongod)
337 val dbObject = new BasicDBObjectBuilder().
338 add(MongoDBStore.JsonNames._id, mongoID).
339 add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis).
340 add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis).
341 add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)).
344 MongoDBStore.insertDBObject(dbObject, policies)
// Returns the policy selected for the instant `atMillis`, if any.
// All stored policies are loaded into a TreeSet ordered by
// OrderingHelpers.DefaultPolicyMsgOrdering; the result is the last element of the
// range bounded above by a dummy policy built for `atMillis`, or None if that range
// is empty.
348 def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = {
// `var` holding an immutable set: `+=` rebinds it to a new set for every policy.
350 var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
351 foreachPolicy(_policies += _)
352 _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption
// Returns the policies that fall between `fromMillis` and `toMillis`, keyed by the
// Timeslot spanning each policy's validity bounds.
// All stored policies are loaded into a TreeSet ordered by
// OrderingHelpers.DefaultPolicyMsgOrdering and the set is then narrowed with
// `from` / `to`, using dummy policies built for the two instants as range bounds.
355 def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = {
// `var` holding an immutable set: `+=` rebinds it to a new set for every policy.
357 var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
358 foreachPolicy(_policies += _)
360 immutable.SortedMap(_policies.
361 from(MessageFactory.newDummyPolicyMsgAt(fromMillis)).
362 to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq.
// Each policy becomes a (Timeslot, policy) pair; the pairs are passed as varargs.
363 map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _*
// Companion object: attribute-name alias, collection names and cursor/insert helpers
// used by the store.
369 object MongoDBStore {
// Local alias so attribute names can be written as MongoDBStore.JsonNames.xyz.
370 final val JsonNames = gr.grnet.aquarium.util.json.JsonNames
// Names of the MongoDB collections.
372 final val ResourceEventCollection = "resevents"
374 final val UserStateCollection = "userstates"
376 final val UserAgreementHistoryCollection = "useragreementhistory"
378 final val IMEventCollection = "imevents"
380 final val PolicyCollection = "policies"
// Applies `f` to the next object of `cursor` and wraps the result in Some. The cursor
// is managed by `withCloseable`.
// Presumably yields None when the cursor has no element — confirm.
382 def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
383 withCloseable(cursor) { cursor ⇒
385 Some(f(cursor.next()))
// Checks the given `mongo` connection. The body runs inside `synchronized` on this
// object; per the note below, the check involves a network round trip.
392 def ping(mongo: Mongo): Unit = synchronized {
393 // This requires a network roundtrip
// Returns the first document of `collection` whose attribute `attributeName` equals
// `attributeValue`, or None if nothing matches. When `sortByOpt` is given, the
// matches are sorted by it first, so it decides which document counts as "first".
// The cursor is managed by `withCloseable`.
397 def findOneByAttribute(
398 collection: DBCollection,
399 attributeName: String,
400 attributeValue: String,
401 sortByOpt: Option[DBObject] = None
402 ): Option[DBObject] = {
403 val query = new BasicDBObject(attributeName, attributeValue)
// The sort is applied only when the caller supplied one.
404 val cursor = sortByOpt match {
405 case None ⇒ collection find query
406 case Some(sortBy) ⇒ collection find query sort sortBy
408 withCloseable(cursor) { cursor ⇒
409 if(cursor.hasNext) Some(cursor.next()) else None
// Inserts `dbObj` into `collection` using the WriteConcern.JOURNAL_SAFE write concern.
413 def insertDBObject(dbObj: DBObject, collection: DBCollection) {
414 collection.insert(dbObj, WriteConcern.JOURNAL_SAFE)
// Takes the next document from `cursor`, if there is one, and deserializes its
// payload attribute into the supplied record instance `fresh`. An exhausted cursor
// yields None. The cursor is not closed here.
417 def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = {
419 dbObject <- if(cursor.hasNext) Some(cursor.next()) else None
420 payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]]
421 msg = AvroHelpers.specificRecordOfBytes(payload, fresh)
// Converts a JsonSupport value into a MongoDB DBObject through
// StdConverters.AllConverters.
427 def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
428 StdConverters.AllConverters.convertEx[DBObject](jsonSupport)