Statistics
| Branch: | Tag: | Revision:

root / logic / src / test / scala / gr / grnet / aquarium / messaging / MessagingTest.scala @ 5b0541c9

History | View | Annotate | Download (5.2 kB)

1
/*
2
 * Copyright 2011 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.messaging
37

    
38
import amqp.AMQPDeliveryHandler
39
import amqp.rabbitmq.v091.confmodel._
40
import amqp.rabbitmq.v091.RabbitMQConfigurations.{PropFiles, RCFolders}
41
import amqp.rabbitmq.v091.{RabbitMQConsumer, RabbitMQConfigurations}
42
import org.junit.Test
43
import org.junit.Assert._
44
import com.ckkloverdos.resource.DefaultResourceContext
45
import gr.grnet.aquarium.util.xstream.XStreamHelpers
46
import gr.grnet.aquarium.util.Loggable
47
import com.ckkloverdos.props.Props
48
import com.ckkloverdos.maybe.{Failed, NoVal, Just}
49

    
50
/**
51
 * 
52
 * @author Christos KK Loverdos <loverdos@gmail.com>.
53
 */
54
class MessagingTest extends Loggable {
55

    
56
  val baseRC = DefaultResourceContext
57
  val rabbitmqRC = baseRC / RCFolders.rabbitmq
58

    
59
  object Names {
60
    val consumer1 = "consumer1"
61
    val producer1 = "producer1"
62
    val queue1 = "queue1"
63
    val routing_key_all = "routing.key.all"
64
    val local_connection = "local_connection"
65
    val aquarium_exchange = "aquarium_exchange"
66
    val direct = "direct"
67
    val localhost_aquarium = "localhost_aquarium"
68
    val aquarium = "aquarium"
69
    val localhost = "localhost"
70
  }
71

    
72
  private def _genTestConf: String = {
73
    val consmod1 = new RabbitMQConsumerModel("consumer1", "queue1", "routing.key.all", true, true, false, false)
74
    val prodmod1 = new RabbitMQProducerModel("producer1", "routing.key.all")
75
    val conn1 = new RabbitMQConnectionModel(
76
      "local_connection",
77
    "aquarium_exchange",
78
    "direct",
79
    true,
80
    List(prodmod1),
81
    List(consmod1)
82
    )
83
    val conf1 = new RabbitMQConfigurationModel(
84
    "localhost_aquarium",
85
    "aquarium",
86
    "aquarium",
87
    "localhost",
88
    5672,
89
    Nil,
90
    "/",
91
    List(conn1)
92
    )
93

    
94
    val model = new RabbitMQConfigurationsModel(List(conf1))
95
    val xs = XStreamHelpers.newXStream
96
    val xml = xs.toXML(model)
97

    
98
    xml
99
  }
100
  @Test
101
  def testConfigurationsExist {
102
    assertTrue(rabbitmqRC.getResource(PropFiles.configurations).isJust)
103
  }
104

    
105
  @Test
106
  def testLocalProducer {
107
    val maybeConfs = RabbitMQConfigurations(baseRC)
108
    assertTrue(maybeConfs.isJust)
109
    val maybeProducer = for {
110
      confs    <- maybeConfs
111
      conf     <- confs.findConfiguration(Names.localhost_aquarium)
112
      conn     <- conf.findConnection(Names.local_connection)
113
      producer <- conn.findProducer(Names.producer1)
114
    } yield {
115
      producer
116
    }
117

    
118
    maybeProducer match {
119
      case Just(producer) =>
120
        logger.debug("Publishing a message from %s".format(producer))
121
        producer.publishString("Test")
122
      case NoVal =>
123
        fail("No producer named %s".format(Names.producer1))
124
      case Failed(e, m) =>
125
        fail("%s: %s".format(m, e.getMessage))
126
    }
127
  }
128

    
129
  @Test
130
  def testLocalConsumer {
131
    val maybeConfs = RabbitMQConfigurations(baseRC)
132
    assertTrue(maybeConfs.isJust)
133

    
134
    val maybeConsumer = for {
135
      confs    <- maybeConfs
136
      conf     <- confs.findConfiguration(Names.localhost_aquarium)
137
      conn     <- conf.findConnection(Names.local_connection)
138
      consumer <- conn.findConsumer(Names.consumer1)
139
    } yield {
140
      consumer
141
    }
142

    
143
    maybeConsumer match {
144
      case Just(consumer) =>
145
        logger.debug("Receiving a message from %s".format(consumer))
146
        consumer.newDeliveryAgent(new AMQPDeliveryHandler {
147
          def handleStringDelivery(envelope: Props, headers: Props, content: String) = {
148
            logger.debug("Received message with")
149
            logger.debug("  envelope: %s".format(envelope))
150
            logger.debug("  headers : %s".format(headers))
151
            logger.debug("  body    : %s".format(content))
152
          }
153
        })
154
      case NoVal =>
155
        fail("No consumer named %s".format(Names.consumer1))
156
      case Failed(e, m) =>
157
        fail("%s: %s".format(m, e.getMessage))
158
    }
159
  }
160

    
161
}