Statistics
| Branch: | Tag: | Revision:

root / ganeti / snf-ganeti-eventd.py @ 737b0e28

History | View | Annotate | Download (8.9 kB)

1
#!/usr/bin/env python
2
#
3
# -*- coding: utf-8 -*-
4
#
5
# Copyright 2011 GRNET S.A. All rights reserved.
6
#
7
# Redistribution and use in source and binary forms, with or
8
# without modification, are permitted provided that the following
9
# conditions are met:
10
#
11
#   1. Redistributions of source code must retain the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer.
14
#
15
#   2. Redistributions in binary form must reproduce the above
16
#      copyright notice, this list of conditions and the following
17
#      disclaimer in the documentation and/or other materials
18
#      provided with the distribution.
19
#
20
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
21
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
22
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
24
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
27
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
28
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
29
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
30
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31
# POSSIBILITY OF SUCH DAMAGE.
32
#
33
# The views and conclusions contained in the software and
34
# documentation are those of the authors and should not be
35
# interpreted as representing official policies, either expressed
36
# or implied, of GRNET S.A.
37
#
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42

43
"""
44

    
45
#from django.core.management import setup_environ
46

    
47
import sys
48
import os
49
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
50
sys.path.append(path)
51
import synnefo.settings as settings
52

    
53
#setup_environ(settings)
54

    
55
import time
56
import json
57
import logging
58
import pyinotify
59
import daemon
60
import daemon.pidlockfile
61
import socket
62
from signal import signal, SIGINT, SIGTERM
63

    
64
from amqplib import client_0_8 as amqp
65

    
66
from ganeti import utils
67
from ganeti import jqueue
68
from ganeti import constants
69
from ganeti import serializer
70

    
71
class JobFileHandler(pyinotify.ProcessEvent):
72
    def __init__(self, logger):
73
        pyinotify.ProcessEvent.__init__(self)
74
        self.logger = logger
75
        self.chan = None
76

    
77
    def open_channel(self):
78
        conn = None
79
        while conn == None:
80
            handler_logger.info("Attempting to connect to %s",
81
                settings.RABBIT_HOST)
82
            try:
83
                conn = amqp.Connection(host=settings.RABBIT_HOST,
84
                     userid=settings.RABBIT_USERNAME,
85
                     password=settings.RABBIT_PASSWORD,
86
                     virtual_host=settings.RABBIT_VHOST)
87
            except socket.error:
88
                time.sleep(1)
89

    
90
        handler_logger.info("Connection succesful, opening channel")
91
        return conn.channel()
92

    
93
    def process_IN_CLOSE_WRITE(self, event):
94
        self.process_IN_MOVED_TO(event)
95

    
96
    def process_IN_MOVED_TO(self, event):
97
        if self.chan == None:
98
            self.chan = self.open_channel()
99

    
100
        jobfile = os.path.join(event.path, event.name)
101
        if not event.name.startswith("job-"):
102
            self.logger.debug("Not a job file: %s" % event.path)
103
            return
104

    
105
        try:
106
            data = utils.ReadFile(jobfile)
107
        except IOError:
108
            return
109

    
110
        data = serializer.LoadJson(data)
111
        job = jqueue._QueuedJob.Restore(None, data)
112

    
113
        for op in job.ops:
114
            instances = ""
115
            try:
116
                instances = " ".join(op.input.instances)
117
            except AttributeError:
118
                pass
119

    
120
            try:
121
                instances = op.input.instance_name
122
            except AttributeError:
123
                pass
124

    
125
            # Get the last line of the op log as message
126
            try:
127
                logmsg = op.log[-1][-1]
128
            except IndexError:
129
                logmsg = None
130

    
131
            self.logger.debug("Job: %d: %s(%s) %s %s",
132
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
133

    
134
            # Construct message
135
            msg = {
136
                    "type": "ganeti-op-status",
137
                    "instance": instances,
138
                    "operation": op.input.OP_ID,
139
                    "jobId": int(job.id),
140
                    "status": op.status,
141
                    "logmsg": logmsg
142
                    }
143
            if logmsg:
144
                msg["message"] = logmsg
145

    
146
            instance = instances.split('-')[0]
147
            routekey = "ganeti.%s.event.op" % instance
148

    
149
            self.logger.debug("Delivering msg: %s (key=%s)",
150
                json.dumps(msg), routekey)
151
            msg = amqp.Message(json.dumps(msg))
152
            msg.properties["delivery_mode"] = 2  # Persistent
153

    
154
            while True:
155
                try:
156
                    self.chan.basic_publish(msg,
157
                            exchange=settings.EXCHANGE_GANETI,
158
                            routing_key=routekey)
159
                    return
160
                except socket.error:
161
                    self.logger.exception("Server went away, reconnecting...")
162
                    self.chan = self.open_channel()
163
                except Exception:
164
                    self.logger.exception("Caught unexpected exception, msg: ",
165
                                          msg)
166
                    raise
167

    
168
handler_logger = None
169
def fatal_signal_handler(signum, frame):
170
    global handler_logger
171

    
172
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
173
                        signum)
174
    raise SystemExit
175

    
176
def parse_arguments(args):
177
    from optparse import OptionParser
178

    
179
    parser = OptionParser()
180
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
181
                      help="Enable debugging information")
182
    parser.add_option("-l", "--log", dest="log_file",
183
                      default=settings.GANETI_EVENTD_LOG_FILE,
184
                      metavar="FILE",
185
                      help="Write log to FILE instead of %s" %
186
                           settings.GANETI_EVENTD_LOG_FILE)
187
    parser.add_option('--pid-file', dest="pid_file",
188
                      default=settings.GANETI_EVENTD_PID_FILE,
189
                      metavar='PIDFILE',
190
                      help="Save PID to file (default: %s)" %
191
                           settings.GANETI_EVENTD_PID_FILE)
192

    
193
    return parser.parse_args(args)
194

    
195
def main():
196
    global handler_logger
197

    
198
    (opts, args) = parse_arguments(sys.argv[1:])
199

    
200
    # Create pidfile
201
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
202

    
203
    # Initialize logger
204
    lvl = logging.DEBUG if opts.debug else logging.INFO
205
    logger = logging.getLogger("ganeti.eventd")
206
    logger.setLevel(lvl)
207
    formatter = logging.Formatter(
208
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
209
        "%Y-%m-%d %H:%M:%S")
210
    handler = logging.FileHandler(opts.log_file)
211
    handler.setFormatter(formatter)
212
    logger.addHandler(handler)
213
    handler_logger = logger
214

    
215
    # Become a daemon:
216
    # Redirect stdout and stderr to handler.stream to catch
217
    # early errors in the daemonization process [e.g., pidfile creation]
218
    # which will otherwise go to /dev/null.
219
    daemon_context = daemon.DaemonContext(
220
            pidfile=pidf,
221
            umask=022,
222
            stdout=handler.stream,
223
            stderr=handler.stream,
224
            files_preserve=[handler.stream])
225
    daemon_context.open()
226
    logger.info("Became a daemon")
227

    
228
    # Catch signals to ensure graceful shutdown
229
    signal(SIGINT, fatal_signal_handler)
230
    signal(SIGTERM, fatal_signal_handler)
231

    
232
    # Monitor the Ganeti job queue, create and push notifications
233
    wm = pyinotify.WatchManager()
234
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
235
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
236
    handler = JobFileHandler(logger)
237
    notifier = pyinotify.Notifier(wm, handler)
238

    
239
    try:
240
        # Fail if adding the inotify() watch fails for any reason
241
        res = wm.add_watch(constants.QUEUE_DIR, mask)
242
        if res[constants.QUEUE_DIR] < 0:
243
            raise Exception("pyinotify add_watch returned negative descriptor")
244

    
245
        logger.info("Now watching %s" % constants.QUEUE_DIR)
246

    
247
        while True:    # loop forever
248
            # process the queue of events as explained above
249
            notifier.process_events()
250
            if notifier.check_events():
251
                # read notified events and enqeue them
252
                notifier.read_events()
253
    except SystemExit:
254
        logger.info("SystemExit")
255
    except:
256
        logger.exception("Caught exception, terminating")
257
    finally:
258
        # destroy the inotify's instance on this interrupt (stop monitoring)
259
        notifier.stop()
260
        raise
261

    
262
if __name__ == "__main__":
263
    sys.exit(main())
264

    
265
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :