Finish testcase for amqp. Using a localhost installation running
[aquarium] / logic / src / main / scala / gr / grnet / aquarium / messaging / amqp / rabbitmq / v091 / RabbitMQConsumer.scala
index d9d0398..7367543 100644 (file)
@@ -39,14 +39,38 @@ package v091
 
 import confmodel.RabbitMQConsumerModel
 import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
+import gr.grnet.aquarium.util.Loggable
 
 /**
  * 
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer {
+class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer with Loggable  {
+  private[v091] val _rabbitChannel = {
+    val _ch = owner._rabbitConnection.createChannel()
+    logger.info("Created rabbit channel %s for %s".format(_ch, this.toString))
+    val exchange = owner.confModel.exchange
+    val exchangeType = owner.confModel.exchangeType
+    val exchangeIsDurable = owner.confModel.isDurable
+    val queue = confModel.queue
+    val routingKey = confModel.routingKey
+    val queueIsDurable = confModel.queueIsDurable
+    val queueIsAutoDelete = confModel.queueIsAutoDelete
+    val queueIsExclusive = confModel.queueIsExclusive
+
+    val ed = _ch.exchangeDeclare(exchange, exchangeType, exchangeIsDurable)
+    logger.info("Declared exchange %s for %s with result %s".format(exchange, this, ed))
+
+    _ch.queueDeclare(queue, queueIsDurable, queueIsExclusive, queueIsAutoDelete, null)
+
+    _ch.queueBind(queue, exchange, routingKey)
+    _ch
+  }
+
   def name = confModel.name
 
+  def newDeliveryAgent(handler: AMQPDeliveryHandler) = new RabbitMQDeliveryAgent(this, handler)
+
   override def toString = {
     val connName = owner.name
     val confName = owner.owner.name