+ _connection =_factory.newConnection(_servers)
+ _channel = _connection.createChannel
+ _channel.confirmSelect
+ _channel.addConfirmListener(new ConfirmListener {
+
+ private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
+ val set = if (multiple)
+ _unconfirmedSet.range(0,seqNo+1)
+ else
+ _unconfirmedSet.range(seqNo,seqNo)
+ _unconfirmedSet = _unconfirmedSet -- set
+ val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
+ _unconfirmedMessages.get(seq) match{
+ case None => map
+ case Some(s) => map + ((seq,s))
+ }})
+ _unconfirmedMessages = _unconfirmedMessages -- set
+ ret
+ }
+
+
+ def handleAck(seqNo:Long,multiple:Boolean) = {
+ withChannel {
+ Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
+ subset(seqNo,multiple)
+ }
+ }
+
+ def handleNack(seqNo:Long,multiple:Boolean) = {
+ withChannel {
+ Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
+ for { (_,msg) <- subset(seqNo,multiple) }
+ sendMessage(msg)
+ }
+ }
+ })