Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-eventd.py @ b024b97a

History | View | Annotate | Download (6.2 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with ampq
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications over a 0mq PUB endpoint.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
from signal import signal, SIGINT, SIGTERM
29

    
30
from carrot.connection import BrokerConnection
31
from carrot.messaging import Publisher
32

    
33
from threading import Thread, Event, currentThread
34

    
35
from ganeti import utils
36
from ganeti import jqueue
37
from ganeti import constants
38
from ganeti import serializer
39

    
40
class JobFileHandler(pyinotify.ProcessEvent):
41
    def __init__(self, logger, amqpd):
42
            pyinotify.ProcessEvent.__init__(self)
43
            self.logger = logger
44
            self.amqpd = amqpd
45

    
46
    def process_IN_CLOSE_WRITE(self, event):
47
        jobfile = os.path.join(event.path, event.name)
48
        if not event.name.startswith("job-"):
49
            self.logger.debug("Not a job file: %s" % event.path)
50
            return
51

    
52
        try:
53
            data = utils.ReadFile(jobfile)
54
        except IOError:
55

    
56
            return
57

    
58
        data = serializer.LoadJson(data)
59
        job = jqueue._QueuedJob.Restore(None, data)
60

    
61
        for op in job.ops:
62
            instances = ""
63
            try:
64
                instances = " ".join(op.input.instances)
65
            except AttributeError:
66
                pass
67

    
68
            try:
69
                instances = op.input.instance_name
70
            except AttributeError:
71
                pass
72

    
73
            # Get the last line of the op log as message
74
            try:
75
                logmsg = op.log[-1][-1]
76
            except IndexError:
77
                logmsg = None
78
            
79
            self.logger.debug("%d: %s(%s) %s %s",
80
                int(job.id), op.input.OP_ID, instances, op.status, logmsg)
81

    
82
            # Construct message
83
            msg = {
84
                "type": "ganeti-op-status",
85
                "instance": instances,
86
                "operation": op.input.OP_ID,
87
                "jobId": int(job.id),
88
                "status": op.status,
89
                "logmsg": logmsg
90
            }
91
            if logmsg:
92
                msg["message"] = logmsg
93
            
94
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
95
            amqpd.send(json.dumps(msg))
96

    
97

    
98
handler_logger = None
99
def fatal_signal_handler(signum, frame):
100
    global handler_logger
101

    
102
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
103
        signum)
104
    raise SystemExit
105

    
106
def parse_arguments(args):
107
    from optparse import OptionParser
108

    
109
    parser = OptionParser()
110
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
111
                      help="Enable debugging information")
112
    parser.add_option("-l", "--log", dest="log_file",
113
                      default=settings.GANETI_EVENTD_LOG_FILE,
114
                      metavar="FILE",
115
                      help="Write log to FILE instead of %s" %
116
                      settings.GANETI_EVENTD_LOG_FILE),
117
    parser.add_option('--pid-file', dest="pid_file",
118
                      default=settings.GANETI_EVENTD_PID_FILE,
119
                      metavar='PIDFILE',
120
                      help="Save PID to file (default: %s)" %
121
                      settings.GANETI_EVENTD_PID_FILE)
122

    
123
    return parser.parse_args(args)
124

    
125
def main():
126
    global handler_logger
127

    
128
    (opts, args) = parse_arguments(sys.argv[1:])
129

    
130
    # Create pidfile
131
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
132

    
133
    # Initialize logger
134
    lvl = logging.DEBUG if opts.debug else logging.INFO
135
    logger = logging.getLogger("ganeti-amqpd")
136
    logger.setLevel(lvl)
137
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
138
        "%Y-%m-%d %H:%M:%S")
139
    handler = logging.FileHandler(opts.log_file)
140
    handler.setFormatter(formatter)
141
    logger.addHandler(handler)
142
    handler_logger = logger
143

    
144
#    # Become a daemon:
145
#    # Redirect stdout and stderr to handler.stream to catch
146
#    # early errors in the daemonization process [e.g., pidfile creation]
147
#    # which will otherwise go to /dev/null.
148
#    daemon_context = daemon.DaemonContext(
149
#        pidfile=pidf,
150
#        umask=022,
151
#        stdout=handler.stream,
152
#        stderr=handler.stream,
153
#        files_preserve=[handler.stream])
154
#    daemon_context.open()
155
#    logger.info("Became a daemon")
156
#
157
#    # Catch signals to ensure graceful shutdown
158
#    signal(SIGINT, fatal_signal_handler)
159
#    signal(SIGTERM, fatal_signal_handler)
160

    
161
    #Init connection to RabbitMQ
162
    conn = BrokerConnection(hostname="localhost", port=5672,userid="guest",
163
                            password="guest",virtual_host="/")
164
    publisher = Publisher(connection=conn, exchange="ganeti",
165
                          routing_key="importer")
166

    
167

    
168
    # Monitor the Ganeti job queue, create and push notifications
169
    wm = pyinotify.WatchManager()
170
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
171
    handler = JobFileHandler(logger, publisher)
172
    notifier = pyinotify.Notifier(wm, handler)
173

    
174
    try:
175
        # Fail if adding the inotify() watch fails for any reason
176
        res = wm.add_watch(constants.QUEUE_DIR, mask)
177
        if res[constants.QUEUE_DIR] < 0:
178
            raise Exception("pyinotify add_watch returned negative watch descriptor")
179
        
180
        logger.info("Now watching %s" % constants.QUEUE_DIR)
181

    
182
        while True:    # loop forever
183
        # process the queue of events as explained above
184
            notifier.process_events()
185
            if notifier.check_events():
186
                # read notified events and enqeue them
187
                notifier.read_events()
188
    except SystemExit:
189
        logger.info("SystemExit")
190
    except:
191
        logger.exception("Caught exception, terminating")
192
    finally:
193
        # destroy the inotify's instance on this interrupt (stop monitoring)
194
        notifier.stop()
195
        # mark the 0mq thread as stopped, wake it up so that it notices
196
        raise
197

    
198
if __name__ == "__main__":
199
    sys.exit(main())
200

    
201
# vim: set ts=4 sts=4 sw=4 et ai :