root / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala @ 1420b0dc
History | View | Annotate | Download (12.9 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.store.mongodb |
37 |
|
38 |
import gr.grnet.aquarium.util.Loggable |
39 |
import com.mongodb.util.JSON |
40 |
import gr.grnet.aquarium.user.UserState |
41 |
import gr.grnet.aquarium.util.displayableObjectInfo |
42 |
import gr.grnet.aquarium.util.json.JsonSupport |
43 |
import collection.mutable.ListBuffer |
44 |
import gr.grnet.aquarium.store._ |
45 |
import gr.grnet.aquarium.logic.events.ResourceEvent.{JsonNames => ResourceJsonNames} |
46 |
import gr.grnet.aquarium.logic.events.WalletEntry.{JsonNames => WalletJsonNames} |
47 |
import java.util.Date |
48 |
import com.ckkloverdos.maybe.Maybe |
49 |
import com.mongodb._ |
50 |
import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent} |
51 |
|
52 |
/** |
53 |
* Mongodb implementation of the various aquarium stores. |
54 |
* |
55 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
56 |
* @author Georgios Gousios <gousiosg@gmail.com> |
57 |
*/ |
58 |
class MongoDBStore( |
59 |
val mongo: Mongo, |
60 |
val database: String, |
61 |
val username: String, |
62 |
val password: String) |
63 |
extends ResourceEventStore with UserStateStore |
64 |
with WalletEntryStore with UserEventStore |
65 |
with Loggable { |
66 |
|
67 |
private[store] lazy val rcEvents = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION) |
68 |
private[store] lazy val userStates = getCollection(MongoDBStore.USER_STATES_COLLECTION) |
69 |
private[store] lazy val userEvents = getCollection(MongoDBStore.USER_EVENTS_COLLECTION) |
70 |
private[store] lazy val walletEntries = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION) |
71 |
|
72 |
private[this] def getCollection(name: String): DBCollection = { |
73 |
val db = mongo.getDB(database) |
74 |
if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { |
75 |
throw new StoreException("Could not authenticate user %s".format(username)) |
76 |
} |
77 |
db.getCollection(name) |
78 |
} |
79 |
|
80 |
private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = { |
81 |
if (one.occurredMillis > two.occurredMillis) false |
82 |
else if (one.occurredMillis < two.occurredMillis) true |
83 |
else true |
84 |
} |
85 |
|
86 |
private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = { |
87 |
if (one.occurredMillis < two.occurredMillis) false |
88 |
else if (one.occurredMillis > two.occurredMillis) true |
89 |
else true |
90 |
} |
91 |
|
92 |
//+ResourceEventStore |
93 |
def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] = |
94 |
MongoDBStore.storeAquariumEvent(event, rcEvents) |
95 |
|
96 |
def findResourceEventById(id: String): Maybe[ResourceEvent] = |
97 |
MongoDBStore.findById(id, rcEvents, MongoDBStore.dbObjectToResourceEvent) |
98 |
|
99 |
def findResourceEventsByUserId(userId: String) |
100 |
(sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = { |
101 |
val query = new BasicDBObject(ResourceJsonNames.userId, userId) |
102 |
|
103 |
MongoDBStore.runQuery(query, rcEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith) |
104 |
} |
105 |
|
106 |
def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = { |
107 |
val query = new BasicDBObject() |
108 |
query.put(ResourceJsonNames.userId, userId) |
109 |
query.put(ResourceJsonNames.timestamp, new BasicDBObject("$gte", timestamp)) |
110 |
|
111 |
val sort = new BasicDBObject(ResourceJsonNames.timestamp, 1) |
112 |
|
113 |
val cursor = rcEvents.find(query).sort(sort) |
114 |
|
115 |
try { |
116 |
val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent] |
117 |
while(cursor.hasNext) { |
118 |
buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next()) |
119 |
} |
120 |
buffer.toList |
121 |
} finally { |
122 |
cursor.close() |
123 |
} |
124 |
} |
125 |
//-ResourceEventStore |
126 |
|
127 |
//+UserStateStore |
128 |
def storeUserState(userState: UserState): Maybe[RecordID] = |
129 |
MongoDBStore.storeUserState(userState, userStates) |
130 |
|
131 |
def findUserStateByUserId(userId: String): Maybe[UserState] = { |
132 |
Maybe { |
133 |
val query = new BasicDBObject(ResourceJsonNames.userId, userId) |
134 |
val cursor = userStates find query |
135 |
|
136 |
try { |
137 |
if(cursor.hasNext) |
138 |
MongoDBStore.dbObjectToUserState(cursor.next()) |
139 |
else |
140 |
null |
141 |
} finally { |
142 |
cursor.close() |
143 |
} |
144 |
} |
145 |
} |
146 |
|
147 |
def deleteUserState(userId: String) = { |
148 |
val query = new BasicDBObject(ResourceJsonNames.userId, userId) |
149 |
userStates.findAndRemove(query) |
150 |
} |
151 |
//-UserStateStore |
152 |
|
153 |
//+WalletEntryStore |
154 |
def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = |
155 |
MongoDBStore.storeAquariumEvent(entry, walletEntries) |
156 |
|
157 |
def findWalletEntryById(id: String): Maybe[WalletEntry] = |
158 |
MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry) |
159 |
|
160 |
def findUserWalletEntries(userId: String) = { |
161 |
// TODO: optimize |
162 |
findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue)) |
163 |
} |
164 |
|
165 |
def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = { |
166 |
val q = new BasicDBObject() |
167 |
// TODO: Is this the correct way for an AND query? |
168 |
q.put(ResourceJsonNames.timestamp, new BasicDBObject("$gt", from.getTime)) |
169 |
q.put(ResourceJsonNames.timestamp, new BasicDBObject("$lt", to.getTime)) |
170 |
q.put(ResourceJsonNames.userId, userId) |
171 |
|
172 |
MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) |
173 |
} |
174 |
|
175 |
def findLatestUserWalletEntries(userId: String) = { |
176 |
Maybe { |
177 |
val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, -1) // -1 is descending order |
178 |
val cursor = walletEntries.find().sort(orderBy) |
179 |
|
180 |
try { |
181 |
val buffer = new scala.collection.mutable.ListBuffer[WalletEntry] |
182 |
if(cursor.hasNext) { |
183 |
val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next()) |
184 |
buffer += walletEntry |
185 |
|
186 |
var _previousOccurredMillis = walletEntry.occurredMillis |
187 |
var _ok = true |
188 |
|
189 |
while(cursor.hasNext && _ok) { |
190 |
val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next()) |
191 |
var currentOccurredMillis = walletEntry.occurredMillis |
192 |
_ok = currentOccurredMillis == _previousOccurredMillis |
193 |
|
194 |
if(_ok) { |
195 |
buffer += walletEntry |
196 |
} |
197 |
} |
198 |
|
199 |
buffer.toList |
200 |
} else { |
201 |
null |
202 |
} |
203 |
} finally { |
204 |
cursor.close() |
205 |
} |
206 |
} |
207 |
} |
208 |
|
209 |
def findPreviousEntry(userId: String, resource: String, |
210 |
instanceId: String, |
211 |
finalized: Option[Boolean]): List[WalletEntry] = { |
212 |
val q = new BasicDBObject() |
213 |
q.put(WalletJsonNames.userId, userId) |
214 |
q.put(WalletJsonNames.resource, resource) |
215 |
q.put(WalletJsonNames.instanceId, instanceId) |
216 |
finalized match { |
217 |
case Some(x) => q.put(WalletJsonNames.finalized, x) |
218 |
case None => |
219 |
} |
220 |
|
221 |
MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) |
222 |
} |
223 |
//-WalletEntryStore |
224 |
|
225 |
//+UserEventStore |
226 |
def storeUserEvent(event: UserEvent): Maybe[RecordID] = |
227 |
MongoDBStore.storeAny[UserEvent](event, userEvents, ResourceJsonNames.userId, |
228 |
_.userId, MongoDBStore.jsonSupportToDBObject) |
229 |
|
230 |
|
231 |
def findUserEventById(id: String): Maybe[UserEvent] = |
232 |
MongoDBStore.findById[UserEvent](id, userEvents, MongoDBStore.dbObjectToUserEvent) |
233 |
|
234 |
def findUserEventsByUserId(userId: String): List[UserEvent] = { |
235 |
val query = new BasicDBObject(ResourceJsonNames.userId, userId) |
236 |
MongoDBStore.runQuery(query, userEvents)(MongoDBStore.dbObjectToUserEvent)(Some(_sortByTimestampAsc)) |
237 |
} |
238 |
//-UserEventStore |
239 |
} |
240 |
|
241 |
object MongoDBStore { |
242 |
/** |
243 |
* Collection holding the [[gr.grnet.aquarium.logic.events.ResourceEvent]]s. |
244 |
* |
245 |
* Resource events are coming from all systems handling billable resources. |
246 |
*/ |
247 |
final val RESOURCE_EVENTS_COLLECTION = "resevents" |
248 |
|
249 |
/** |
250 |
* Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]]. |
251 |
* |
252 |
* [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.user.actor.UserActor]]s. |
253 |
*/ |
254 |
final val USER_STATES_COLLECTION = "userstates" |
255 |
|
256 |
/** |
257 |
* Collection holding [[gr.grnet.aquarium.logic.events.UserEvent]]s. |
258 |
* |
259 |
* User events are coming from the IM module (external). |
260 |
*/ |
261 |
final val USER_EVENTS_COLLECTION = "userevents" |
262 |
|
263 |
|
264 |
/** |
265 |
* Collection holding [[gr.grnet.aquarium.logic.events.WalletEntry]]. |
266 |
* |
267 |
* Wallet entries are generated internally in Aquarium. |
268 |
*/ |
269 |
final val WALLET_ENTRIES_COLLECTION = "wallets" |
270 |
|
271 |
/* TODO: Some of the following methods rely on JSON (de-)serialization). |
272 |
* A method based on proper object serialization would be much faster. |
273 |
*/ |
274 |
def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = { |
275 |
ResourceEvent.fromJson(JSON.serialize(dbObject)) |
276 |
} |
277 |
|
278 |
def dbObjectToUserState(dbObj: DBObject): UserState = { |
279 |
UserState.fromJson(JSON.serialize(dbObj)) |
280 |
} |
281 |
|
282 |
def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = { |
283 |
WalletEntry.fromJson(JSON.serialize(dbObj)) |
284 |
} |
285 |
|
286 |
def dbObjectToUserEvent(dbObj: DBObject): UserEvent = { |
287 |
UserEvent.fromJson(JSON.serialize(dbObj)) |
288 |
} |
289 |
|
290 |
def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe { |
291 |
val query = new BasicDBObject(ResourceJsonNames.id, id) |
292 |
val cursor = collection find query |
293 |
|
294 |
try { |
295 |
if(cursor.hasNext) |
296 |
deserializer apply cursor.next |
297 |
else |
298 |
null: A // will be transformed to NoVal by the Maybe polymorphic constructor |
299 |
} finally { |
300 |
cursor.close() |
301 |
} |
302 |
} |
303 |
|
304 |
def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null) |
305 |
(deserializer: (DBObject) => A) |
306 |
(sortWith: Option[(A, A) => Boolean]): List[A] = { |
307 |
val cursor0 = collection find query |
308 |
val cursor = if(orderBy ne null) { |
309 |
cursor0 sort orderBy |
310 |
} else { |
311 |
cursor0 |
312 |
} // I really know that docs say that it is the same cursor. |
313 |
|
314 |
if(!cursor.hasNext) { |
315 |
cursor.close() |
316 |
Nil |
317 |
} else { |
318 |
val buff = new ListBuffer[A]() |
319 |
|
320 |
while(cursor.hasNext) { |
321 |
buff += deserializer apply cursor.next |
322 |
} |
323 |
|
324 |
cursor.close() |
325 |
|
326 |
sortWith match { |
327 |
case Some(sorter) => buff.toList.sortWith(sorter) |
328 |
case None => buff.toList |
329 |
} |
330 |
} |
331 |
} |
332 |
|
333 |
def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = { |
334 |
storeAny[A](event, collection, ResourceJsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject) |
335 |
} |
336 |
|
337 |
def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = { |
338 |
storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject) |
339 |
} |
340 |
|
341 |
def storeAny[A](any: A, |
342 |
collection: DBCollection, |
343 |
idName: String, |
344 |
idValueProvider: (A) => String, |
345 |
serializer: (A) => DBObject) : Maybe[RecordID] = Maybe { |
346 |
// Store |
347 |
val dbObj = serializer apply any |
348 |
val writeResult = collection insert dbObj |
349 |
writeResult.getLastError().throwOnError() |
350 |
|
351 |
// Get back to retrieve unique id |
352 |
val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any))) |
353 |
|
354 |
try { |
355 |
// TODO: better way to get _id? |
356 |
if(cursor.hasNext) |
357 |
RecordID(cursor.next().get(ResourceJsonNames._id).toString) |
358 |
else |
359 |
throw new StoreException("Could not store %s to %s".format(any, collection)) |
360 |
} finally { |
361 |
cursor.close() |
362 |
} |
363 |
} |
364 |
|
365 |
def jsonSupportToDBObject(any: JsonSupport): DBObject = { |
366 |
JSON.parse(any.toJson) match { |
367 |
case dbObject: DBObject ⇒ |
368 |
dbObject |
369 |
case _ ⇒ |
370 |
throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName)) |
371 |
} |
372 |
} |
373 |
} |