Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala @ 1420b0dc

History | View | Annotate | Download (12.9 kB)

1
/*
2
 * Copyright 2011 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.store.mongodb
37

    
38
import gr.grnet.aquarium.util.Loggable
39
import com.mongodb.util.JSON
40
import gr.grnet.aquarium.user.UserState
41
import gr.grnet.aquarium.util.displayableObjectInfo
42
import gr.grnet.aquarium.util.json.JsonSupport
43
import collection.mutable.ListBuffer
44
import gr.grnet.aquarium.store._
45
import gr.grnet.aquarium.logic.events.ResourceEvent.{JsonNames => ResourceJsonNames}
46
import gr.grnet.aquarium.logic.events.WalletEntry.{JsonNames => WalletJsonNames}
47
import java.util.Date
48
import com.ckkloverdos.maybe.Maybe
49
import com.mongodb._
50
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent}
51

    
52
/**
53
 * Mongodb implementation of the various aquarium stores.
54
 *
55
 * @author Christos KK Loverdos <loverdos@gmail.com>
56
 * @author Georgios Gousios <gousiosg@gmail.com>
57
 */
58
class MongoDBStore(
59
    val mongo: Mongo,
60
    val database: String,
61
    val username: String,
62
    val password: String)
63
  extends ResourceEventStore with UserStateStore
64
  with WalletEntryStore with UserEventStore
65
  with Loggable {
66

    
67
  private[store] lazy val rcEvents      = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
68
  private[store] lazy val userStates    = getCollection(MongoDBStore.USER_STATES_COLLECTION)
69
  private[store] lazy val userEvents    = getCollection(MongoDBStore.USER_EVENTS_COLLECTION)
70
  private[store] lazy val walletEntries = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION)
71

    
72
  private[this] def getCollection(name: String): DBCollection = {
73
    val db = mongo.getDB(database)
74
    if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
75
      throw new StoreException("Could not authenticate user %s".format(username))
76
    }
77
    db.getCollection(name)
78
  }
79

    
80
  private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
81
    if (one.occurredMillis > two.occurredMillis) false
82
    else if (one.occurredMillis < two.occurredMillis) true
83
    else true
84
  }
85

    
86
  private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = {
87
    if (one.occurredMillis < two.occurredMillis) false
88
    else if (one.occurredMillis > two.occurredMillis) true
89
    else true
90
  }
91

    
92
  //+ResourceEventStore
93
  def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] =
94
    MongoDBStore.storeAquariumEvent(event, rcEvents)
95

    
96
  def findResourceEventById(id: String): Maybe[ResourceEvent] =
97
    MongoDBStore.findById(id, rcEvents, MongoDBStore.dbObjectToResourceEvent)
98

    
99
  def findResourceEventsByUserId(userId: String)
100
                                (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
101
    val query = new BasicDBObject(ResourceJsonNames.userId, userId)
102

    
103
    MongoDBStore.runQuery(query, rcEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
104
  }
105

    
106
  def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
107
    val query = new BasicDBObject()
108
    query.put(ResourceJsonNames.userId, userId)
109
    query.put(ResourceJsonNames.timestamp, new BasicDBObject("$gte", timestamp))
110
    
111
    val sort = new BasicDBObject(ResourceJsonNames.timestamp, 1)
112

    
113
    val cursor = rcEvents.find(query).sort(sort)
114

    
115
    try {
116
      val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
117
      while(cursor.hasNext) {
118
        buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
119
      }
120
      buffer.toList
121
    } finally {
122
      cursor.close()
123
    }
124
  }
125
  //-ResourceEventStore
126

    
127
  //+UserStateStore
128
  def storeUserState(userState: UserState): Maybe[RecordID] =
129
    MongoDBStore.storeUserState(userState, userStates)
130

    
131
  def findUserStateByUserId(userId: String): Maybe[UserState] = {
132
    Maybe {
133
      val query = new BasicDBObject(ResourceJsonNames.userId, userId)
134
      val cursor = userStates find query
135

    
136
      try {
137
        if(cursor.hasNext)
138
          MongoDBStore.dbObjectToUserState(cursor.next())
139
        else
140
          null
141
      } finally {
142
        cursor.close()
143
      }
144
    }
145
  }
146

    
147
  def deleteUserState(userId: String) = {
148
    val query = new BasicDBObject(ResourceJsonNames.userId, userId)
149
    userStates.findAndRemove(query)
150
  }
151
  //-UserStateStore
152

    
153
  //+WalletEntryStore
154
  def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] =
155
    MongoDBStore.storeAquariumEvent(entry, walletEntries)
156

    
157
  def findWalletEntryById(id: String): Maybe[WalletEntry] =
158
    MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry)
159

    
160
  def findUserWalletEntries(userId: String) = {
161
    // TODO: optimize
162
    findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue))
163
  }
164

    
165
  def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = {
166
    val q = new BasicDBObject()
167
    // TODO: Is this the correct way for an AND query?
168
    q.put(ResourceJsonNames.timestamp, new BasicDBObject("$gt", from.getTime))
169
    q.put(ResourceJsonNames.timestamp, new BasicDBObject("$lt", to.getTime))
170
    q.put(ResourceJsonNames.userId, userId)
171

    
172
    MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
173
  }
174

    
175
  def findLatestUserWalletEntries(userId: String) = {
176
    Maybe {
177
      val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, -1) // -1 is descending order
178
      val cursor = walletEntries.find().sort(orderBy)
179

    
180
      try {
181
        val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
182
        if(cursor.hasNext) {
183
          val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
184
          buffer += walletEntry
185

    
186
          var _previousOccurredMillis = walletEntry.occurredMillis
187
          var _ok = true
188

    
189
          while(cursor.hasNext && _ok) {
190
            val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
191
            var currentOccurredMillis = walletEntry.occurredMillis
192
            _ok = currentOccurredMillis == _previousOccurredMillis
193
            
194
            if(_ok) {
195
              buffer += walletEntry
196
            }
197
          }
198

    
199
          buffer.toList
200
        } else {
201
          null
202
        }
203
      } finally {
204
        cursor.close()
205
      }
206
    }
207
  }
208

    
209
  def findPreviousEntry(userId: String, resource: String,
210
                        instanceId: String,
211
                        finalized: Option[Boolean]): List[WalletEntry] = {
212
    val q = new BasicDBObject()
213
    q.put(WalletJsonNames.userId, userId)
214
    q.put(WalletJsonNames.resource, resource)
215
    q.put(WalletJsonNames.instanceId, instanceId)
216
    finalized match {
217
      case Some(x) => q.put(WalletJsonNames.finalized, x)
218
      case None =>
219
    }
220

    
221
    MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
222
  }
223
  //-WalletEntryStore
224

    
225
  //+UserEventStore
226
  def storeUserEvent(event: UserEvent): Maybe[RecordID] =
227
    MongoDBStore.storeAny[UserEvent](event, userEvents, ResourceJsonNames.userId,
228
      _.userId, MongoDBStore.jsonSupportToDBObject)
229

    
230

    
231
  def findUserEventById(id: String): Maybe[UserEvent] =
232
    MongoDBStore.findById[UserEvent](id, userEvents, MongoDBStore.dbObjectToUserEvent)
233

    
234
  def findUserEventsByUserId(userId: String): List[UserEvent] = {
235
    val query = new BasicDBObject(ResourceJsonNames.userId, userId)
236
    MongoDBStore.runQuery(query, userEvents)(MongoDBStore.dbObjectToUserEvent)(Some(_sortByTimestampAsc))
237
  }
238
  //-UserEventStore
239
}
240

    
241
object MongoDBStore {
242
  /**
243
   * Collection holding the [[gr.grnet.aquarium.logic.events.ResourceEvent]]s.
244
   *
245
   * Resource events are coming from all systems handling billable resources.
246
   */
247
  final val RESOURCE_EVENTS_COLLECTION = "resevents"
248

    
249
  /**
250
   * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]].
251
   *
252
   * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.user.actor.UserActor]]s.
253
   */
254
  final val USER_STATES_COLLECTION = "userstates"
255

    
256
  /**
257
   * Collection holding [[gr.grnet.aquarium.logic.events.UserEvent]]s.
258
   *
259
   * User events are coming from the IM module (external).
260
   */
261
  final val USER_EVENTS_COLLECTION = "userevents"
262

    
263

    
264
  /**
265
   * Collection holding [[gr.grnet.aquarium.logic.events.WalletEntry]].
266
   *
267
   * Wallet entries are generated internally in Aquarium.
268
   */
269
  final val WALLET_ENTRIES_COLLECTION = "wallets"
270

    
271
  /* TODO: Some of the following methods rely on JSON (de-)serialization).
272
  * A method based on proper object serialization would be much faster.
273
  */
274
  def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
275
    ResourceEvent.fromJson(JSON.serialize(dbObject))
276
  }
277

    
278
  def dbObjectToUserState(dbObj: DBObject): UserState = {
279
    UserState.fromJson(JSON.serialize(dbObj))
280
  }
281

    
282
  def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = {
283
    WalletEntry.fromJson(JSON.serialize(dbObj))
284
  }
285

    
286
  def dbObjectToUserEvent(dbObj: DBObject): UserEvent = {
287
    UserEvent.fromJson(JSON.serialize(dbObj))
288
  }
289

    
290
  def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
291
    val query = new BasicDBObject(ResourceJsonNames.id, id)
292
    val cursor = collection find query
293

    
294
    try {
295
      if(cursor.hasNext)
296
        deserializer apply cursor.next
297
      else
298
        null: A // will be transformed to NoVal by the Maybe polymorphic constructor
299
    } finally {
300
      cursor.close()
301
    }
302
  }
303

    
304
  def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
305
                                  (deserializer: (DBObject) => A)
306
                                  (sortWith: Option[(A, A) => Boolean]): List[A] = {
307
    val cursor0 = collection find query
308
    val cursor = if(orderBy ne null) {
309
      cursor0 sort orderBy
310
    } else {
311
      cursor0
312
    } // I really know that docs say that it is the same cursor.
313

    
314
    if(!cursor.hasNext) {
315
      cursor.close()
316
      Nil
317
    } else {
318
      val buff = new ListBuffer[A]()
319

    
320
      while(cursor.hasNext) {
321
        buff += deserializer apply cursor.next
322
      }
323

    
324
      cursor.close()
325

    
326
      sortWith match {
327
        case Some(sorter) => buff.toList.sortWith(sorter)
328
        case None => buff.toList
329
      }
330
    }
331
  }
332

    
333
  def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = {
334
    storeAny[A](event, collection, ResourceJsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject)
335
  }
336

    
337
  def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
338
    storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject)
339
  }
340

    
341
  def storeAny[A](any: A,
342
                  collection: DBCollection,
343
                  idName: String,
344
                  idValueProvider: (A) => String,
345
                  serializer: (A) => DBObject) : Maybe[RecordID] = Maybe {
346
    // Store
347
    val dbObj = serializer apply any
348
    val writeResult = collection insert dbObj
349
    writeResult.getLastError().throwOnError()
350

    
351
    // Get back to retrieve unique id
352
    val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any)))
353

    
354
    try {
355
      // TODO: better way to get _id?
356
      if(cursor.hasNext)
357
        RecordID(cursor.next().get(ResourceJsonNames._id).toString)
358
      else
359
        throw new StoreException("Could not store %s to %s".format(any, collection))
360
    } finally {
361
      cursor.close()
362
    }
363
 }
364

    
365
  def jsonSupportToDBObject(any: JsonSupport): DBObject = {
366
    JSON.parse(any.toJson) match {
367
      case dbObject: DBObject ⇒
368
        dbObject
369
      case _ ⇒
370
        throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName))
371
    }
372
  }
373
}