Statistics
| Branch: | Tag: | Revision:

root / ganeti / snf-ganeti-eventd.py @ eab0602e

History | View | Annotate | Download (7.4 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with AMQP
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications to the ganeti exchange
9

10
"""
11

    
12
#from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
#setup_environ(settings)
21

    
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
import socket
29
from signal import signal, SIGINT, SIGTERM
30

    
31
from amqplib import client_0_8 as amqp
32

    
33
from ganeti import utils
34
from ganeti import jqueue
35
from ganeti import constants
36
from ganeti import serializer
37

    
38
class JobFileHandler(pyinotify.ProcessEvent):
39
    def __init__(self, logger):
40
        pyinotify.ProcessEvent.__init__(self)
41
        self.logger = logger
42
        self.chan = None 
43

    
44
    def open_channel(self):
45
        conn = None
46
        while conn == None:
47
            handler_logger.info("Attempting to connect to %s",
48
                settings.RABBIT_HOST)
49
            try:
50
                conn = amqp.Connection(host=settings.RABBIT_HOST,
51
                     userid=settings.RABBIT_USERNAME,
52
                     password=settings.RABBIT_PASSWORD,
53
                     virtual_host=settings.RABBIT_VHOST)
54
            except socket.error:
55
                time.sleep(1)
56
        
57
        handler_logger.info("Connection succesful, opening channel")
58
        return conn.channel()
59

    
60
    def process_IN_CLOSE_WRITE(self, event):
61
        self.process_IN_MOVED_TO(event)
62

    
63
    def process_IN_MOVED_TO(self, event):
64
        if self.chan == None:
65
            self.chan = self.open_channel()
66

    
67
        jobfile = os.path.join(event.path, event.name)
68
        if not event.name.startswith("job-"):
69
            self.logger.debug("Not a job file: %s" % event.path)
70
            return
71

    
72
        try:
73
            data = utils.ReadFile(jobfile)
74
        except IOError:
75
            return
76

    
77
        data = serializer.LoadJson(data)
78
        job = jqueue._QueuedJob.Restore(None, data)
79

    
80
        for op in job.ops:
81
            instances = ""
82
            try:
83
                instances = " ".join(op.input.instances)
84
            except AttributeError:
85
                pass
86

    
87
            try:
88
                instances = op.input.instance_name
89
            except AttributeError:
90
                pass
91

    
92
            # Get the last line of the op log as message
93
            try:
94
                logmsg = op.log[-1][-1]
95
            except IndexError:
96
                logmsg = None
97

    
98
            self.logger.debug("Job: %d: %s(%s) %s %s",
99
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
100

    
101
            # Construct message
102
            msg = {
103
                    "type": "ganeti-op-status",
104
                    "instance": instances,
105
                    "operation": op.input.OP_ID,
106
                    "jobId": int(job.id),
107
                    "status": op.status,
108
                    "logmsg": logmsg
109
                    }
110
            if logmsg:
111
                msg["message"] = logmsg
112
            
113
            instance = instances.split('-')[0]  
114
            routekey = "ganeti.%s.event.op" % instance
115
            
116
            self.logger.debug("Delivering msg: %s (key=%s)",
117
                json.dumps(msg), routekey)
118
            msg = amqp.Message(json.dumps(msg))
119
            msg.properties["delivery_mode"] = 2  # Persistent
120

    
121
            while True:
122
                try:
123
                    self.chan.basic_publish(msg,
124
                            exchange=settings.EXCHANGE_GANETI,
125
                            routing_key=routekey)
126
                    return
127
                except socket.error:
128
                    self.logger.exception("Server went away, reconnecting...")
129
                    self.chan = self.open_channel()
130
                except Exception:
131
                    self.logger.exception("Caught unexpected exception, msg: ",
132
                                          msg)
133
                    raise
134

    
135
handler_logger = None
136
def fatal_signal_handler(signum, frame):
137
    global handler_logger
138

    
139
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
140
                        signum)
141
    raise SystemExit
142

    
143
def parse_arguments(args):
144
    from optparse import OptionParser
145

    
146
    parser = OptionParser()
147
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
148
                      help="Enable debugging information")
149
    parser.add_option("-l", "--log", dest="log_file",
150
                      default=settings.GANETI_EVENTD_LOG_FILE,
151
                      metavar="FILE",
152
                      help="Write log to FILE instead of %s" %
153
                           settings.GANETI_EVENTD_LOG_FILE)
154
    parser.add_option('--pid-file', dest="pid_file",
155
                      default=settings.GANETI_EVENTD_PID_FILE,
156
                      metavar='PIDFILE',
157
                      help="Save PID to file (default: %s)" %
158
                           settings.GANETI_EVENTD_PID_FILE)
159

    
160
    return parser.parse_args(args)
161

    
162
def main():
163
    global handler_logger
164

    
165
    (opts, args) = parse_arguments(sys.argv[1:])
166

    
167
    # Create pidfile
168
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
169

    
170
    # Initialize logger
171
    lvl = logging.DEBUG if opts.debug else logging.INFO
172
    logger = logging.getLogger("ganeti.eventd")
173
    logger.setLevel(lvl)
174
    formatter = logging.Formatter(
175
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
176
        "%Y-%m-%d %H:%M:%S")
177
    handler = logging.FileHandler(opts.log_file)
178
    handler.setFormatter(formatter)
179
    logger.addHandler(handler)
180
    handler_logger = logger
181

    
182
    # Become a daemon:
183
    # Redirect stdout and stderr to handler.stream to catch
184
    # early errors in the daemonization process [e.g., pidfile creation]
185
    # which will otherwise go to /dev/null.
186
    daemon_context = daemon.DaemonContext(
187
            pidfile=pidf,
188
            umask=022,
189
            stdout=handler.stream,
190
            stderr=handler.stream,
191
            files_preserve=[handler.stream])
192
    daemon_context.open()
193
    logger.info("Became a daemon")
194

    
195
    # Catch signals to ensure graceful shutdown
196
    signal(SIGINT, fatal_signal_handler)
197
    signal(SIGTERM, fatal_signal_handler)
198

    
199
    # Monitor the Ganeti job queue, create and push notifications
200
    wm = pyinotify.WatchManager()
201
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
202
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
203
    handler = JobFileHandler(logger)
204
    notifier = pyinotify.Notifier(wm, handler)
205

    
206
    try:
207
        # Fail if adding the inotify() watch fails for any reason
208
        res = wm.add_watch(constants.QUEUE_DIR, mask)
209
        if res[constants.QUEUE_DIR] < 0:
210
            raise Exception("pyinotify add_watch returned negative descriptor")
211

    
212
        logger.info("Now watching %s" % constants.QUEUE_DIR)
213

    
214
        while True:    # loop forever
215
            # process the queue of events as explained above
216
            notifier.process_events()
217
            if notifier.check_events():
218
                # read notified events and enqeue them
219
                notifier.read_events()
220
    except SystemExit:
221
        logger.info("SystemExit")
222
    except:
223
        logger.exception("Caught exception, terminating")
224
    finally:
225
        # destroy the inotify's instance on this interrupt (stop monitoring)
226
        notifier.stop()
227
        raise
228

    
229
if __name__ == "__main__":
230
    sys.exit(main())
231

    
232
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :