Fixed RabbitMQProducer. Now it is fully reliable.
authorProdromos Gerakios <pgerakios@grnet.gr>
Tue, 21 Aug 2012 13:51:08 +0000 (16:51 +0300)
committerProdromos Gerakios <pgerakios@grnet.gr>
Tue, 21 Aug 2012 13:51:08 +0000 (16:51 +0300)
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala

index f43a71b..04c659e 100644 (file)
@@ -54,6 +54,17 @@ import collection.mutable
  * or implied, of GRNET S.A.
  */
 
+private class RabbitMQProducerActor extends Actor {
+  def receive = {
+    case sendMessage:(() => Unit) =>
+      //Console.err.println("Executing msg ... " + sendMessage.hashCode)
+      sendMessage()
+    case x  : AnyRef     =>
+      //Console.err.println("Dammit  ..." + x.getClass.getSimpleName)
+      ()
+  }
+}
+
 /**
  *
  * @author Prodromos Gerakios <pgerakios@grnet.gr>
@@ -105,14 +116,14 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
           }
        }
        if(msgs!=null){
-         if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
+         //if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
          for {msg <- msgs} {
-           Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
+          // Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
            _actorRef ! msg
          }
        }
       } else {
-        Console.err.println("Akka ActorSystem is null. Waiting ...")
+        //Console.err.println("Akka ActorSystem is null. Waiting ...")
       }
       resendMessages()
     },
@@ -122,17 +133,6 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
     ()
   }
 
-  private class RabbitMQProducerActor extends Actor {
-    def receive = {
-      case sendMessage:(() => Unit) =>
-        Console.err.println("Executing msg ... " + sendMessage.hashCode)
-        sendMessage
-      case x  : AnyRef     =>
-        Console.err.println("Dammit  ..." + x.getClass.getSimpleName)
-        ()
-    }
-  }
-
   def configure(props: Props): Unit = {
     val connectionConf = RabbitMQKeys.makeConnectionConf(props)
     _factory = new ConnectionFactory
@@ -165,12 +165,12 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
 
 
       def handleAck(seqNo:Long,multiple:Boolean) = {
-        Console.err.println("Received ack for  " + seqNo)
+        //Console.err.println("Received ack for  " + seqNo)
         cutSubset(seqNo,multiple)
       }
 
       def handleNack(seqNo:Long,multiple:Boolean) = {
-        Console.err.println("Received Nack for msg for " + seqNo)
+        //Console.err.println("Received Nack for msg for " + seqNo)
         for {msg <- cutSubset(seqNo,multiple)} _actorRef ! msg
       }
     })
@@ -200,13 +200,15 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
               _channel.basicPublish(exchangeName,routingKey,
                 MessageProperties.PERSISTENT_TEXT_PLAIN,
                 payload.getBytes)
-               Console.err.println("####Sent message " + payload + " with seqno=" + seq)
+               //Console.err.println("####Sent message " + payload + " with seqno=" + seq)
             } else {
               _unsentMessages += msg
+               //Console.err.println("####Channel closed!")
              }
         } catch {
             case e: Exception =>
               _unsentMessages += msg
+              //e.printStackTrace
         }
       }
     if(_actorRef != null)