* or implied, of GRNET S.A.
*/
+private class RabbitMQProducerActor extends Actor { // actor that executes the zero-argument functions it receives as messages
+ def receive = {
+ case sendMessage:(() => Unit) => // NOTE(review): the function type is erased at runtime, so this likely matches any Function0 — confirm
+ //Console.err.println("Executing msg ... " + sendMessage.hashCode)
+ sendMessage() // the parentheses invoke the function; a bare `sendMessage` would only reference it
+ case x : AnyRef => // any other message is accepted and ignored
+ //Console.err.println("Dammit ..." + x.getClass.getSimpleName)
+ ()
+ }
+}
+
/**
*
* @author Prodromos Gerakios <pgerakios@grnet.gr>
}
}
if(msgs!=null){
- if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
+ //if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
for {msg <- msgs} {
- Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
+ // Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
_actorRef ! msg
}
}
} else {
- Console.err.println("Akka ActorSystem is null. Waiting ...")
+ //Console.err.println("Akka ActorSystem is null. Waiting ...")
}
resendMessages()
},
()
}
- private class RabbitMQProducerActor extends Actor {
- def receive = {
- case sendMessage:(() => Unit) =>
- Console.err.println("Executing msg ... " + sendMessage.hashCode)
- sendMessage
- case x : AnyRef =>
- Console.err.println("Dammit ..." + x.getClass.getSimpleName)
- ()
- }
- }
-
def configure(props: Props): Unit = {
val connectionConf = RabbitMQKeys.makeConnectionConf(props)
_factory = new ConnectionFactory
def handleAck(seqNo:Long,multiple:Boolean) = {
- Console.err.println("Received ack for " + seqNo)
+ // Ack for seqNo: hand it to cutSubset below. Disabled debug trace: Console.err.println("Received ack for " + seqNo)
cutSubset(seqNo,multiple)
}
def handleNack(seqNo:Long,multiple:Boolean) = {
- Console.err.println("Received Nack for msg for " + seqNo)
+ // Nack for seqNo: each message returned by cutSubset is sent to _actorRef (presumably to be published again — confirm). Disabled debug trace: Console.err.println("Received Nack for msg for " + seqNo)
for {msg <- cutSubset(seqNo,multiple)} _actorRef ! msg
}
})
_channel.basicPublish(exchangeName,routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
payload.getBytes)
- Console.err.println("####Sent message " + payload + " with seqno=" + seq)
+ //Console.err.println("####Sent message " + payload + " with seqno=" + seq)
} else {
_unsentMessages += msg
+ //Console.err.println("####Channel closed!")
}
} catch {
case e: Exception =>
_unsentMessages += msg
+ //e.printStackTrace
}
}
if(_actorRef != null)