Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ 2cd99e7a

History | View | Annotate | Download (7.2 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with AMQP
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications to the ganeti exchange
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
import socket
29
from signal import signal, SIGINT, SIGTERM
30

    
31
from amqplib import client_0_8 as amqp
32

    
33
from ganeti import utils
34
from ganeti import jqueue
35
from ganeti import constants
36
from ganeti import serializer
37

    
38
class JobFileHandler(pyinotify.ProcessEvent):
    """Publish Ganeti opcode status messages when job files are written.

    Instances are used as the pyinotify event processor: every
    IN_CLOSE_WRITE event is handled by process_IN_CLOSE_WRITE(), which
    reads the job file and publishes one AMQP message per opcode to
    settings.EXCHANGE_GANETI.
    """

    def __init__(self, logger):
        """Remember the logger to use; the AMQP channel is opened lazily."""
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        # Opened on the first job file that actually needs publishing.
        self.chan = None

    def open_channel(self):
        """Connect to the AMQP broker and return a new channel.

        The connection attempt is repeated once per second for as long as
        it fails with socket.error, so this call blocks until the broker
        configured in settings (RABBIT_HOST etc.) accepts the connection.
        """
        conn = None
        while conn is None:
            # Log through the instance logger rather than the module-level
            # global, so the class does not depend on main() having run.
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                time.sleep(1)

        self.logger.info("Connection succesful, opening channel")
        return conn.channel()

    def _publish(self, msg, routekey):
        """Publish msg with routekey, reconnecting on socket errors.

        Retries until the publish succeeds. Any exception other than
        socket.error is logged and re-raised to the caller.
        """
        while True:
            try:
                self.chan.basic_publish(msg,
                                        exchange=settings.EXCHANGE_GANETI,
                                        routing_key=routekey)
                return
            except socket.error:
                self.logger.exception("Server went away, reconnecting...")
                self.chan = self.open_channel()
            except Exception:
                self.logger.exception(
                    "Caught unexpected exception (msg: %s)", msg)
                raise

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a file in the watched directory being closed after a write.

        Files whose name does not start with "job-" are ignored, as are
        files that can no longer be read (IOError). For every opcode of the
        restored job a "ganeti-op-status" message is published.
        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data)

        # Connect only once there is something to publish, so events for
        # unrelated files never block on the broker.
        if self.chan is None:
            self.chan = self.open_channel()

        for op in job.ops:
            # Opcodes may carry a list of instances, a single instance name,
            # or neither; a single instance name takes precedence.
            instances = ""
            try:
                instances = " ".join(op.input.instances)
            except AttributeError:
                pass

            try:
                instances = op.input.instance_name
            except AttributeError:
                pass

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            self.logger.debug("%d: %s(%s) %s %s",
                              int(job.id), op.input.OP_ID, instances,
                              op.status, logmsg)

            # Construct message
            msg = {
                "type": "ganeti-op-status",
                "instance": instances,
                "operation": op.input.OP_ID,
                "jobId": int(job.id),
                "status": op.status,
                "logmsg": logmsg,
            }
            if logmsg:
                msg["message"] = logmsg

            # The routing key uses the part of the instance name that
            # precedes the first '-'.
            instance = instances.split('-')[0]
            routekey = "ganeti.%s.event.op" % instance

            payload = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)",
                              payload, routekey)
            amqp_msg = amqp.Message(payload)
            amqp_msg.properties["delivery_mode"] = 2  # Persistent

            # Publish and move on to the next opcode; returning here would
            # drop the status of every opcode after the first one.
            self._publish(amqp_msg, routekey)
129

    
130
# Module-level logger used by the signal handler below; main() assigns it
# once logging has been configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler: log the received signal and raise SystemExit.

    signum is the number of the signal that was delivered; frame is the
    interrupted stack frame and is not used.
    """
    # The global is only read here, so no ``global`` declaration is needed.
    handler_logger.info(
        "Caught fatal signal %d, will raise SystemExit", signum)
    raise SystemExit()
137

    
138
def parse_arguments(args):
    """Parse the daemon's command-line arguments.

    Recognised options:
      -d, --debug          store True in ``debug``
      -l, --log FILE       log file path, stored in ``log_file``
      --pid-file PIDFILE   pid file path, stored in ``pid_file``

    The log and pid file defaults are taken from settings.

    Returns whatever OptionParser.parse_args() returns for ``args``, i.e.
    the (options, positional arguments) pair.
    """
    import optparse

    default_log = settings.GANETI_EVENTD_LOG_FILE
    default_pid = settings.GANETI_EVENTD_PID_FILE

    cli = optparse.OptionParser()
    cli.add_option(
        "-d", "--debug",
        dest="debug",
        action="store_true",
        help="Enable debugging information")
    cli.add_option(
        "-l", "--log",
        dest="log_file",
        metavar="FILE",
        default=default_log,
        help="Write log to FILE instead of %s" % default_log)
    cli.add_option(
        '--pid-file',
        dest="pid_file",
        metavar='PIDFILE',
        default=default_pid,
        help="Save PID to file (default: %s)" % default_pid)

    parsed = cli.parse_args(args)
    return parsed
156

    
157
def main():
    """Daemonize, watch the Ganeti job queue and publish notifications.

    Steps, in order: parse the command line, set up file logging, detach
    as a daemon (holding the pid lock file), install SIGINT/SIGTERM
    handlers, then watch constants.QUEUE_DIR for IN_CLOSE_WRITE events and
    dispatch them to a JobFileHandler forever.

    The function only leaves its event loop through an exception: the
    exception is logged, the inotify notifier is stopped and the exception
    is re-raised to the caller.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to the log handler's stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        # 0o022 is the same value as the old-style literal 022, but is
        # also valid syntax on Python 3.
        umask=0o022,
        stdout=log_handler.stream,
        stderr=log_handler.stream,
        files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    job_handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative watch descriptor")

        logger.info("Now watching %s" % constants.QUEUE_DIR)

        while True:    # loop forever
            # dispatch the events queued by the previous read_events()
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        # Re-raise explicitly: a bare ``raise`` in ``finally`` has no
        # active exception to re-raise on Python 3.
        raise
    except Exception:
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()


if __name__ == "__main__":
    sys.exit(main())
224

    
225
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :