Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 1dc821c9

History | View | Annotate | Download (12.9 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57
import setproctitle
58

    
59
from ganeti import utils
60
from ganeti import jqueue
61
from ganeti import constants
62
from ganeti import serializer
63
from ganeti.cli import GetClient
64

    
65
from synnefo import settings
66
from synnefo.lib.amqp import AMQPClient
67

    
68

    
69
def get_time_from_status(op, job):
    """Pick the timestamp that uniquely identifies a ganeti job message.

    A ganeti job passes through several states and each state is stamped
    with its own timestamp, so the identifier must come from the timestamp
    that matches the job's current status.

    """
    status = op.status

    if status == constants.JOB_STATUS_QUEUED:
        return job.received_timestamp

    # Compatibility with Ganeti version: older releases name this state
    # WAITLOCK, newer ones WAITING.
    try:
        waiting = (status == constants.JOB_STATUS_WAITLOCK)
    except AttributeError:
        waiting = (status == constants.JOB_STATUS_WAITING)

    if waiting or status == constants.JOB_STATUS_CANCELING:
        return op.start_timestamp

    if status == constants.JOB_STATUS_RUNNING:
        return op.exec_timestamp

    if status in constants.JOBS_FINALIZED:
        # Error opcodes do not always have an end timestamp; fall back to
        # the job-level one in that case.
        return op.end_timestamp or job.end_timestamp

    raise InvalidBackendStatus(status, job)
98

    
99

    
100
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job carries a status we cannot timestamp."""

    def __init__(self, status, job):
        # Keep both pieces of context for the error message below.
        self.status = status
        self.job = job

    def __str__(self):
        details = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(details)
108

    
109

    
110
def prefix_from_name(name):
    """Return the part of ``name`` before the first dash.

    Used to build the per-prefix AMQP routing keys.
    """
    prefix, _sep, _rest = name.partition('-')
    return prefix
112

    
113

    
114
def get_field(from_, field):
    """Return attribute ``field`` of ``from_``, or None if it is absent.

    Opcode input objects do not carry every field on every Ganeti
    version, so a missing attribute is treated as unset rather than an
    error.
    """
    # The previous try/except version had a bare `None` statement in the
    # AttributeError handler (missing `return`); getattr with a default
    # expresses the intent directly and behaves identically.
    return getattr(from_, field, None)
119

    
120

    
121
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler turning Ganeti job-file updates into AMQP messages.

    Whenever a file in the Ganeti job queue directory is written or moved
    into place, the job is parsed and one notification per known opcode is
    published to the ganeti exchange.
    """

    def __init__(self, logger, cluster_name):
        """Connect to RabbitMQ and declare the ganeti exchange.

        Args:
            logger: logger used for all diagnostics of this handler.
            cluster_name: name of the Ganeti cluster, added to messages.
        """
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25)
        # Fix: log through the logger we were handed, not the module-level
        # handler_logger global, which is only set inside main() and would
        # be None if this class were instantiated elsewhere.
        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected succesfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table keyed by the opcode family (OP_<FAMILY>_*).
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        """Treat an in-place write to a job file like a move into place."""
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse an updated job file and publish one message per opcode."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Fix: report the actual file (and lazily) instead of the
            # directory path with eager % formatting.
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The file may have been archived/removed before we read it.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                # No handler for this opcode family; skipped below.
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        Returns (msg, routekey) for single-instance operations, or
        (None, None) when the job names no instance or several.
        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode carries
        no network name.
        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               # 'reserved_ips': get_field(input, 'reserved_ips'),
               'group_name':   get_field(input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey


    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
265

    
266

    
267

    
268
def find_cluster_name():
    """Return the name of the Ganeti cluster, as reported by GetClient().

    Logs and re-raises any failure so the caller aborts instead of
    continuing with an unknown cluster name.
    """
    global handler_logger
    try:
        name = GetClient().QueryClusterInfo()['name']
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        raise e
    return name
278

    
279
# Module-level logger shared with the signal handler; set by main().
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Turn a fatal signal (SIGINT/SIGTERM) into SystemExit.

    Raising SystemExit lets main()'s try/finally run its cleanup path.
    """
    global handler_logger
    handler_logger.info(
        "Caught fatal signal %d, will raise SystemExit", signum)
    raise SystemExit
286

    
287
def parse_arguments(args):
    """Parse the daemon's command line.

    Args:
        args: argument list, typically sys.argv[1:].

    Returns:
        The (options, remaining_args) pair from OptionParser.parse_args.
    """
    from optparse import OptionParser

    # Name the defaults once; they appear both as the option default and
    # inside the help text.
    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" % default_log)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" % default_pid)

    return parser.parse_args(args)
305

    
306
def main():
    """Entry point: daemonize and publish Ganeti job notifications.

    Sequence: parse arguments, set up file logging, rename the process,
    acquire the PID lock, daemonize, install signal handlers, then watch
    the Ganeti job queue directory with inotify forever.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])


    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Expose the logger to fatal_signal_handler() and find_cluster_name().
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not seperate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    # NOTE: the log handler's stream must already exist (created above),
    # and is preserved across the daemon fork via files_preserve.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]

    cluster_name = find_cluster_name()

    # NOTE: `handler` is rebound here from the log handler to the inotify
    # handler; the log handler's stream was already captured above.
    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqeue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare `raise` re-raises the pending exception after
        # cleanup; the infinite loop above can only be left via an
        # exception, so an exception should always be in flight here.
        raise
394

    
395
# Script entry point.
if __name__ == "__main__":
    sys.exit(main())
397

    
398
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :