Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ 031d3f3a

History | View | Annotate | Download (6.3 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with amqp 
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications over a 0mq PUB endpoint.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
from signal import signal, SIGINT, SIGTERM
29

    
30
from amqplib import client_0_8 as amqp
31

    
32
from threading import Thread, Event, currentThread
33

    
34
from ganeti import utils
35
from ganeti import jqueue
36
from ganeti import constants
37
from ganeti import serializer
38

    
39
class JobFileHandler(pyinotify.ProcessEvent):
40
    def __init__(self, logger, chan):
41
            pyinotify.ProcessEvent.__init__(self)
42
            self.logger = logger
43
            self.chan = chan
44

    
45
    def process_IN_CLOSE_WRITE(self, event):
46
        jobfile = os.path.join(event.path, event.name)
47
        if not event.name.startswith("job-"):
48
            self.logger.debug("Not a job file: %s" % event.path)
49
            return
50

    
51
        try:
52
            data = utils.ReadFile(jobfile)
53
        except IOError:
54

    
55
            return
56

    
57
        data = serializer.LoadJson(data)
58
        job = jqueue._QueuedJob.Restore(None, data)
59

    
60
        for op in job.ops:
61
            instances = ""
62
            try:
63
                instances = " ".join(op.input.instances)
64
            except AttributeError:
65
                pass
66

    
67
            try:
68
                instances = op.input.instance_name
69
            except AttributeError:
70
                pass
71

    
72
            # Get the last line of the op log as message
73
            try:
74
                logmsg = op.log[-1][-1]
75
            except IndexError:
76
                logmsg = None
77
            
78
            self.logger.debug("%d: %s(%s) %s %s",
79
                int(job.id), op.input.OP_ID, instances, op.status, logmsg)
80

    
81
            # Construct message
82
            msg = {
83
                "type": "ganeti-op-status",
84
                "instance": instances,
85
                "operation": op.input.OP_ID,
86
                "jobId": int(job.id),
87
                "status": op.status,
88
                "logmsg": logmsg
89
            }
90
            if logmsg:
91
                msg["message"] = logmsg
92
            
93
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
94
            msg = amqp.Message(json.dumps(msg))
95
            msg.properties["delivery_mode"] = 2 #Persistent
96
            self.chan.basic_publish(msg,exchange="ganeti",routing_key="eventd")
97

    
98
handler_logger = None
99
def fatal_signal_handler(signum, frame):
100
    global handler_logger
101

    
102
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
103
        signum)
104
    raise SystemExit
105

    
106
def parse_arguments(args):
107
    from optparse import OptionParser
108

    
109
    parser = OptionParser()
110
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
111
                      help="Enable debugging information")
112
    parser.add_option("-l", "--log", dest="log_file",
113
                      default=settings.GANETI_EVENTD_LOG_FILE,
114
                      metavar="FILE",
115
                      help="Write log to FILE instead of %s" %
116
                      settings.GANETI_EVENTD_LOG_FILE),
117
    parser.add_option('--pid-file', dest="pid_file",
118
                      default=settings.GANETI_EVENTD_PID_FILE,
119
                      metavar='PIDFILE',
120
                      help="Save PID to file (default: %s)" %
121
                      settings.GANETI_EVENTD_PID_FILE)
122

    
123
    return parser.parse_args(args)
124

    
125
def main():
126
    global handler_logger
127

    
128
    (opts, args) = parse_arguments(sys.argv[1:])
129

    
130
    # Create pidfile
131
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
132

    
133
    # Initialize logger
134
    lvl = logging.DEBUG if opts.debug else logging.INFO
135
    logger = logging.getLogger("ganeti-amqpd")
136
    logger.setLevel(lvl)
137
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
138
        "%Y-%m-%d %H:%M:%S")
139
    handler = logging.FileHandler(opts.log_file)
140
    handler.setFormatter(formatter)
141
    logger.addHandler(handler)
142
    handler_logger = logger
143

    
144
#    # Become a daemon:
145
#    # Redirect stdout and stderr to handler.stream to catch
146
#    # early errors in the daemonization process [e.g., pidfile creation]
147
#    # which will otherwise go to /dev/null.
148
#    daemon_context = daemon.DaemonContext(
149
#        pidfile=pidf,
150
#        umask=022,
151
#        stdout=handler.stream,
152
#        stderr=handler.stream,
153
#        files_preserve=[handler.stream])
154
#    daemon_context.open()
155
#    logger.info("Became a daemon")
156
#
157
#    # Catch signals to ensure graceful shutdown
158
#    signal(SIGINT, fatal_signal_handler)
159
#    signal(SIGTERM, fatal_signal_handler)
160

    
161
    #Init connection to RabbitMQ
162
    conn = amqp.Connection( host=settings.RABBIT_HOST,
163
                            userid=settings.RABBIT_USERNAME,
164
                            password=settings.RABBIT_PASSWORD,
165
                            virtual_host=settings.RABBIT_VHOST)
166
    chan = conn.channel()
167
    
168
    # Monitor the Ganeti job queue, create and push notifications
169
    wm = pyinotify.WatchManager()
170
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
171
    handler = JobFileHandler(logger, chan)
172
    notifier = pyinotify.Notifier(wm, handler)
173

    
174
    try:
175
        # Fail if adding the inotify() watch fails for any reason
176
        res = wm.add_watch(constants.QUEUE_DIR, mask)
177
        if res[constants.QUEUE_DIR] < 0:
178
            raise Exception("pyinotify add_watch returned negative watch descriptor")
179
        
180
        logger.info("Now watching %s" % constants.QUEUE_DIR)
181

    
182
        while True:    # loop forever
183
        # process the queue of events as explained above
184
            notifier.process_events()
185
            if notifier.check_events():
186
                # read notified events and enqeue them
187
                notifier.read_events()
188
    except SystemExit:
189
        logger.info("SystemExit")
190
    except:
191
        logger.exception("Caught exception, terminating")
192
    finally:
193
        # destroy the inotify's instance on this interrupt (stop monitoring)
194
        notifier.stop()
195
        #Close the amqp connection
196
        chan.close()
197
        conn.close()
198
        raise
199

    
200
if __name__ == "__main__":
201
    sys.exit(main())
202

    
203
# vim: set ts=4 sts=4 sw=4 et ai :