Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ 348f53de

History | View | Annotate | Download (6.8 kB)

1 b024b97a Georgios Gousios
#!/usr/bin/env python
2 b024b97a Georgios Gousios
#
3 b024b97a Georgios Gousios
# Copyright (c) 2010 Greek Research and Technology Network
4 b024b97a Georgios Gousios
#
5 031d3f3a Georgios Gousios
"""Ganeti notification daemon with amqp 
6 b024b97a Georgios Gousios

7 b024b97a Georgios Gousios
A daemon to monitor the Ganeti job queue and publish job progress
8 b024b97a Georgios Gousios
and Ganeti VM state notifications over a 0mq PUB endpoint.
9 b024b97a Georgios Gousios

10 b024b97a Georgios Gousios
"""
11 b024b97a Georgios Gousios
12 b024b97a Georgios Gousios
from django.core.management import setup_environ
13 b024b97a Georgios Gousios
14 b024b97a Georgios Gousios
import sys
15 b024b97a Georgios Gousios
import os
16 b024b97a Georgios Gousios
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17 b024b97a Georgios Gousios
sys.path.append(path)
18 b024b97a Georgios Gousios
import synnefo.settings as settings
19 b024b97a Georgios Gousios
20 b024b97a Georgios Gousios
setup_environ(settings)
21 b024b97a Georgios Gousios
22 b024b97a Georgios Gousios
import time
23 b024b97a Georgios Gousios
import json
24 b024b97a Georgios Gousios
import logging
25 b024b97a Georgios Gousios
import pyinotify
26 b024b97a Georgios Gousios
import daemon
27 b024b97a Georgios Gousios
import daemon.pidlockfile
28 348f53de Georgios Gousios
import socket
29 b024b97a Georgios Gousios
from signal import signal, SIGINT, SIGTERM
30 b024b97a Georgios Gousios
31 031d3f3a Georgios Gousios
from amqplib import client_0_8 as amqp
32 b024b97a Georgios Gousios
33 b024b97a Georgios Gousios
from threading import Thread, Event, currentThread
34 b024b97a Georgios Gousios
35 b024b97a Georgios Gousios
from ganeti import utils
36 b024b97a Georgios Gousios
from ganeti import jqueue
37 b024b97a Georgios Gousios
from ganeti import constants
38 b024b97a Georgios Gousios
from ganeti import serializer
39 b024b97a Georgios Gousios
40 b024b97a Georgios Gousios
class JobFileHandler(pyinotify.ProcessEvent):
41 348f53de Georgios Gousios
    def __init__(self, logger):
42 348f53de Georgios Gousios
        pyinotify.ProcessEvent.__init__(self)
43 348f53de Georgios Gousios
        self.logger = logger
44 348f53de Georgios Gousios
        self.chan = None 
45 348f53de Georgios Gousios
46 348f53de Georgios Gousios
    def open_channel(self):
47 348f53de Georgios Gousios
        conn = None
48 348f53de Georgios Gousios
        while conn == None:
49 348f53de Georgios Gousios
            handler_logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
50 348f53de Georgios Gousios
            conn = amqp.Connection( host=settings.RABBIT_HOST,
51 348f53de Georgios Gousios
                     userid=settings.RABBIT_USERNAME,
52 348f53de Georgios Gousios
                     password=settings.RABBIT_PASSWORD,
53 348f53de Georgios Gousios
                     virtual_host=settings.RABBIT_VHOST)
54 348f53de Georgios Gousios
            time.sleep(1)
55 348f53de Georgios Gousios
        
56 348f53de Georgios Gousios
        handler_logger.info("Connection succesful, opening channel")
57 348f53de Georgios Gousios
        return conn.channel()
58 b024b97a Georgios Gousios
59 b024b97a Georgios Gousios
    def process_IN_CLOSE_WRITE(self, event):
60 348f53de Georgios Gousios
        if self.chan == None:
61 348f53de Georgios Gousios
            self.chan = self.open_channel()
62 348f53de Georgios Gousios
63 b024b97a Georgios Gousios
        jobfile = os.path.join(event.path, event.name)
64 b024b97a Georgios Gousios
        if not event.name.startswith("job-"):
65 b024b97a Georgios Gousios
            self.logger.debug("Not a job file: %s" % event.path)
66 b024b97a Georgios Gousios
            return
67 b024b97a Georgios Gousios
68 b024b97a Georgios Gousios
        try:
69 b024b97a Georgios Gousios
            data = utils.ReadFile(jobfile)
70 b024b97a Georgios Gousios
        except IOError:
71 b024b97a Georgios Gousios
            return
72 b024b97a Georgios Gousios
73 b024b97a Georgios Gousios
        data = serializer.LoadJson(data)
74 b024b97a Georgios Gousios
        job = jqueue._QueuedJob.Restore(None, data)
75 b024b97a Georgios Gousios
76 b024b97a Georgios Gousios
        for op in job.ops:
77 b024b97a Georgios Gousios
            instances = ""
78 b024b97a Georgios Gousios
            try:
79 b024b97a Georgios Gousios
                instances = " ".join(op.input.instances)
80 b024b97a Georgios Gousios
            except AttributeError:
81 b024b97a Georgios Gousios
                pass
82 b024b97a Georgios Gousios
83 b024b97a Georgios Gousios
            try:
84 b024b97a Georgios Gousios
                instances = op.input.instance_name
85 b024b97a Georgios Gousios
            except AttributeError:
86 b024b97a Georgios Gousios
                pass
87 b024b97a Georgios Gousios
88 b024b97a Georgios Gousios
            # Get the last line of the op log as message
89 b024b97a Georgios Gousios
            try:
90 b024b97a Georgios Gousios
                logmsg = op.log[-1][-1]
91 b024b97a Georgios Gousios
            except IndexError:
92 b024b97a Georgios Gousios
                logmsg = None
93 348f53de Georgios Gousios
94 b024b97a Georgios Gousios
            self.logger.debug("%d: %s(%s) %s %s",
95 348f53de Georgios Gousios
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
96 b024b97a Georgios Gousios
97 b024b97a Georgios Gousios
            # Construct message
98 b024b97a Georgios Gousios
            msg = {
99 348f53de Georgios Gousios
                    "type": "ganeti-op-status",
100 348f53de Georgios Gousios
                    "instance": instances,
101 348f53de Georgios Gousios
                    "operation": op.input.OP_ID,
102 348f53de Georgios Gousios
                    "jobId": int(job.id),
103 348f53de Georgios Gousios
                    "status": op.status,
104 348f53de Georgios Gousios
                    "logmsg": logmsg
105 348f53de Georgios Gousios
                    }
106 b024b97a Georgios Gousios
            if logmsg:
107 b024b97a Georgios Gousios
                msg["message"] = logmsg
108 348f53de Georgios Gousios
109 b024b97a Georgios Gousios
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
110 031d3f3a Georgios Gousios
            msg = amqp.Message(json.dumps(msg))
111 031d3f3a Georgios Gousios
            msg.properties["delivery_mode"] = 2 #Persistent
112 348f53de Georgios Gousios
            try:    
113 348f53de Georgios Gousios
                self.chan.basic_publish(msg,exchange="ganeti",routing_key="eventd")
114 348f53de Georgios Gousios
            except socket.error:
115 348f53de Georgios Gousios
                self.logger.error("Server went away, reconnecting...")
116 348f53de Georgios Gousios
                self.chan = self.open_channel()
117 348f53de Georgios Gousios
                self.chan.basic_publish(msg,exchange="ganeti",routing_key="eventd")
118 348f53de Georgios Gousios
            except Exception:
119 348f53de Georgios Gousios
                self.logger.error("Uknown error (msg: %s)", msg)
120 348f53de Georgios Gousios
                raise
121 b024b97a Georgios Gousios
122 b024b97a Georgios Gousios
handler_logger = None
123 b024b97a Georgios Gousios
def fatal_signal_handler(signum, frame):
124 b024b97a Georgios Gousios
    global handler_logger
125 b024b97a Georgios Gousios
126 b024b97a Georgios Gousios
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
127 348f53de Georgios Gousios
            signum)
128 b024b97a Georgios Gousios
    raise SystemExit
129 b024b97a Georgios Gousios
130 b024b97a Georgios Gousios
def parse_arguments(args):
131 b024b97a Georgios Gousios
    from optparse import OptionParser
132 b024b97a Georgios Gousios
133 b024b97a Georgios Gousios
    parser = OptionParser()
134 b024b97a Georgios Gousios
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
135 348f53de Georgios Gousios
            help="Enable debugging information")
136 b024b97a Georgios Gousios
    parser.add_option("-l", "--log", dest="log_file",
137 348f53de Georgios Gousios
            default=settings.GANETI_EVENTD_LOG_FILE,
138 348f53de Georgios Gousios
            metavar="FILE",
139 348f53de Georgios Gousios
            help="Write log to FILE instead of %s" %
140 348f53de Georgios Gousios
            settings.GANETI_EVENTD_LOG_FILE),
141 b024b97a Georgios Gousios
    parser.add_option('--pid-file', dest="pid_file",
142 348f53de Georgios Gousios
            default=settings.GANETI_EVENTD_PID_FILE,
143 348f53de Georgios Gousios
            metavar='PIDFILE',
144 348f53de Georgios Gousios
            help="Save PID to file (default: %s)" %
145 348f53de Georgios Gousios
            settings.GANETI_EVENTD_PID_FILE)
146 b024b97a Georgios Gousios
147 b024b97a Georgios Gousios
    return parser.parse_args(args)
148 b024b97a Georgios Gousios
149 b024b97a Georgios Gousios
def main():
150 b024b97a Georgios Gousios
    global handler_logger
151 b024b97a Georgios Gousios
152 b024b97a Georgios Gousios
    (opts, args) = parse_arguments(sys.argv[1:])
153 b024b97a Georgios Gousios
154 b024b97a Georgios Gousios
    # Create pidfile
155 b024b97a Georgios Gousios
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
156 b024b97a Georgios Gousios
157 b024b97a Georgios Gousios
    # Initialize logger
158 b024b97a Georgios Gousios
    lvl = logging.DEBUG if opts.debug else logging.INFO
159 b024b97a Georgios Gousios
    logger = logging.getLogger("ganeti-amqpd")
160 b024b97a Georgios Gousios
    logger.setLevel(lvl)
161 b024b97a Georgios Gousios
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
162 348f53de Georgios Gousios
            "%Y-%m-%d %H:%M:%S")
163 b024b97a Georgios Gousios
    handler = logging.FileHandler(opts.log_file)
164 b024b97a Georgios Gousios
    handler.setFormatter(formatter)
165 b024b97a Georgios Gousios
    logger.addHandler(handler)
166 b024b97a Georgios Gousios
    handler_logger = logger
167 b024b97a Georgios Gousios
168 348f53de Georgios Gousios
    # Become a daemon:
169 348f53de Georgios Gousios
    # Redirect stdout and stderr to handler.stream to catch
170 348f53de Georgios Gousios
    # early errors in the daemonization process [e.g., pidfile creation]
171 348f53de Georgios Gousios
    # which will otherwise go to /dev/null.
172 348f53de Georgios Gousios
    daemon_context = daemon.DaemonContext(
173 348f53de Georgios Gousios
            pidfile=pidf,
174 348f53de Georgios Gousios
            umask=022,
175 348f53de Georgios Gousios
            stdout=handler.stream,
176 348f53de Georgios Gousios
            stderr=handler.stream,
177 348f53de Georgios Gousios
            files_preserve=[handler.stream])
178 348f53de Georgios Gousios
    daemon_context.open()
179 348f53de Georgios Gousios
    logger.info("Became a daemon")
180 348f53de Georgios Gousios
181 348f53de Georgios Gousios
    # Catch signals to ensure graceful shutdown
182 348f53de Georgios Gousios
    signal(SIGINT, fatal_signal_handler)
183 348f53de Georgios Gousios
    signal(SIGTERM, fatal_signal_handler)
184 348f53de Georgios Gousios
185 348f53de Georgios Gousios
186 b024b97a Georgios Gousios
    # Monitor the Ganeti job queue, create and push notifications
187 b024b97a Georgios Gousios
    wm = pyinotify.WatchManager()
188 b024b97a Georgios Gousios
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
189 348f53de Georgios Gousios
    handler = JobFileHandler(logger)
190 b024b97a Georgios Gousios
    notifier = pyinotify.Notifier(wm, handler)
191 b024b97a Georgios Gousios
192 b024b97a Georgios Gousios
    try:
193 b024b97a Georgios Gousios
        # Fail if adding the inotify() watch fails for any reason
194 b024b97a Georgios Gousios
        res = wm.add_watch(constants.QUEUE_DIR, mask)
195 b024b97a Georgios Gousios
        if res[constants.QUEUE_DIR] < 0:
196 b024b97a Georgios Gousios
            raise Exception("pyinotify add_watch returned negative watch descriptor")
197 348f53de Georgios Gousios
198 b024b97a Georgios Gousios
        logger.info("Now watching %s" % constants.QUEUE_DIR)
199 b024b97a Georgios Gousios
200 b024b97a Georgios Gousios
        while True:    # loop forever
201 348f53de Georgios Gousios
            # process the queue of events as explained above
202 b024b97a Georgios Gousios
            notifier.process_events()
203 b024b97a Georgios Gousios
            if notifier.check_events():
204 b024b97a Georgios Gousios
                # read notified events and enqeue them
205 b024b97a Georgios Gousios
                notifier.read_events()
206 b024b97a Georgios Gousios
    except SystemExit:
207 b024b97a Georgios Gousios
        logger.info("SystemExit")
208 b024b97a Georgios Gousios
    except:
209 b024b97a Georgios Gousios
        logger.exception("Caught exception, terminating")
210 b024b97a Georgios Gousios
    finally:
211 b024b97a Georgios Gousios
        # destroy the inotify's instance on this interrupt (stop monitoring)
212 b024b97a Georgios Gousios
        notifier.stop()
213 b024b97a Georgios Gousios
        raise
214 b024b97a Georgios Gousios
215 b024b97a Georgios Gousios
if __name__ == "__main__":
216 b024b97a Georgios Gousios
    sys.exit(main())
217 b024b97a Georgios Gousios
218 348f53de Georgios Gousios
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :