Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ e6fbada1

History | View | Annotate | Download (16.4 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57
import setproctitle
58

    
59
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
60
from ganeti import errors as ganeti_errors
61
from ganeti.ssconf import SimpleStore
62

    
63

    
64
from synnefo import settings
65
from synnefo.lib.amqp import AMQPClient
66

    
67

    
68
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    @param op: the opcode whose status selects which timestamp applies
    @param job: the queued job the opcode belongs to
    @return: the state-specific timestamp, falling back to the job's
             end timestamp when the former is not (yet) recorded
    @raise InvalidBackendStatus: if no usable timestamp exists at all

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version (WAITLOCK renamed to WAITING)
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job-level end timestamp when the state-specific
    # one has not been recorded.
    time = time or job.end_timestamp

    if time is None:
        # Previously this raise sat unreachably after the return, and an
        # unrecognized status crashed with NameError on `time` instead.
        raise InvalidBackendStatus(status, job)
    return time
95

    
96

    
97
def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get instance's NICs from Ganeti configuration data. If running on master,
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
    configuration file.

    @type instance: string
    @param instance: the name of the instance
    @param logger: logger used to report malformed firewall tags
    @rtype: List of dicts
    @return: Dictionary containing the instance's NICs. Each dictionary
             contains the following keys: 'network', 'ip', 'mac', 'mode',
             'link' and 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.networks.names", "nic.ips", "nic.macs", "nic.modes",
                  "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["network", "ip", "mac", "mode", "link"]
        nics = [dict(zip(nic_keys, nic))
                for nic in zip(networks, ips, macs, modes, links)]
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get firewall from instance Tags
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                # Fixed typo in log message (was "synefo")
                logger.error("Malformed synnefo tag %s", tag)
                continue
            try:
                index = int(t[2])
                nics[index]['firewall'] = t[3]
            except ValueError:
                logger.error("Malformed synnefo tag %s", tag)
            except IndexError:
                # `index` is always bound here: IndexError can only come
                # from the nics[index] subscript.
                logger.error("Found tag %s for non-existent NIC %d",
                             tag, index)
    return nics
150

    
151

    
152
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job status yields no usable timestamp."""

    def __init__(self, status, job):
        # Forward to Exception so .args is populated (helps pickling/logging).
        Exception.__init__(self, status, job)
        self.status = status
        self.job = job

    def __str__(self):
        # No repr() here: the old version wrapped the message in an extra
        # pair of quotes ("'Invalid backend status: ...'").
        return ("Invalid backend status: %s in job %s"
                % (self.status, self.job))
160

    
161

    
162
def prefix_from_name(name):
    """Return the portion of *name* before the first dash."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
164

    
165

    
166
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None when it is missing.

    The original body had a dangling bare ``None`` expression in the
    ``except AttributeError`` branch (a missing ``return``); it only
    worked because the function then fell off the end, returning None.
    """
    return getattr(from_, field, None)
171

    
172

    
173
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler turning Ganeti job-file updates into AMQP messages.

    Watches job files in the Ganeti queue directory; for every opcode of a
    modified job it builds a status message and publishes it to the ganeti
    exchange in RabbitMQ.
    """

    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        # Use the logger passed in: the module-global handler_logger is only
        # set by main(), so relying on it here could crash if the handler is
        # constructed first.  (Also fixed log typo "succesfully".)
        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table: opcode family -> handler method.
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # A job file written in place is treated exactly like one renamed
        # into the queue directory.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse a (re)written job file and publish one message per opcode."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The job file may have been archived/removed in the meantime.
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                # No handler registered for this opcode family.
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            # NIC info is attached for opcodes that can change networking.
            if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
                 op.status == "success") or
                (op_id == "OP_INSTANCE_SET_PARAMS" and
                 op.status in ["success", "error", "cancelled"])):
                    nics = get_instance_nics(msg["instance"], self.logger)
                    msg["nics"] = nics

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        @return: (msg, routekey) pair, or (None, None) for jobs with no
                 instance or with multiple instances
        """
        input = op.input
        op_id = input.OP_ID

        instances = None
        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.  Currently snf-dispatcher can not normally handle
                # these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
                     "OP_INSTANCE_STARTUP"]:
            if op.status == "success":
                nics = get_instance_nics(msg["instance"], self.logger)
                msg["instance_nics"] = nics

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        @return: (msg, routekey) pair, or (None, None) if the opcode
                 carries no network name
        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """Process OP_CLUSTER_* opcodes.

        @return: (msg, routekey) pair; cluster events use one fixed key.
        """
        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey
355

    
356

    
357
def find_cluster_name():
    """Return the Ganeti cluster name, as read from ssconf.

    Logs and re-raises any error coming from the SimpleStore lookup.
    """
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        # Bare raise preserves the original traceback ("raise e" reset it).
        raise

    return name
367

    
368

    
369
# Module-global logger: set in main(), read by fatal_signal_handler() and
# find_cluster_name(), which may run outside any handler object.
handler_logger = None
370

    
371

    
372
def fatal_signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM: log the signal, then exit via SystemExit.

    Raising SystemExit lets main()'s except/finally blocks run, so the
    inotify notifier is stopped cleanly.
    """
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
378

    
379

    
380
def parse_arguments(args):
    """Parse command-line arguments; return an (options, args) pair."""
    from optparse import OptionParser

    # Hoisted so each default appears once in both `default=` and its help.
    log_default = "/var/log/snf-ganeti-eventd.log"
    pid_default = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=log_default,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" % log_default)
    parser.add_option('--pid-file', dest="pid_file",
                      default=pid_default,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" % pid_default)

    return parser.parse_args(args)
398

    
399

    
400
def main():
    """Entry point: set up logging, daemonize, and watch the job queue.

    Parses command-line options, configures a file logger, renames the
    process, acquires a PID lockfile, detaches as a daemon, installs
    SIGINT/SIGTERM handlers, and then loops on pyinotify events over the
    Ganeti queue directory, delegating to JobFileHandler.

    NOTE(review): this is Python 2 code (octal literal 022 below); do not
    run under Python 3 without porting.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Publish the logger for fatal_signal_handler()/find_cluster_name().
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not separate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile (10-second lock acquisition timeout)
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare raise in finally re-raises the handled
        # exception under Python 2 semantics, forcing a nonzero exit.
        raise
487

    
488
if __name__ == "__main__":
    # Run the daemon; main() only returns by raising, so the exit status
    # propagates to the shell.
    sys.exit(main())
490

    
491
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :