Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 4ed30eed

History | View | Annotate | Download (9 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import time
50
import json
51
import logging
52
import pyinotify
53
import daemon
54
import daemon.pidlockfile
55
import socket
56
from signal import signal, SIGINT, SIGTERM
57

    
58
from amqplib import client_0_8 as amqp
59

    
60
from ganeti import utils
61
from ganeti import jqueue
62
from ganeti import constants
63
from ganeti import serializer
64

    
65
from synnefo import settings
66

    
67
class JobFileHandler(pyinotify.ProcessEvent):
    """Turn Ganeti job-file changes into AMQP notifications.

    Handles IN_CLOSE_WRITE and IN_MOVED_TO inotify events: each job file
    ("job-*") that triggers an event is parsed and one
    "ganeti-op-status" message per opcode is published to
    settings.EXCHANGE_GANETI.
    """

    def __init__(self, logger):
        """Remember *logger*; the AMQP channel is opened on first publish."""
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.chan = None

    def open_channel(self):
        """Connect to the AMQP broker and return a new channel.

        Retries once per second for as long as the connection attempt
        raises socket.error; any other exception propagates.
        """
        conn = None
        while conn is None:
            # Use the instance logger rather than the module-level
            # handler_logger, which is only set once main() has run.
            self.logger.info("Attempting to connect to %s",
                             settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                time.sleep(1)

        self.logger.info("Connection succesful, opening channel")
        return conn.channel()

    def process_IN_CLOSE_WRITE(self, event):
        """Treat a finished write exactly like a file moved into place."""
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Publish the status of every opcode of the job in *event*'s file.

        Events for files whose name does not start with "job-" and job
        files that cannot be read (IOError) are ignored.
        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Checked before touching AMQP so unrelated files never
            # trigger a (blocking) connection attempt.
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        try:  # Version compatibility issue with Ganeti
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)
        for op in job.ops:
            instances = self._instances_of(op)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            self.logger.debug("Job: %d: %s(%s) %s %s",
                              job_id, op.input.OP_ID, instances,
                              op.status, logmsg)

            # Construct message
            msg = {
                "type": "ganeti-op-status",
                "instance": instances,
                "operation": op.input.OP_ID,
                "jobId": job_id,
                "status": op.status,
                "logmsg": logmsg,
            }
            if logmsg:
                msg["message"] = logmsg

            # The routing key uses the part of the name before the first '-'
            routekey = "ganeti.%s.event.op" % instances.split('-')[0]

            body = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", body, routekey)
            # Keep looping afterwards: every opcode gets its own message.
            self._publish(body, routekey)

    def _instances_of(self, op):
        """Return the instance name(s) targeted by *op*, or "" if none.

        A single "instance_name" takes precedence over a space-joined
        "instances" list when the opcode input carries both.
        """
        instances = ""
        try:
            instances = " ".join(op.input.instances)
        except AttributeError:
            pass

        try:
            instances = op.input.instance_name
        except AttributeError:
            pass

        return instances

    def _publish(self, body, routekey):
        """Publish *body* under *routekey*, reconnecting on socket errors.

        Returns once the broker call succeeds. Exceptions other than
        socket.error are logged together with the message body and
        re-raised.
        """
        msg = amqp.Message(body)
        msg.properties["delivery_mode"] = 2  # Persistent

        if self.chan is None:
            self.chan = self.open_channel()

        while True:
            try:
                self.chan.basic_publish(msg,
                                        exchange=settings.EXCHANGE_GANETI,
                                        routing_key=routekey)
            except socket.error:
                self.logger.exception("Server went away, reconnecting...")
                self.chan = self.open_channel()
            except Exception:
                self.logger.exception(
                    "Caught unexpected exception, msg: %s", body)
                raise
            else:
                self.logger.debug("Message published to AMQP successfully")
                return
168

    
169
# Logger used by fatal_signal_handler(); main() assigns it once logging
# has been configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler: log the signal number and raise SystemExit.

    signum -- number of the signal that was delivered
    frame  -- current stack frame (unused; required by the signal API)

    Always raises SystemExit. Logging is skipped while handler_logger is
    still None, so the handler never fails with AttributeError.
    """
    # The global is only read here, so no ``global`` declaration is needed.
    if handler_logger is not None:
        handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                            signum)
    raise SystemExit
176

    
177
def parse_arguments(args):
    """Parse the daemon's command-line arguments.

    args -- list of argument strings (without the program name)

    Returns the (options, positional_args) pair produced by optparse;
    options carries the attributes debug, log_file and pid_file.
    """
    import optparse

    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    cli = optparse.OptionParser()
    cli.add_option("-d", "--debug", dest="debug", action="store_true",
                   help="Enable debugging information")
    cli.add_option("-l", "--log", dest="log_file", metavar="FILE",
                   default=default_log,
                   help="Write log to FILE instead of %s" % default_log)
    cli.add_option('--pid-file', dest="pid_file", metavar='PIDFILE',
                   default=default_pid,
                   help="Save PID to file (default: %s)" % default_pid)

    return cli.parse_args(args)
195

    
196
def main():
    """Daemonize and forward Ganeti job-queue events to AMQP until killed.

    Parses the command line, sets up file logging, becomes a daemon,
    installs SIGINT/SIGTERM handlers and then watches
    constants.QUEUE_DIR with inotify, dispatching events to a
    JobFileHandler.

    The event loop only ends through an exception (SystemExit from the
    signal handler, or an error); that exception is logged and re-raised
    after the inotify notifier has been stopped.
    """
    global handler_logger

    (opts, _args) = parse_arguments(sys.argv[1:])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to log_handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=0o022,  # same value as the old "022" literal
            stdout=log_handler.stream,
            stderr=log_handler.stream,
            files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    event_handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, event_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # dispatch the events that have already been queued
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        raise
    except:
        # Log whatever ended the loop, then re-raise it unchanged.
        # Re-raising here (not in ``finally``) guarantees an active
        # exception exists and that it is the one that was just logged.
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()


if __name__ == "__main__":
    sys.exit(main())
265

    
266
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :