Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 07e4ab22

History | View | Annotate | Download (9.6 kB)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

    
"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange.
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import socket
55
from signal import signal, SIGINT, SIGTERM
56

    
57
from ganeti import utils
58
from ganeti import jqueue
59
from ganeti import constants
60
from ganeti import serializer
61

    
62
from synnefo import settings
63
from synnefo.lib.amqp import AMQPClient
64

    
65
def get_time_from_status(op, job):
    """Return the timestamp that corresponds to the current status of `op`.

    The timestamp is used as a message identifier for a ganeti job.
    Since a ganeti job passes through multiple states, the timestamp
    picked is the one that corresponds to the state `op` is in.

    Arguments:
        op:  the opcode being reported; its `status`, `start_timestamp`,
             `exec_timestamp` and `end_timestamp` attributes are read.
        job: the job that owns `op`; its `received_timestamp` attribute
             is read for queued operations.

    Returns the timestamp attribute selected for the current status.

    Raises InvalidBackendStatus if the status matches none of the
    states handled below.

    """
    status = op.status

    if status == constants.JOB_STATUS_QUEUED:
        return job.received_timestamp

    # NOTE(review): the original carried a commented-out check against
    # JOB_STATUS_WAITING next to JOB_STATUS_WAITLOCK, which suggests the
    # constant is named differently across Ganeti versions -- confirm.
    # Accept whichever of the two names the installed Ganeti provides.
    waiting_states = [getattr(constants, name)
                      for name in ("JOB_STATUS_WAITLOCK",
                                   "JOB_STATUS_WAITING")
                      if hasattr(constants, name)]
    if status in waiting_states:
        return op.start_timestamp

    if status == constants.JOB_STATUS_CANCELING:
        return op.start_timestamp

    if status == constants.JOB_STATUS_RUNNING:
        return op.exec_timestamp

    if status in constants.JOBS_FINALIZED:
        # success, canceled, error
        return op.end_timestamp

    # The original raised the undefined name InvalidBackendState, which
    # surfaced as a NameError instead of the intended exception.
    raise InvalidBackendStatus(status, job)
88

    
89

    
90
class InvalidBackendStatus(Exception):
    """Raised when an operation reports a status that is not handled.

    Attributes:
        status: the unexpected status value.
        job:    the job in which the status was encountered.

    """

    def __init__(self, status, job):
        # Initialise the base class explicitly so `args` is populated
        # through the regular Exception constructor.
        Exception.__init__(self, status, job)
        self.status = status
        self.job = job

    def __str__(self):
        # repr() wraps the message in quotes; kept as-is so the rendered
        # text stays exactly what it was.
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))
98

    
99

    
100
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify event processor that publishes job updates over AMQP.

    For every job file that is written or moved into the watched
    directory, the job is restored from the file and one message per
    operation is published to the `settings.EXCHANGE_GANETI` exchange.

    """

    def __init__(self, logger):
        """Store `logger` and connect the AMQP client.

        Arguments:
            logger: logger object used for all messages of this handler.

        """
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.client = AMQPClient()
        # Log through the logger handed to us; the module-level
        # handler_logger is None until main() assigns it.
        self.logger.info("Attempting to connect to RabbitMQ hosts")
        self.client.connect()
        self.logger.info("Connected succesfully")

    def process_IN_CLOSE_WRITE(self, event):
        """Treat a file closed after writing like a file moved in."""
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Publish the status of every operation of the changed job file.

        Files whose name does not start with "job-" are ignored, and so
        are files that can not be read. Operations that refer to no
        instance, or to more than one, are skipped.

        Arguments:
            event: pyinotify event; its `path` and `name` attributes
                   locate the file.

        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Report the file itself, not just its directory.
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # Nothing to report if the file can not be read.
            return

        data = serializer.LoadJson(data)
        try:  # Version compatibility issue with Ganeti
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        for op in job.ops:
            instances = None
            try:
                instances = " ".join(op.input.instances)
            except AttributeError:
                pass

            # A single instance name, when present, takes precedence
            # over the list of instances.
            try:
                instances = op.input.instance_name
            except AttributeError:
                pass

            if not instances or len(instances.split(" ")) != 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these
                # messages
                self.logger.debug("Ignoring Job: %d: %s(%s)", int(job.id),
                                  op.input.OP_ID, instances)
                continue

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            self.logger.debug("Job: %d: %s(%s) %s %s",
                              int(job.id), op.input.OP_ID, instances,
                              op.status, logmsg)

            # Construct message
            msg = {
                "event_time": event_time,
                "type": "ganeti-op-status",
                "instance": instances,
                "operation": op.input.OP_ID,
                "jobId": int(job.id),
                "status": op.status,
                "logmsg": logmsg,
            }

            # The routing key carries the part of the instance name that
            # precedes the first dash.
            instance_prefix = instances.split('-')[0]
            routekey = "ganeti.%s.event.op" % instance_prefix

            # Serialize once; the same text is logged and published.
            body = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", body, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
                                      routing_key=routekey,
                                      body=body)
184

    
185
# Module-wide logger; stays None until main() assigns the daemon's logger.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Log the received signal and raise SystemExit.

    Arguments:
        signum: number of the signal that was caught.
        frame:  current stack frame, as passed by the signal module
                (unused).

    Raises SystemExit, always.

    """
    # Only reading the global, so no `global` statement is needed. The
    # guard ensures a missing logger can not turn the shutdown request
    # into an AttributeError.
    if handler_logger is not None:
        handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                            signum)
    raise SystemExit
192

    
193
def parse_arguments(args):
    """Parse the command-line arguments of the daemon.

    Arguments:
        args: list of argument strings, without the program name.

    Returns the (options, remaining_args) tuple produced by
    OptionParser.parse_args(); the options object carries `debug`,
    `log_file` and `pid_file`.

    """
    from optparse import OptionParser

    # Each default is defined once and reused for the help text.
    default_log_file = "/var/log/snf-ganeti-eventd.log"
    default_pid_file = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log_file,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           default_log_file)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid_file,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           default_pid_file)

    return parser.parse_args(args)
211

    
212
def main():
    """Daemonize and publish notifications for the Ganeti job queue.

    Parses the command line, sets up file logging, becomes a daemon,
    installs handlers for SIGINT and SIGTERM and then watches
    `constants.QUEUE_DIR` with pyinotify, handing events to a
    JobFileHandler.

    The event loop only ends through an exception; the exception is
    logged, the notifier is stopped and the exception is re-raised.

    """
    global handler_logger

    opts, _ = parse_arguments(sys.argv[1:])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to log_handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            # 0o022 is the same value as the former literal 022, written
            # in the form accepted by Python 2.6+ and Python 3.
            umask=0o022,
            stdout=log_handler.stream,
            stderr=log_handler.stream,
            files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
    # Separate name, so the logging handler above is not shadowed.
    job_handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # process the queue of events
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        raise
    except BaseException:
        # Log whatever ended the loop, then let it propagate. Re-raising
        # here, while the exception is still active, replaces the bare
        # `raise` that used to sit in the `finally` clause.
        logger.exception("Caught exception, terminating")
        raise
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
278

    
279
# Script entry point: run the daemon and exit with main()'s return value.
if __name__ == "__main__":
    sys.exit(main())
281

    
282
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :