Statistics
| Branch: | Tag: | Revision:

root / ganeti / snf-ganeti-eventd.py @ a4d2780c

History | View | Annotate | Download (7.4 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 b024b97a Georgios Gousios
#
3 b024b97a Georgios Gousios
# Copyright (c) 2010 Greek Research and Technology Network
4 b024b97a Georgios Gousios
#
5 b9eef123 Vangelis Koukis
"""Ganeti notification daemon with AMQP
6 b024b97a Georgios Gousios

7 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
8 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
9 b024b97a Georgios Gousios

10 b024b97a Georgios Gousios
"""
11 b024b97a Georgios Gousios
12 36cf1973 Vangelis Koukis
#from django.core.management import setup_environ
13 b024b97a Georgios Gousios
14 b024b97a Georgios Gousios
import sys
15 b024b97a Georgios Gousios
import os
16 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17 b024b97a Georgios Gousios
sys.path.append(path)
18 b024b97a Georgios Gousios
import synnefo.settings as settings
19 b024b97a Georgios Gousios
20 36cf1973 Vangelis Koukis
#setup_environ(settings)
21 b024b97a Georgios Gousios
22 b024b97a Georgios Gousios
import time
23 b024b97a Georgios Gousios
import json
24 b024b97a Georgios Gousios
import logging
25 b024b97a Georgios Gousios
import pyinotify
26 b024b97a Georgios Gousios
import daemon
27 b024b97a Georgios Gousios
import daemon.pidlockfile
28 348f53de Georgios Gousios
import socket
29 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
30 b024b97a Georgios Gousios
31 031d3f3a Georgios Gousios
from amqplib import client_0_8 as amqp
32 b024b97a Georgios Gousios
33 b024b97a Georgios Gousios
from ganeti import utils
34 b024b97a Georgios Gousios
from ganeti import jqueue
35 b024b97a Georgios Gousios
from ganeti import constants
36 b024b97a Georgios Gousios
from ganeti import serializer
37 b024b97a Georgios Gousios
38 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
39 348f53de Georgios Gousios
    def __init__(self, logger):
40 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
41 348f53de Georgios Gousios
        self.logger = logger
42 348f53de Georgios Gousios
        self.chan = None 
43 348f53de Georgios Gousios
44 348f53de Georgios Gousios
    def open_channel(self):
45 348f53de Georgios Gousios
        conn = None
46 348f53de Georgios Gousios
        while conn == None:
47 36cf1973 Vangelis Koukis
            handler_logger.info("Attempting to connect to %s",
48 36cf1973 Vangelis Koukis
                settings.RABBIT_HOST)
49 15f14c26 Georgios Gousios
            try:
50 36cf1973 Vangelis Koukis
                conn = amqp.Connection(host=settings.RABBIT_HOST,
51 348f53de Georgios Gousios
                     userid=settings.RABBIT_USERNAME,
52 348f53de Georgios Gousios
                     password=settings.RABBIT_PASSWORD,
53 348f53de Georgios Gousios
                     virtual_host=settings.RABBIT_VHOST)
54 15f14c26 Georgios Gousios
            except socket.error:
55 15f14c26 Georgios Gousios
                time.sleep(1)
56 348f53de Georgios Gousios
        
57 348f53de Georgios Gousios
        handler_logger.info("Connection succesful, opening channel")
58 348f53de Georgios Gousios
        return conn.channel()
59 b024b97a Georgios Gousios
60 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
61 80dcd124 Vangelis Koukis
        self.process_IN_MOVED_TO(event)
62 80dcd124 Vangelis Koukis
63 80dcd124 Vangelis Koukis
    def process_IN_MOVED_TO(self, event):
64 348f53de Georgios Gousios
        if self.chan == None:
65 348f53de Georgios Gousios
            self.chan = self.open_channel()
66 348f53de Georgios Gousios
67 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
68 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
69 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
70 b024b97a Georgios Gousios
            return
71 b024b97a Georgios Gousios
72 b024b97a Georgios Gousios
        try:
73 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
74 b024b97a Georgios Gousios
        except IOError:
75 b024b97a Georgios Gousios
            return
76 b024b97a Georgios Gousios
77 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
78 b024b97a Georgios Gousios
        job = jqueue._QueuedJob.Restore(None, data)
79 b024b97a Georgios Gousios
80 b024b97a Georgios Gousios
        for op in job.ops:
81 b024b97a Georgios Gousios
            instances = ""
82 b024b97a Georgios Gousios
            try:
83 b024b97a Georgios Gousios
                instances = " ".join(op.input.instances)
84 b024b97a Georgios Gousios
            except AttributeError:
85 b024b97a Georgios Gousios
                pass
86 b024b97a Georgios Gousios
87 b024b97a Georgios Gousios
            try:
88 b024b97a Georgios Gousios
                instances = op.input.instance_name
89 b024b97a Georgios Gousios
            except AttributeError:
90 b024b97a Georgios Gousios
                pass
91 b024b97a Georgios Gousios
92 b024b97a Georgios Gousios
            # Get the last line of the op log as message
93 b024b97a Georgios Gousios
            try:
94 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
95 b024b97a Georgios Gousios
            except IndexError:
96 b024b97a Georgios Gousios
                logmsg = None
97 348f53de Georgios Gousios
98 80dcd124 Vangelis Koukis
            self.logger.debug("Job: %d: %s(%s) %s %s",
99 348f53de Georgios Gousios
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
100 b024b97a Georgios Gousios
101 b024b97a Georgios Gousios
            # Construct message
102 b024b97a Georgios Gousios
            msg = {
103 348f53de Georgios Gousios
                    "type": "ganeti-op-status",
104 348f53de Georgios Gousios
                    "instance": instances,
105 348f53de Georgios Gousios
                    "operation": op.input.OP_ID,
106 348f53de Georgios Gousios
                    "jobId": int(job.id),
107 348f53de Georgios Gousios
                    "status": op.status,
108 348f53de Georgios Gousios
                    "logmsg": logmsg
109 348f53de Georgios Gousios
                    }
110 b024b97a Georgios Gousios
            if logmsg:
111 b024b97a Georgios Gousios
                msg["message"] = logmsg
112 432e5455 Georgios Gousios
            
113 432e5455 Georgios Gousios
            instance = instances.split('-')[0]  
114 7ca9e930 Vangelis Koukis
            routekey = "ganeti.%s.event.op" % instance
115 432e5455 Georgios Gousios
            
116 b9eef123 Vangelis Koukis
            self.logger.debug("Delivering msg: %s (key=%s)",
117 b9eef123 Vangelis Koukis
                json.dumps(msg), routekey)
118 031d3f3a Georgios Gousios
            msg = amqp.Message(json.dumps(msg))
119 b9eef123 Vangelis Koukis
            msg.properties["delivery_mode"] = 2  # Persistent
120 80bd5072 Georgios Gousios
121 80bd5072 Georgios Gousios
            while True:
122 80bd5072 Georgios Gousios
                try:
123 432e5455 Georgios Gousios
                    self.chan.basic_publish(msg,
124 432e5455 Georgios Gousios
                            exchange=settings.EXCHANGE_GANETI,
125 b9eef123 Vangelis Koukis
                            routing_key=routekey)
126 80bd5072 Georgios Gousios
                    return
127 80bd5072 Georgios Gousios
                except socket.error:
128 b9eef123 Vangelis Koukis
                    self.logger.exception("Server went away, reconnecting...")
129 80bd5072 Georgios Gousios
                    self.chan = self.open_channel()
130 80bd5072 Georgios Gousios
                except Exception:
131 36cf1973 Vangelis Koukis
                    self.logger.exception("Caught unexpected exception, msg: ",
132 36cf1973 Vangelis Koukis
                                          msg)
133 80bd5072 Georgios Gousios
                    raise
134 b024b97a Georgios Gousios
135 b024b97a Georgios Gousios
handler_logger = None
136 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
137 b024b97a Georgios Gousios
    global handler_logger
138 b024b97a Georgios Gousios
139 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
140 2cd99e7a Georgios Gousios
                        signum)
141 b024b97a Georgios Gousios
    raise SystemExit
142 b024b97a Georgios Gousios
143 b024b97a Georgios Gousios
def parse_arguments(args):
144 b024b97a Georgios Gousios
    from optparse import OptionParser
145 b024b97a Georgios Gousios
146 b024b97a Georgios Gousios
    parser = OptionParser()
147 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
148 2cd99e7a Georgios Gousios
                      help="Enable debugging information")
149 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
150 2cd99e7a Georgios Gousios
                      default=settings.GANETI_EVENTD_LOG_FILE,
151 2cd99e7a Georgios Gousios
                      metavar="FILE",
152 2cd99e7a Georgios Gousios
                      help="Write log to FILE instead of %s" %
153 2cd99e7a Georgios Gousios
                           settings.GANETI_EVENTD_LOG_FILE)
154 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
155 2cd99e7a Georgios Gousios
                      default=settings.GANETI_EVENTD_PID_FILE,
156 2cd99e7a Georgios Gousios
                      metavar='PIDFILE',
157 2cd99e7a Georgios Gousios
                      help="Save PID to file (default: %s)" %
158 2cd99e7a Georgios Gousios
                           settings.GANETI_EVENTD_PID_FILE)
159 b024b97a Georgios Gousios
160 b024b97a Georgios Gousios
    return parser.parse_args(args)
161 b024b97a Georgios Gousios
162 b024b97a Georgios Gousios
def main():
163 b024b97a Georgios Gousios
    global handler_logger
164 b024b97a Georgios Gousios
165 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
166 b024b97a Georgios Gousios
167 b024b97a Georgios Gousios
    # Create pidfile
168 b024b97a Georgios Gousios
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
169 b024b97a Georgios Gousios
170 b024b97a Georgios Gousios
    # Initialize logger
171 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
172 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
173 b024b97a Georgios Gousios
    logger.setLevel(lvl)
174 36cf1973 Vangelis Koukis
    formatter = logging.Formatter(
175 36cf1973 Vangelis Koukis
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
176 36cf1973 Vangelis Koukis
        "%Y-%m-%d %H:%M:%S")
177 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
178 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
179 b024b97a Georgios Gousios
    logger.addHandler(handler)
180 b024b97a Georgios Gousios
    handler_logger = logger
181 b024b97a Georgios Gousios
182 348f53de Georgios Gousios
    # Become a daemon:
183 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
184 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
185 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
186 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
187 348f53de Georgios Gousios
            pidfile=pidf,
188 348f53de Georgios Gousios
            umask=022,
189 348f53de Georgios Gousios
            stdout=handler.stream,
190 348f53de Georgios Gousios
            stderr=handler.stream,
191 348f53de Georgios Gousios
            files_preserve=[handler.stream])
192 348f53de Georgios Gousios
    daemon_context.open()
193 348f53de Georgios Gousios
    logger.info("Became a daemon")
194 348f53de Georgios Gousios
195 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
196 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
197 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
198 348f53de Georgios Gousios
199 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
200 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
201 80dcd124 Vangelis Koukis
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
202 80dcd124 Vangelis Koukis
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
203 348f53de Georgios Gousios
    handler = JobFileHandler(logger)
204 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
205 b024b97a Georgios Gousios
206 b024b97a Georgios Gousios
    try:
207 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
208 b024b97a Georgios Gousios
        res = wm.add_watch(constants.QUEUE_DIR, mask)
209 b024b97a Georgios Gousios
        if res[constants.QUEUE_DIR] < 0:
210 36cf1973 Vangelis Koukis
            raise Exception("pyinotify add_watch returned negative descriptor")
211 348f53de Georgios Gousios
212 b024b97a Georgios Gousios
        logger.info("Now watching %s" % constants.QUEUE_DIR)
213 b024b97a Georgios Gousios
214 b024b97a Georgios Gousios
        while True:    # loop forever
215 348f53de Georgios Gousios
            # process the queue of events as explained above
216 b024b97a Georgios Gousios
            notifier.process_events()
217 b024b97a Georgios Gousios
            if notifier.check_events():
218 b024b97a Georgios Gousios
                # read notified events and enqeue them
219 b024b97a Georgios Gousios
                notifier.read_events()
220 b024b97a Georgios Gousios
    except SystemExit:
221 b024b97a Georgios Gousios
        logger.info("SystemExit")
222 b024b97a Georgios Gousios
    except:
223 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
224 b024b97a Georgios Gousios
    finally:
225 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
226 b024b97a Georgios Gousios
        notifier.stop()
227 b024b97a Georgios Gousios
        raise
228 b024b97a Georgios Gousios
229 b024b97a Georgios Gousios
if __name__ == "__main__":
230 b024b97a Georgios Gousios
    sys.exit(main())
231 b024b97a Georgios Gousios
232 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :