Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ b9eef123

History | View | Annotate | Download (7.2 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with AMQP
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications to the ganeti exchange
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
import socket
29
from signal import signal, SIGINT, SIGTERM
30

    
31
from amqplib import client_0_8 as amqp
32

    
33
from threading import Thread, Event, currentThread
34

    
35
from ganeti import utils
36
from ganeti import jqueue
37
from ganeti import constants
38
from ganeti import serializer
39

    
40
class JobFileHandler(pyinotify.ProcessEvent):
    """Publish the status of Ganeti job opcodes over AMQP.

    An instance is used as a pyinotify event processor. Each time a file
    whose name starts with ``job-`` is closed after writing, the job is read
    back from disk and one message per opcode is published to the
    ``settings.EXCHANGE_GANETI`` exchange.
    """

    def __init__(self, logger):
        """Remember *logger*; the AMQP channel is opened lazily on first use."""
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.chan = None

    def open_channel(self):
        """Connect to the AMQP broker and return a fresh channel.

        Blocks until a connection is made: every ``socket.error`` raised
        while connecting is followed by a one second pause and a retry.
        """
        conn = None
        while conn is None:
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                time.sleep(1)

        self.logger.info("Connection succesful, opening channel")
        return conn.channel()

    def _publish(self, msg, routekey):
        """Publish *msg* under *routekey*, reconnecting on socket errors.

        Retries until the publish succeeds. Any exception other than
        ``socket.error`` is logged and re-raised.
        """
        while True:
            try:
                self.chan.basic_publish(msg,
                                        exchange=settings.EXCHANGE_GANETI,
                                        routing_key=routekey)
                return
            except socket.error:
                self.logger.exception("Server went away, reconnecting...")
                self.chan = self.open_channel()
            except Exception:
                self.logger.exception(
                    "Caught unexpected exception (msg: %s)", msg)
                raise

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a file in the watched directory being closed after a write.

        Files whose name does not start with ``job-`` and files that cannot
        be read (``IOError``) are silently skipped. For every opcode of the
        restored job a ``ganeti-op-status`` message is published with the
        routing key ``ganeti.<prefix>.event.<status>``, where ``<prefix>`` is
        the part of the instance name before the first ``-``.
        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data)

        # Connect only once there is something to publish, so unrelated
        # files never block on the broker.
        if self.chan is None:
            self.chan = self.open_channel()

        for op in job.ops:
            # Opcodes may carry a list of instances, a single instance name,
            # or neither; a single instance name takes precedence.
            instances = ""
            try:
                instances = " ".join(op.input.instances)
            except AttributeError:
                pass

            try:
                instances = op.input.instance_name
            except AttributeError:
                pass

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            self.logger.debug("%d: %s(%s) %s %s",
                              int(job.id), op.input.OP_ID, instances,
                              op.status, logmsg)

            # Construct message
            payload = {
                "type": "ganeti-op-status",
                "instance": instances,
                "operation": op.input.OP_ID,
                "jobId": int(job.id),
                "status": op.status,
                "logmsg": logmsg,
            }
            if logmsg:
                payload["message"] = logmsg

            instance = instances.split('-')[0]
            routekey = "ganeti.%s.event.%s" % (instance, op.status)

            body = json.dumps(payload)
            self.logger.debug("Delivering msg: %s (key=%s)", body, routekey)
            msg = amqp.Message(body)
            msg.properties["delivery_mode"] = 2  # Persistent

            # Publish every opcode of the job; returning here would drop
            # all opcodes after the first one.
            self._publish(msg, routekey)
132

    
133
# Logger used by fatal_signal_handler(); main() assigns it once logging is
# configured. It stays None until then.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler that converts a fatal signal into ``SystemExit``.

    The signal number is logged through the module-level ``handler_logger``
    when one has been configured. ``SystemExit`` is raised in every case, so
    a signal arriving before the logger exists still shuts the process down
    instead of failing with ``AttributeError``.

    :param signum: number of the signal that was delivered
    :param frame: interrupted stack frame (unused)
    :raises SystemExit: always
    """
    log = handler_logger
    if log is not None:
        log.info("Caught fatal signal %d, will raise SystemExit",
                 signum)
    raise SystemExit
140

    
141
def parse_arguments(args):
    """Parse the daemon's command line.

    Recognized options are ``-d/--debug``, ``-l/--log FILE`` and
    ``--pid-file PIDFILE``; the log and PID file default to
    ``settings.GANETI_EVENTD_LOG_FILE`` and
    ``settings.GANETI_EVENTD_PID_FILE``.

    :param args: list of argument strings, without the program name
    :returns: the ``(options, positional_args)`` pair from optparse
    """
    from optparse import OptionParser

    default_log = settings.GANETI_EVENTD_LOG_FILE
    default_pid = settings.GANETI_EVENTD_PID_FILE

    option_parser = OptionParser()
    option_parser.add_option(
        "-d", "--debug",
        action="store_true",
        dest="debug",
        help="Enable debugging information")
    option_parser.add_option(
        "-l", "--log",
        dest="log_file",
        default=default_log,
        metavar="FILE",
        help="Write log to FILE instead of %s" % default_log)
    option_parser.add_option(
        '--pid-file',
        dest="pid_file",
        default=default_pid,
        metavar='PIDFILE',
        help="Save PID to file (default: %s)" % default_pid)

    parsed = option_parser.parse_args(args)
    return parsed
159

    
160
def main():
    """Daemonize and relay Ganeti job queue changes until terminated.

    Parses the command line, sets up file logging, detaches from the
    terminal under a PID lock file, installs ``fatal_signal_handler`` for
    SIGINT and SIGTERM and then watches ``constants.QUEUE_DIR`` for
    ``IN_CLOSE_WRITE`` events, which are dispatched to a ``JobFileHandler``.

    The event loop only ends through an exception. The exception is logged
    and re-raised after the inotify notifier has been stopped, so the
    function never returns normally.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Create pidfile (wait up to 10 seconds to acquire the lock)
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to the log stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=0o22,  # same value as the old-style literal 022
        stdout=log_handler.stream,
        stderr=log_handler.stream,
        files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    job_handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception(
                "pyinotify add_watch returned negative watch descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # dispatch the events queued by the previous read
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        # Re-raise here, while the exception is still active; a bare
        # ``raise`` in the ``finally`` clause has nothing to re-raise on
        # Python 3.
        raise
    except Exception:
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # destroy the inotify instance on the way out (stop monitoring)
        notifier.stop()
224

    
225
if __name__ == "__main__":
    # Script entry point: whatever main() returns becomes the exit status.
    exit_status = main()
    sys.exit(exit_status)
227

    
228
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :