Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ da102335

History | View | Annotate | Download (6.9 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with amqp 
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications to an AMQP exchange.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
# Append the parent of the current working directory to sys.path before the
# synnefo.settings import that follows.
# NOTE(review): presumably the daemon is expected to be started from a
# directory directly below the project root -- confirm.
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
import socket
29
from signal import signal, SIGINT, SIGTERM
30

    
31
from amqplib import client_0_8 as amqp
32

    
33
from threading import Thread, Event, currentThread
34

    
35
from ganeti import utils
36
from ganeti import jqueue
37
from ganeti import constants
38
from ganeti import serializer
39

    
40
class JobFileHandler(pyinotify.ProcessEvent):
    """Publish the status of Ganeti job files over AMQP as they are written.

    For every IN_CLOSE_WRITE event on a file whose name starts with "job-",
    the job is restored from disk and one persistent message per opcode is
    published to the "ganeti" exchange with routing key "eventd".
    """

    def __init__(self, logger):
        """Remember *logger*; the AMQP channel is opened lazily on first use."""
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.chan = None

    def open_channel(self):
        """Connect to the broker configured in settings and return a channel.

        The connection attempt is repeated once per second for as long as it
        fails with socket.error, so this call blocks until it succeeds.
        """
        conn = None
        while conn is None:
            # Log through the instance's own logger rather than a module
            # global, so the handler also works outside of main().
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                time.sleep(1)

        self.logger.info("Connection succesful, opening channel")
        return conn.channel()

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a closed-after-write file: publish the job's op statuses.

        Files not named "job-*" and files that cannot be read (IOError) are
        ignored.  Errors other than socket.error raised while publishing
        are logged and propagated to the caller.
        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # Unreadable job file: nothing to publish.
            return

        job = jqueue._QueuedJob.Restore(None, serializer.LoadJson(data))

        # Only talk to the broker once there is something to publish;
        # open_channel() blocks until the broker is reachable.
        if self.chan is None:
            self.chan = self.open_channel()

        for op in job.ops:
            self._publish(self._describe_op(job, op))

    def _describe_op(self, job, op):
        """Return the notification dict describing one opcode of *job*."""
        # A single instance_name takes precedence over an instances list.
        if hasattr(op.input, "instance_name"):
            instances = op.input.instance_name
        elif hasattr(op.input, "instances"):
            instances = " ".join(op.input.instances)
        else:
            instances = ""

        # Get the last line of the op log as message
        try:
            logmsg = op.log[-1][-1]
        except IndexError:
            logmsg = None

        self.logger.debug("%d: %s(%s) %s %s",
                          int(job.id), op.input.OP_ID, instances,
                          op.status, logmsg)

        msg = {
            "type": "ganeti-op-status",
            "instance": instances,
            "operation": op.input.OP_ID,
            "jobId": int(job.id),
            "status": op.status,
            "logmsg": logmsg,
        }
        if logmsg:
            msg["message"] = logmsg
        return msg

    def _publish(self, msg):
        """Serialize *msg* to JSON and publish it, reconnecting once if needed."""
        body = json.dumps(msg)
        self.logger.debug("PUSHing msg: %s", body)

        amqp_msg = amqp.Message(body)
        amqp_msg.properties["delivery_mode"] = 2  # Persistent
        try:
            self.chan.basic_publish(amqp_msg, exchange="ganeti",
                                    routing_key="eventd")
        except socket.error:
            self.logger.error("Server went away, reconnecting...")
            self.chan = self.open_channel()
            self.chan.basic_publish(amqp_msg, exchange="ganeti",
                                    routing_key="eventd")
        except Exception:
            # Log the JSON payload, not the amqp.Message object.
            self.logger.error("Uknown error (msg: %s)", body)
            raise
124

    
125
# Module-wide logger used by the signal handler below; it stays None until
# main() has configured logging and assigned the real logger to it.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Log the number of the fatal signal received, then raise SystemExit.

    The signature (signum, frame) is the one required of handlers
    installed with signal.signal(); frame is not used.
    """
    notice = "Caught fatal signal %d, will raise SystemExit"
    handler_logger.info(notice, signum)
    raise SystemExit()
132

    
133
def parse_arguments(args):
    """Parse the daemon's command line.

    args is the list of arguments without the program name.  Returns the
    (options, positional_arguments) pair produced by
    OptionParser.parse_args(); the options object carries the attributes
    debug, log_file and pid_file.  The defaults for log_file and pid_file
    come from settings.
    """
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    # The original statement ended in a stray comma, which turned it into a
    # discarded one-element tuple.
    parser.add_option("-l", "--log", dest="log_file",
                      default=settings.GANETI_EVENTD_LOG_FILE,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                      settings.GANETI_EVENTD_LOG_FILE)
    parser.add_option('--pid-file', dest="pid_file",
                      default=settings.GANETI_EVENTD_PID_FILE,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                      settings.GANETI_EVENTD_PID_FILE)

    return parser.parse_args(args)
151

    
152
def main():
    """Daemonize, watch the Ganeti job queue directory and publish events.

    Parses the command line, sets up file logging, detaches from the
    terminal, installs SIGINT/SIGTERM handlers and then processes inotify
    IN_CLOSE_WRITE events for constants.QUEUE_DIR forever.  The loop only
    ends through an exception, which is logged and re-raised after the
    notifier has been stopped.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti-amqpd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to log_handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=0o22,  # 0o prefix: valid on Python 2.6+ and on Python 3
        stdout=log_handler.stream,
        stderr=log_handler.stream,
        files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    job_handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception(
                "pyinotify add_watch returned negative watch descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # process the events queued by the previous read_events() call
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        # Re-raise from inside the handler: a bare raise in the finally
        # clause only works on Python 2.
        raise
    except BaseException:
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
216

    
217
# Script entry point: run the daemon and hand main()'s return value to
# sys.exit() as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
219

    
220
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :