Remove dead code before migrating to single project setup
[aquarium] / logic / src / test / scala / gr / grnet / aquarium / messaging / AkkaAMQPTest.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.messaging
37
38 import gr.grnet.aquarium.util.RandomEventGenerator
39 import org.junit.Test
40 import akka.actor.Actor
41 import java.util.concurrent.{TimeUnit, CountDownLatch}
42 import akka.amqp._
43 import akka.config.Supervision.Permanent
44 import java.util.concurrent.atomic.AtomicInteger
45 import org.junit.Assume._
46 import gr.grnet.aquarium.LogicTestsAssumptions
47
48 /**
49  *
50  *
51  * @author Georgios Gousios <gousiosg@gmail.com>
52  */
53
54 class AkkaAMQPTest extends RandomEventGenerator {
55
56   @Test
57   def testSendReceive() : Unit = {
58
59     assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
60
61     val numMsg = 100
62     val msgs = new AtomicInteger(0)
63
64     val publisher = producer("aquarium")
65
66     class Consumer extends Actor {
67
68       self.lifeCycle = Permanent
69
70       def receive = {
71         case Delivery(payload, routingKey, deliveryTag, isRedeliver, properties, sender) =>
72           println(this + " Got message: %s (%d)".format(new String(payload), deliveryTag) +
73             (if(isRedeliver) " - redelivered (%d)".format(deliveryTag) else ""))
74           if (msgs.incrementAndGet() == 15) throw new Exception("Messed up")
75           if (msgs.incrementAndGet() == 55) sender ! Reject(deliveryTag, true)
76           sender ! Acknowledge(deliveryTag)
77         case Acknowledged(deliveryTag) => println("Acked: " + deliveryTag)
78         case _ => println("Unknown delivery")
79       }
80
81       override def preRestart(reason: Throwable) {
82         println("Actor restarted: " + reason)
83       }
84
85       override def postRestart(reason: Throwable) {
86         // reinit stable state after restart
87       }
88     }
89
90     consumer("foo.#", "akka-amqp-test", "aquarium", Actor.actorOf(new Consumer), false)
91     Thread.sleep(2000)
92
93     (1 to numMsg).foreach{
94       i =>  publisher ! new Message(i.toString.getBytes(), "foo.bar")
95     }
96
97     Thread.sleep(5000)
98
99     //AMQP.shutdownAll
100     Actor.registry.shutdownAll
101   }
102 }