Revision 031d3f3a ganeti/ganeti-eventd.py

b/ganeti/ganeti-eventd.py
2 2
#
3 3
# Copyright (c) 2010 Greek Research and Technology Network
4 4
#
5
"""Ganeti notification daemon with ampq
5
"""Ganeti notification daemon with amqp 
6 6

  
7 7
A daemon to monitor the Ganeti job queue and publish job progress
8 8
and Ganeti VM state notifications over a 0mq PUB endpoint.
......
27 27
import daemon.pidlockfile
28 28
from signal import signal, SIGINT, SIGTERM
29 29

  
30
from carrot.connection import BrokerConnection
31
from carrot.messaging import Publisher
30
from amqplib import client_0_8 as amqp
32 31

  
33 32
from threading import Thread, Event, currentThread
34 33

  
......
38 37
from ganeti import serializer
39 38

  
40 39
class JobFileHandler(pyinotify.ProcessEvent):
41
    def __init__(self, logger, amqpd):
40
    def __init__(self, logger, chan):
42 41
            pyinotify.ProcessEvent.__init__(self)
43 42
            self.logger = logger
44
            self.amqpd = amqpd
43
            self.chan = chan
45 44

  
46 45
    def process_IN_CLOSE_WRITE(self, event):
47 46
        jobfile = os.path.join(event.path, event.name)
......
92 91
                msg["message"] = logmsg
93 92
            
94 93
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
95
            amqpd.send(json.dumps(msg))
96

  
94
            msg = amqp.Message(json.dumps(msg))
95
            msg.properties["delivery_mode"] = 2 #Persistent
96
            self.chan.basic_publish(msg,exchange="ganeti",routing_key="eventd")
97 97

  
98 98
handler_logger = None
99 99
def fatal_signal_handler(signum, frame):
......
159 159
#    signal(SIGTERM, fatal_signal_handler)
160 160

  
161 161
    #Init connection to RabbitMQ
162
    conn = BrokerConnection(hostname=settings.RABBIT_HOST,
163
                            port=settings.RABBIT_PORT,
162
    conn = amqp.Connection( host=settings.RABBIT_HOST,
164 163
                            userid=settings.RABBIT_USERNAME,
165 164
                            password=settings.RABBIT_PASSWORD,
166 165
                            virtual_host=settings.RABBIT_VHOST)
167
    publisher = Publisher(connection=conn, exchange="ganeti",
168
                          routing_key="importer")
169

  
170

  
166
    chan = conn.channel()
167
    
171 168
    # Monitor the Ganeti job queue, create and push notifications
172 169
    wm = pyinotify.WatchManager()
173 170
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
174
    handler = JobFileHandler(logger, publisher)
171
    handler = JobFileHandler(logger, chan)
175 172
    notifier = pyinotify.Notifier(wm, handler)
176 173

  
177 174
    try:
......
195 192
    finally:
196 193
        # destroy the inotify's instance on this interrupt (stop monitoring)
197 194
        notifier.stop()
198
        # mark the 0mq thread as stopped, wake it up so that it notices
195
        #Close the amqp connection
196
        chan.close()
197
        conn.close()
199 198
        raise
200 199

  
201 200
if __name__ == "__main__":

Also available in: Unified diff