Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQProducer.scala @ 29e85df0

History | View | Annotate | Download (9.4 kB)

1
package gr.grnet.aquarium.connector.rabbitmq
2

    
3
import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4
import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5
import gr.grnet.aquarium._
6
import com.rabbitmq.client._
7
import com.ckkloverdos.props.Props
8
import gr.grnet.aquarium.converter.StdConverters
9
import util.{Lifecycle, Loggable, Lock}
10
import gr.grnet.aquarium.store.memory.MemStoreProvider
11
import java.io.File
12
import com.ckkloverdos.resource.FileStreamResource
13
import scala.Some
14
import collection.immutable.{TreeMap, TreeSet}
15
import java.util.concurrent.atomic.AtomicLong
16
import akka.actor.{Actor, ActorRef}
17
import com.google.common.eventbus.Subscribe
18
import gr.grnet.aquarium.service.event.AquariumCreatedEvent
19
import collection.mutable
20

    
21

    
22
/*
23
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
24
 *
25
 * Redistribution and use in source and binary forms, with or
26
 * without modification, are permitted provided that the following
27
 * conditions are met:
28
 *
29
 *   1. Redistributions of source code must retain the above
30
 *      copyright notice, this list of conditions and the following
31
 *      disclaimer.
32
 *
33
 *   2. Redistributions in binary form must reproduce the above
34
 *      copyright notice, this list of conditions and the following
35
 *      disclaimer in the documentation and/or other materials
36
 *      provided with the distribution.
37
 *
38
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
39
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
40
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
41
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
42
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
43
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
44
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
45
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
46
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
47
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
48
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
49
 * POSSIBILITY OF SUCH DAMAGE.
50
 *
51
 * The views and conclusions contained in the software and
52
 * documentation are those of the authors and should not be
53
 * interpreted as representing official policies, either expressed
54
 * or implied, of GRNET S.A.
55
 */
56

    
57
private class RabbitMQProducerActor extends Actor {
58
  def receive = {
59
    case sendMessage: Function0[_] =>
60
      //Console.err.println("Executing msg ... " + sendMessage.hashCode)
61
      sendMessage.asInstanceOf[()=>Unit]()
62
    case x : AnyRef =>
63
      //Console.err.println("Dammit  ..." + x.getClass.getSuperclass.getName)
64
      throw new Exception("Unexpected value in RabbitMQProducerActor with type: " +
65
                          x.getClass.getSuperclass.getName)
66
  }
67
}
68

    
69
/**
70
 *
71
 * @author Prodromos Gerakios <pgerakios@grnet.gr>
72
 */
73

    
74
class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Loggable with Lifecycle {
75
  private[this] var _conf: RabbitMQConsumerConf = _
76
  private[this] var _factory: ConnectionFactory = _
77
  private[this] var _connection: Connection = _
78
  private[this] var _channel: Channel = _
79
  private[this] var _servers : Array[Address] = _
80
  private[this] final val lock = new Lock()
81

    
82
  def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
83
  //  Some(RabbitMQConfKeys.imevents_credit)
84

    
85

    
86
  @volatile private[this] var _unsentMessages = mutable.Queue[()=>Unit]()
87
  @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
88
  @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,()=>Unit]()
89

    
90

    
91
  @volatile private[this] var _actorRef : ActorRef = _
92
  private[this] var _resendPeriodMillis = 1000L
93

    
94
  def start() = {
95
    try{
96
      _connection =_factory.newConnection(_servers)
97
      _channel = _connection.createChannel
98
      _channel.confirmSelect
99
      _channel.addConfirmListener(new ConfirmListener {
100
        private [this] def cutSubset(seqNo:Long,multiple:Boolean) : TreeMap[Long,()=>Unit] =
101
          lock.withLock {
102
            val set = if (multiple)
103
              _unconfirmedSet.range(0,seqNo+1)
104
            else
105
              _unconfirmedSet.range(seqNo,seqNo)
106
            _unconfirmedSet = _unconfirmedSet -- set
107
            val ret : TreeMap[Long,()=>Unit] = set.foldLeft(TreeMap[Long,()=>Unit]())({(map,seq)=>
108
              _unconfirmedMessages.get(seq) match{
109
                case None => map
110
                case Some(s) => map + ((seq,s))
111
              }})
112
            _unconfirmedMessages = _unconfirmedMessages -- set
113
            ret
114
          }
115
        def handleAck(seqNo:Long,multiple:Boolean) = {
116
          //Console.err.println("Received ack for  " + seqNo)
117
          cutSubset(seqNo,multiple)
118
        }
119
        def handleNack(seqNo:Long,multiple:Boolean) = {
120
          //Console.err.println("Received Nack for msg for " + seqNo)
121
          for {(_,msg) <- cutSubset(seqNo,multiple)} _actorRef ! (msg:()=>Unit)
122
        }
123
      })
124

    
125
      _actorRef =  aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
126
      resendMessages        // start our daemon
127
    } catch {
128
      case e:Exception =>
129
        logger.error("RabbitMQProducer error:",e)
130
    }
131
  }
132

    
133
  def stop() = {
134
    try{
135
      _channel.close
136
      _connection.close
137
    } catch {
138
      case e:Exception =>
139
        logger.error("RabbitMQProducer error:",e)
140
    }
141
  }
142

    
143
  @Subscribe
144
  override def awareOfAquarium(event: AquariumCreatedEvent) = {
145
    super.awareOfAquarium(event)
146
    //assert(aquarium!=null && aquarium.akkaService != null)
147
  }
148

    
149
  private[this] def resendMessages() : Unit = {
150
    aquarium.timerService.scheduleOnce(
151
    "RabbitMQProducer.resendMessages",
152
    {
153
      //Console.err.println("RabbitMQProducer Timer ...")
154
      /*if(_actorRef==null) {
155
        _actorRef =  aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
156
      } */
157
      //if(_actorRef != null){
158
      // Console.err.println("RabbitMQProducer Timer --> messages ...")
159
       var msgs : mutable.Queue[()=>Unit] = null
160
       lock.withLock {
161
          if(isChannelOpen) {
162
            msgs  = _unsentMessages
163
            _unsentMessages = mutable.Queue[()=>Unit]()
164
          }
165
       }
166
       if(msgs!=null){
167
         //if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
168
         for {msg <- msgs} {
169
          // Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
170
           _actorRef ! (msg:()=>Unit)
171
         }
172
       }
173
      /*//} else {
174
        //Console.err.println("Akka ActorSystem is null. Waiting ...")
175
      } */
176
      resendMessages()
177
    },
178
    this._resendPeriodMillis,
179
    true
180
    )
181
    ()
182
  }
183

    
184
  def configure(props: Props): Unit = {
185
    val connectionConf = RabbitMQKeys.makeConnectionConf(props)
186
    _factory = new ConnectionFactory
187
    _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
188
    _factory.setUsername(connectionConf(RabbitMQConKeys.username))
189
    _factory.setPassword(connectionConf(RabbitMQConKeys.password))
190
    _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
191
    _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
192
    _servers = connectionConf(RabbitMQConKeys.servers)
193
  }
194

    
195
  private[this] def isChannelOpen: Boolean =
196
    lock.withLock{
197
    if (_connection == null ||_connection.isOpen == false )
198
      _connection =_factory.newConnection(_servers)
199
    if (_channel == null ||_channel.isOpen == false )
200
      _channel = _connection.createChannel
201
    _connection.isOpen && _channel.isOpen
202
    }
203

    
204
  def fun(s:String)  : () => Unit = {
205
     () => {}
206
  }
207

    
208
  def sendMessage(exchangeName:String,routingKey:String,payload:String) = {
209
    def msg () : Unit =
210
      lock.withLock {
211
        try {
212
            if(isChannelOpen) {
213
              var seq : Long = _channel.getNextPublishSeqNo()
214
              _unconfirmedSet += seq
215
              _unconfirmedMessages += ((seq,msg))
216
              _channel.basicPublish(exchangeName,routingKey,
217
                MessageProperties.PERSISTENT_TEXT_PLAIN,
218
                payload.getBytes)
219
               //Console.err.println("####Sent message " + payload + " with seqno=" + seq)
220
            } else {
221
              _unsentMessages += msg
222
               //Console.err.println("####Channel closed!")
223
             }
224
        } catch {
225
            case e: Exception =>
226
              _unsentMessages += msg
227
              //e.printStackTrace
228
        }
229
      }
230
    if(_actorRef != null)
231
      _actorRef ! (msg:()=>Unit)
232
    else
233
      lock.withLock(_unsentMessages += msg)
234
 }
235
}
236

    
237
object RabbitMQProducer  {
238
  def main(args: Array[String]) = {
239
    val propsfile = new FileStreamResource(new File("aquarium.properties"))
240
    var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
241
    val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
242
    update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
243
    update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
244
    build()
245

    
246
    aquarium.start()
247

    
248
    //RabbitMQProducer.wait(1000)
249
    val propName = RabbitMQConfKeys.imevents_credit
250
    def exn () =
251
      throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
252
    val prop = _props.get(propName).getOr(exn())
253
    if (prop.isEmpty) exn()
254
    val Array(exchangeName, routingKey) = prop.split(":")
255
    aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(exchangeName,routingKey,"Test string !!!!")
256
    Console.err.println("Message sent")
257
    ()
258
  }
259
}