Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ 7ca9e930

History | View | Annotate | Download (7.2 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 b024b97a Georgios Gousios
#
3 b024b97a Georgios Gousios
# Copyright (c) 2010 Greek Research and Technology Network
4 b024b97a Georgios Gousios
#
5 b9eef123 Vangelis Koukis
"""Ganeti notification daemon with AMQP
6 b024b97a Georgios Gousios

7 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
8 80bd5072 Georgios Gousios
and Ganeti VM state notifications to the ganeti exchange
9 b024b97a Georgios Gousios

10 b024b97a Georgios Gousios
"""
11 b024b97a Georgios Gousios
12 b024b97a Georgios Gousios
from django.core.management import setup_environ
13 b024b97a Georgios Gousios
14 b024b97a Georgios Gousios
import sys
15 b024b97a Georgios Gousios
import os
16 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17 b024b97a Georgios Gousios
sys.path.append(path)
18 b024b97a Georgios Gousios
import synnefo.settings as settings
19 b024b97a Georgios Gousios
20 b024b97a Georgios Gousios
setup_environ(settings)
21 b024b97a Georgios Gousios
22 b024b97a Georgios Gousios
import time
23 b024b97a Georgios Gousios
import json
24 b024b97a Georgios Gousios
import logging
25 b024b97a Georgios Gousios
import pyinotify
26 b024b97a Georgios Gousios
import daemon
27 b024b97a Georgios Gousios
import daemon.pidlockfile
28 348f53de Georgios Gousios
import socket
29 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
30 b024b97a Georgios Gousios
31 031d3f3a Georgios Gousios
from amqplib import client_0_8 as amqp
32 b024b97a Georgios Gousios
33 b024b97a Georgios Gousios
from threading import Thread, Event, currentThread
34 b024b97a Georgios Gousios
35 b024b97a Georgios Gousios
from ganeti import utils
36 b024b97a Georgios Gousios
from ganeti import jqueue
37 b024b97a Georgios Gousios
from ganeti import constants
38 b024b97a Georgios Gousios
from ganeti import serializer
39 b024b97a Georgios Gousios
40 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
41 348f53de Georgios Gousios
    def __init__(self, logger):
42 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
43 348f53de Georgios Gousios
        self.logger = logger
44 348f53de Georgios Gousios
        self.chan = None 
45 348f53de Georgios Gousios
46 348f53de Georgios Gousios
    def open_channel(self):
47 348f53de Georgios Gousios
        conn = None
48 348f53de Georgios Gousios
        while conn == None:
49 348f53de Georgios Gousios
            handler_logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
50 15f14c26 Georgios Gousios
            try:
51 15f14c26 Georgios Gousios
                conn = amqp.Connection( host=settings.RABBIT_HOST,
52 348f53de Georgios Gousios
                     userid=settings.RABBIT_USERNAME,
53 348f53de Georgios Gousios
                     password=settings.RABBIT_PASSWORD,
54 348f53de Georgios Gousios
                     virtual_host=settings.RABBIT_VHOST)
55 15f14c26 Georgios Gousios
            except socket.error:
56 15f14c26 Georgios Gousios
                time.sleep(1)
57 15f14c26 Georgios Gousios
                pass
58 348f53de Georgios Gousios
        
59 348f53de Georgios Gousios
        handler_logger.info("Connection succesful, opening channel")
60 348f53de Georgios Gousios
        return conn.channel()
61 b024b97a Georgios Gousios
62 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
63 348f53de Georgios Gousios
        if self.chan == None:
64 348f53de Georgios Gousios
            self.chan = self.open_channel()
65 348f53de Georgios Gousios
66 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
67 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
68 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
69 b024b97a Georgios Gousios
            return
70 b024b97a Georgios Gousios
71 b024b97a Georgios Gousios
        try:
72 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
73 b024b97a Georgios Gousios
        except IOError:
74 b024b97a Georgios Gousios
            return
75 b024b97a Georgios Gousios
76 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
77 b024b97a Georgios Gousios
        job = jqueue._QueuedJob.Restore(None, data)
78 b024b97a Georgios Gousios
79 b024b97a Georgios Gousios
        for op in job.ops:
80 b024b97a Georgios Gousios
            instances = ""
81 b024b97a Georgios Gousios
            try:
82 b024b97a Georgios Gousios
                instances = " ".join(op.input.instances)
83 b024b97a Georgios Gousios
            except AttributeError:
84 b024b97a Georgios Gousios
                pass
85 b024b97a Georgios Gousios
86 b024b97a Georgios Gousios
            try:
87 b024b97a Georgios Gousios
                instances = op.input.instance_name
88 b024b97a Georgios Gousios
            except AttributeError:
89 b024b97a Georgios Gousios
                pass
90 b024b97a Georgios Gousios
91 b024b97a Georgios Gousios
            # Get the last line of the op log as message
92 b024b97a Georgios Gousios
            try:
93 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
94 b024b97a Georgios Gousios
            except IndexError:
95 b024b97a Georgios Gousios
                logmsg = None
96 348f53de Georgios Gousios
97 b024b97a Georgios Gousios
            self.logger.debug("%d: %s(%s) %s %s",
98 348f53de Georgios Gousios
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
99 b024b97a Georgios Gousios
100 b024b97a Georgios Gousios
            # Construct message
101 b024b97a Georgios Gousios
            msg = {
102 348f53de Georgios Gousios
                    "type": "ganeti-op-status",
103 348f53de Georgios Gousios
                    "instance": instances,
104 348f53de Georgios Gousios
                    "operation": op.input.OP_ID,
105 348f53de Georgios Gousios
                    "jobId": int(job.id),
106 348f53de Georgios Gousios
                    "status": op.status,
107 348f53de Georgios Gousios
                    "logmsg": logmsg
108 348f53de Georgios Gousios
                    }
109 b024b97a Georgios Gousios
            if logmsg:
110 b024b97a Georgios Gousios
                msg["message"] = logmsg
111 432e5455 Georgios Gousios
            
112 432e5455 Georgios Gousios
            instance = instances.split('-')[0]  
113 7ca9e930 Vangelis Koukis
            routekey = "ganeti.%s.event.op" % instance
114 432e5455 Georgios Gousios
            
115 b9eef123 Vangelis Koukis
            self.logger.debug("Delivering msg: %s (key=%s)",
116 b9eef123 Vangelis Koukis
                json.dumps(msg), routekey)
117 031d3f3a Georgios Gousios
            msg = amqp.Message(json.dumps(msg))
118 b9eef123 Vangelis Koukis
            msg.properties["delivery_mode"] = 2  # Persistent
119 80bd5072 Georgios Gousios
120 80bd5072 Georgios Gousios
            while True:
121 80bd5072 Georgios Gousios
                try:
122 432e5455 Georgios Gousios
                    self.chan.basic_publish(msg,
123 432e5455 Georgios Gousios
                            exchange=settings.EXCHANGE_GANETI,
124 b9eef123 Vangelis Koukis
                            routing_key=routekey)
125 80bd5072 Georgios Gousios
                    return
126 80bd5072 Georgios Gousios
                except socket.error:
127 b9eef123 Vangelis Koukis
                    self.logger.exception("Server went away, reconnecting...")
128 80bd5072 Georgios Gousios
                    self.chan = self.open_channel()
129 80bd5072 Georgios Gousios
                except Exception:
130 b9eef123 Vangelis Koukis
                    self.logger.exception("Caught unexpected exception (msg: %s)", msg)
131 80bd5072 Georgios Gousios
                    raise
132 b024b97a Georgios Gousios
133 b024b97a Georgios Gousios
handler_logger = None
134 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
135 b024b97a Georgios Gousios
    global handler_logger
136 b024b97a Georgios Gousios
137 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
138 348f53de Georgios Gousios
            signum)
139 b024b97a Georgios Gousios
    raise SystemExit
140 b024b97a Georgios Gousios
141 b024b97a Georgios Gousios
def parse_arguments(args):
142 b024b97a Georgios Gousios
    from optparse import OptionParser
143 b024b97a Georgios Gousios
144 b024b97a Georgios Gousios
    parser = OptionParser()
145 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
146 348f53de Georgios Gousios
            help="Enable debugging information")
147 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
148 348f53de Georgios Gousios
            default=settings.GANETI_EVENTD_LOG_FILE,
149 348f53de Georgios Gousios
            metavar="FILE",
150 348f53de Georgios Gousios
            help="Write log to FILE instead of %s" %
151 348f53de Georgios Gousios
            settings.GANETI_EVENTD_LOG_FILE),
152 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
153 348f53de Georgios Gousios
            default=settings.GANETI_EVENTD_PID_FILE,
154 348f53de Georgios Gousios
            metavar='PIDFILE',
155 348f53de Georgios Gousios
            help="Save PID to file (default: %s)" %
156 348f53de Georgios Gousios
            settings.GANETI_EVENTD_PID_FILE)
157 b024b97a Georgios Gousios
158 b024b97a Georgios Gousios
    return parser.parse_args(args)
159 b024b97a Georgios Gousios
160 b024b97a Georgios Gousios
def main():
161 b024b97a Georgios Gousios
    global handler_logger
162 b024b97a Georgios Gousios
163 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
164 b024b97a Georgios Gousios
165 b024b97a Georgios Gousios
    # Create pidfile
166 b024b97a Georgios Gousios
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
167 b024b97a Georgios Gousios
168 b024b97a Georgios Gousios
    # Initialize logger
169 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
170 03a4b970 Georgios Gousios
    logger = logging.getLogger("ganeti.eventd")
171 b024b97a Georgios Gousios
    logger.setLevel(lvl)
172 b024b97a Georgios Gousios
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
173 348f53de Georgios Gousios
            "%Y-%m-%d %H:%M:%S")
174 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
175 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
176 b024b97a Georgios Gousios
    logger.addHandler(handler)
177 b024b97a Georgios Gousios
    handler_logger = logger
178 b024b97a Georgios Gousios
179 348f53de Georgios Gousios
    # Become a daemon:
180 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
181 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
182 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
183 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
184 348f53de Georgios Gousios
            pidfile=pidf,
185 348f53de Georgios Gousios
            umask=022,
186 348f53de Georgios Gousios
            stdout=handler.stream,
187 348f53de Georgios Gousios
            stderr=handler.stream,
188 348f53de Georgios Gousios
            files_preserve=[handler.stream])
189 348f53de Georgios Gousios
    daemon_context.open()
190 348f53de Georgios Gousios
    logger.info("Became a daemon")
191 348f53de Georgios Gousios
192 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
193 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
194 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
195 348f53de Georgios Gousios
196 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
197 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
198 b024b97a Georgios Gousios
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
199 348f53de Georgios Gousios
    handler = JobFileHandler(logger)
200 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
201 b024b97a Georgios Gousios
202 b024b97a Georgios Gousios
    try:
203 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
204 b024b97a Georgios Gousios
        res = wm.add_watch(constants.QUEUE_DIR, mask)
205 b024b97a Georgios Gousios
        if res[constants.QUEUE_DIR] < 0:
206 b024b97a Georgios Gousios
            raise Exception("pyinotify add_watch returned negative watch descriptor")
207 348f53de Georgios Gousios
208 b024b97a Georgios Gousios
        logger.info("Now watching %s" % constants.QUEUE_DIR)
209 b024b97a Georgios Gousios
210 b024b97a Georgios Gousios
        while True:    # loop forever
211 348f53de Georgios Gousios
            # process the queue of events as explained above
212 b024b97a Georgios Gousios
            notifier.process_events()
213 b024b97a Georgios Gousios
            if notifier.check_events():
214 b024b97a Georgios Gousios
                # read notified events and enqeue them
215 b024b97a Georgios Gousios
                notifier.read_events()
216 b024b97a Georgios Gousios
    except SystemExit:
217 b024b97a Georgios Gousios
        logger.info("SystemExit")
218 b024b97a Georgios Gousios
    except:
219 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
220 b024b97a Georgios Gousios
    finally:
221 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
222 b024b97a Georgios Gousios
        notifier.stop()
223 b024b97a Georgios Gousios
        raise
224 b024b97a Georgios Gousios
225 b024b97a Georgios Gousios
if __name__ == "__main__":
226 b024b97a Georgios Gousios
    sys.exit(main())
227 b024b97a Georgios Gousios
228 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :