Statistics
| Branch: | Tag: | Revision:

root / logic / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala @ 492a2d49

History | View | Annotate | Download (4.8 kB)

1
/*
2
 * Copyright 2011 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.store.mongodb
37

    
38
import gr.grnet.aquarium.util.Loggable
39
import gr.grnet.aquarium.store.{RecordID, StoreException, EventStore}
40
import com.mongodb._
41
import util.{JSONParseException, JSON}
42
import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
43
import gr.grnet.aquarium.logic.events.{ResourceEvent, AquariumEvent}
44

    
45
/**
46
 * Mongodb implementation of the message store.
47
 *
48
 * @author Christos KK Loverdos <loverdos@gmail.com>
49
 * @author Georgios Gousios <gousiosg@gmail.com>
50
 */
51
class MongoDBStore(host: String, port: String,
52
                   username: String, passwd: String,
53
                   database: String)
54
  extends EventStore with Loggable {
55

    
56
  private object Connection {
57
    lazy val mongo: Option[Mongo] = {
58
      try {
59
        val addr = new ServerAddress(host, port.toInt)
60
        val opt = new MongoOptions()
61
        Some(new Mongo(addr, opt))
62
      } catch {
63
        case e: MongoException =>
64
          logger.error(("Cannot connect to mongo at %s:%s (uname=%s). " +
65
            "Cause:").format(host,port,username, e))
66
          None
67
        case nfe: NumberFormatException =>
68
          logger.error("%s is not a valid port number".format(port))
69
          None
70
      }
71
    }
72
  }
73

    
74
  private[store] lazy val events: DBCollection = getCollection("events")
75

    
76
  private[store] lazy val users: DBCollection = getCollection("user")
77

    
78
  private[store]def getCollection(name: String): DBCollection = {
79
    Connection.mongo match {
80
      case Some(x) =>
81
        val db = x.getDB(database)
82
        if(!db.authenticate(username, passwd.toCharArray))
83
          throw new StoreException("Could not authenticate user %s".format(username))
84
        db.getCollection(name)
85
      case None => throw new StoreException("No connection to Mongo")
86
    }
87
  }
88

    
89
  /* TODO: Some of the following methods rely on JSON (de-)serialization).
90
   * A method based on proper object serialization would be much faster.
91
   */
92

    
93
  //EventStore methods
94
  def storeEvent[A <: AquariumEvent](event: A): Maybe[RecordID] = {
95
    try {
96
      // Store
97
      val obj = JSON.parse(event.toJson).asInstanceOf[DBObject]
98
      events.insert(obj)
99

    
100
      // TODO: Make this retrieval a configurable option
101
      // Get back to retrieve unique id
102
      val q = new BasicDBObject()
103
      q.put("id", event.id)
104

    
105
      val cur = events.find(q)
106

    
107
      if (!cur.hasNext) {
108
        logger.error("Failed to store event: %s".format(event))
109
        Failed(new StoreException("Failed to store event: %s".format(event)))
110
      }
111

    
112
      Just(RecordID(cur.next.get("id").toString))
113
    } catch {
114
      case j: JSONParseException =>
115
        logger.error("Error parsing JSON for event %s %s".format(event,j)); Failed(j)
116
      case m: MongoException =>
117
        logger.error("Unknown Mongo error: %s".format(m)); Failed(m)
118
    }
119
  }
120

    
121
  def findEventById[A <: AquariumEvent](id: String): Option[A] = {
122
    val q = new BasicDBObject()
123
    q.put("id", id)
124

    
125
    val cur = events.find(q)
126

    
127
    if (cur.hasNext)
128
      deserialize(cur.next)
129
    else
130
      None
131
  }
132

    
133
  def findEventsByUserId[A <: AquariumEvent](userId: Long)
134
                                      (sortWith: Option[(A, A) => Boolean]): List[A] = {
135
    List()
136
  }
137

    
138
  private def deserialize[A <: AquariumEvent](a: DBObject): A = {
139
    //TODO: Distinguish events and deserialize appropriately
140
    ResourceEvent.fromJson(JSON.serialize(a)).asInstanceOf[A]
141
  }
142
}