Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 39ccdb18

History | View | Annotate | Download (11.5 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import socket
55
from signal import signal, SIGINT, SIGTERM
56

    
57
from ganeti import utils
58
from ganeti import jqueue
59
from ganeti import constants
60
from ganeti import serializer
61

    
62
from synnefo import settings
63
from synnefo.lib.amqp import AMQPClient
64

    
65
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    Raises InvalidBackendStatus if the job is in an unknown status.
    """
    status = op.status
    if status == constants.JOB_STATUS_QUEUED:
        return job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            return op.start_timestamp
    except AttributeError:
        # Newer Ganeti renamed JOB_STATUS_WAITLOCK to JOB_STATUS_WAITING.
        if status == constants.JOB_STATUS_WAITING:
            return op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        return op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        return op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        # success, canceled, error
        return op.end_timestamp

    # Bug fix: the exception class defined below is InvalidBackendStatus;
    # the original raised the undefined name InvalidBackendState, which
    # would have produced a NameError instead of the intended exception.
    raise InvalidBackendStatus(status, job)
91

    
92

    
93
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job carries a status with no known timestamp."""

    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" \
                  % (self.status, self.job)
        return repr(message)
101

    
102
def prefix_from_name(name):
    """Return the part of *name* before the first dash."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
104

    
105

    
106
def get_field(from_, field):
    """Return the attribute *field* of *from_*, or None if it is missing.

    The original body had ``except AttributeError: None`` -- a bare
    expression statement instead of ``return None``; it only worked via
    the implicit None return. getattr's default makes the intent explicit.
    """
    return getattr(from_, field, None)
111

    
112

    
113
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler for the Ganeti job queue directory.

    For every job file written or renamed into the queue, parse the job,
    build one notification message per supported opcode and publish it to
    the Ganeti AMQP exchange.
    """
    def __init__(self, logger):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.client = AMQPClient()
        # Bug fix: log through the injected logger. The original used the
        # module-global handler_logger, which is None until main() runs.
        self.logger.info("Attempting to connect to RabbitMQ hosts")
        self.client.connect()
        self.logger.info("Connected succesfully")

        # Dispatch table: opcode family (second token of OP_ID, e.g.
        # "INSTANCE" from "OP_INSTANCE_CREATE") -> handler method.
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # An in-place write is handled exactly like a rename into the dir.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse the job file behind *event* and publish notifications."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Bug fix: log the full file path; the original logged only
            # event.path (the watched directory), hiding the culprit.
            self.logger.debug("Not a job file: %s" % jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The file may already have been archived or removed; skip it.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            # Look the handler up explicitly. The original wrapped the
            # *call* in try/except KeyError, which also silently swallowed
            # KeyErrors raised inside the handler itself.
            msg = None
            routekey = None
            handler_fn = self.op_handlers.get(op_id.split('_')[1])
            if handler_fn is not None:
                msg, routekey = handler_fn(op, job_id)

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
                                      routing_key=routekey,
                                      body=msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        Returns (msg, routekey), or (None, None) for jobs that act on
        no instance or on more than one instance.
        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode carries
        no network_name field.
        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               # 'reserved_ips': get_field(input, 'reserved_ips'),
               'group_name':   get_field(input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
252

    
253

    
254

    
255

    
256

    
257
# Module-level logger reference, set by main() after logging is configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Handle a fatal signal (SIGINT/SIGTERM) by raising SystemExit.

    Raising SystemExit lets main()'s cleanup path run (notifier.stop()).
    Reading the module global needs no `global` declaration.
    """
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
264

    
265
def parse_arguments(args):
    """Parse the daemon's command-line arguments.

    Returns the (options, args) pair produced by OptionParser.parse_args.
    The default log and pid paths were previously duplicated between the
    `default=` value and the help string; they are now defined once.
    """
    from optparse import OptionParser

    default_log_file = "/var/log/snf-ganeti-eventd.log"
    default_pid_file = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log_file,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                          default_log_file)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid_file,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                          default_pid_file)

    return parser.parse_args(args)
283

    
284
def main():
    """Entry point: daemonize and watch the Ganeti job queue.

    Parses options, sets up file logging, detaches as a daemon with a
    pidfile, installs signal handlers, then loops forever feeding queue
    events to a JobFileHandler.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Expose the configured logger to fatal_signal_handler().
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    # NOTE: 022 is Python 2 octal syntax (0o22 in Python 3).
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    # NOTE(review): `handler` is rebound here from the logging FileHandler
    # to the inotify handler; files_preserve above already captured the
    # log stream, so this is safe but easy to misread.
    handler = JobFileHandler(logger)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s" % constants.QUEUE_DIR)

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqeue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare `raise` inside finally re-raises the exception
        # handled above under Python 2 (sys.exc_info persists past the
        # except block), so SystemExit still propagates and sets the exit
        # status; with no pending exception it would itself error. Confirm
        # the intended semantics before porting to Python 3.
        raise
350

    
351
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status
    # (None maps to status 0).
    sys.exit(main())
353

    
354
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :