Statistics
| Branch: | Tag: | Revision:

root / snf-ganeti-tools / synnefo / ganeti / eventd.py @ 45ebfd48

History | View | Annotate | Download (9 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import time
50
import json
51
import logging
52
import pyinotify
53
import daemon
54
import daemon.pidlockfile
55
import socket
56
from signal import signal, SIGINT, SIGTERM
57

    
58
from amqplib import client_0_8 as amqp
59

    
60
from ganeti import utils
61
from ganeti import jqueue
62
from ganeti import constants
63
from ganeti import serializer
64

    
65
# Load settings from the directory named by SYNNEFO_CONFIG_DIR when that
# variable is set; otherwise fall back to the synnefo.settings module.
# XXX: this should probably also be moved to a command-line argument
conf_dir = os.environ.get("SYNNEFO_CONFIG_DIR")
if conf_dir is None:
    import synnefo.settings as settings
else:
    # Only the environment lookup decides the fallback: a KeyError raised
    # inside config.load() is a real error and must not be masked by
    # silently switching to the default settings.
    import config
    settings = config.load(conf_dir)
72

    
73

    
74
class JobFileHandler(pyinotify.ProcessEvent):
    """Turn Ganeti job-file changes into AMQP notifications.

    For every job file that is written or moved into the watched
    directory, the job is restored from disk and one "ganeti-op-status"
    message per opcode is published to settings.EXCHANGE_GANETI.
    """

    def __init__(self, logger):
        """Store the logger; the AMQP channel is opened on first use."""
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.chan = None

    def open_channel(self):
        """Connect to the AMQP broker and return a new channel.

        Retries once per second for as long as the connection attempt
        fails with socket.error, so this call blocks until the broker
        is reachable.
        """
        conn = None
        while conn is None:
            # Log through the handler's own logger rather than a module
            # global, so the class works wherever it is instantiated.
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                time.sleep(1)

        self.logger.info("Connection succesful, opening channel")
        return conn.channel()

    def process_IN_CLOSE_WRITE(self, event):
        """Handle a finished write exactly like a file moved into place."""
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Publish the status of every opcode of the job file in *event*.

        Files whose name does not start with "job-" are ignored, as are
        job files that can no longer be read (IOError).
        """
        jobfile = os.path.join(event.path, event.name)
        # Filter first: there is no point in (blockingly) connecting to
        # the broker for a file that will be ignored anyway.
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        if self.chan is None:
            self.chan = self.open_channel()

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data)

        # Every opcode gets its own notification; publishing the first
        # one must not end the processing of the remaining ones.
        for op in job.ops:
            body, routekey = self._describe_op(job, op)
            self.logger.debug("Delivering msg: %s (key=%s)", body, routekey)
            self._publish(body, routekey)

    def _describe_op(self, job, op):
        """Return (JSON message body, routing key) for one opcode of *job*."""
        instances = ""
        try:
            instances = " ".join(op.input.instances)
        except AttributeError:
            pass

        # A single instance_name, when the opcode has one, takes
        # precedence over the joined instance list.
        try:
            instances = op.input.instance_name
        except AttributeError:
            pass

        # Get the last line of the op log as message
        try:
            logmsg = op.log[-1][-1]
        except IndexError:
            logmsg = None

        self.logger.debug("Job: %d: %s(%s) %s %s",
                          int(job.id), op.input.OP_ID, instances,
                          op.status, logmsg)

        msg = {
            "type": "ganeti-op-status",
            "instance": instances,
            "operation": op.input.OP_ID,
            "jobId": int(job.id),
            "status": op.status,
            "logmsg": logmsg,
        }
        if logmsg:
            msg["message"] = logmsg

        # The routing key carries the part of the instance name that
        # precedes the first '-'.
        instance = instances.split('-')[0]
        routekey = "ganeti.%s.event.op" % instance

        return json.dumps(msg), routekey

    def _publish(self, body, routekey):
        """Publish *body* persistently, reconnecting on socket errors.

        Retries until the publish succeeds; any exception other than
        socket.error is logged together with the message body and
        re-raised.
        """
        msg = amqp.Message(body)
        msg.properties["delivery_mode"] = 2  # Persistent

        while True:
            try:
                self.chan.basic_publish(msg,
                                        exchange=settings.EXCHANGE_GANETI,
                                        routing_key=routekey)
                return
            except socket.error:
                self.logger.exception("Server went away, reconnecting...")
                self.chan = self.open_channel()
            except Exception:
                # The placeholder is required: without it the logging
                # module fails to format the record.
                self.logger.exception(
                    "Caught unexpected exception, msg: %s", body)
                raise
170

    
171
# Logger used by the signal handler; main() assigns it once logging has
# been configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Log the received signal number and raise SystemExit.

    The signature (signum, frame) is the one required of handlers
    registered with signal.signal(); frame is not used.
    """
    # Reading a module-level name needs no "global" declaration.
    active_logger = handler_logger
    active_logger.info("Caught fatal signal %d, will raise SystemExit",
                       signum)
    raise SystemExit()
178

    
179
def parse_arguments(args):
    """Parse the daemon's command-line options.

    *args* is the argument list without the program name. Returns the
    (options, positional arguments) pair produced by optparse; the
    options carry the attributes debug, log_file and pid_file.
    """
    import optparse

    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    option_parser = optparse.OptionParser()
    option_parser.add_option(
        "-d", "--debug",
        dest="debug",
        action="store_true",
        help="Enable debugging information")
    option_parser.add_option(
        "-l", "--log",
        dest="log_file",
        metavar="FILE",
        default=default_log,
        help="Write log to FILE instead of %s" % default_log)
    option_parser.add_option(
        '--pid-file',
        dest="pid_file",
        metavar='PIDFILE',
        default=default_pid,
        help="Save PID to file (default: %s)" % default_pid)

    parsed = option_parser.parse_args(args)
    return parsed
197

    
198
def main():
    """Daemonize and forward Ganeti job-queue events until terminated.

    Sets up file logging, detaches as a daemon holding a PID lock file,
    installs SIGINT/SIGTERM handlers and then watches
    constants.QUEUE_DIR with inotify, dispatching events to a
    JobFileHandler.

    This function does not return normally: the event loop only ends
    through an exception (including the SystemExit raised by the signal
    handler), which is logged and re-raised after the notifier has been
    stopped.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to the log file's stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=0o022,  # same value as the Python-2-only literal 022
        stdout=log_handler.stream,
        stderr=log_handler.stream,
        files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    job_handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # dispatch the events queued so far to the handler
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        # Re-raise here, while the exception is still active; a bare
        # "raise" in the finally clause has nothing to re-raise on
        # Python 3.
        raise
    except Exception:
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # destroy the inotify instance (stop monitoring)
        notifier.stop()
264

    
265
if __name__ == "__main__":
    # Run the daemon and pass whatever main() returns to the interpreter
    # as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
267

    
268
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :