snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 11d4d283

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange
"""

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
# Since Ganeti 2.7, the Debian package ships the majority of the Python code
# in a private module under '/usr/share/ganeti'. Add this directory to the
# path in order to be able to import ganeti. Also, add it to the start of the
# path to avoid conflicts with the Ganeti RAPI client.
sys.path.insert(0, "/usr/share/ganeti")

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils, jqueue, constants, serializer, pathutils, cli
from ganeti import errors as ganeti_errors
from ganeti.ssconf import SimpleStore


from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility across Ganeti versions
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp; if no timestamp can be
    # determined at all, the backend status is considered invalid.
    time = time or job.end_timestamp
    if time is not None:
        return time

    raise InvalidBackendStatus(status, job)


def get_instance_attachments(instance, logger):
    """Query Ganeti to get the instance's attachments (NICs and Disks)

    Get the instance's attachments from Ganeti configuration data. If running
    on the master, query Ganeti via the Ganeti CLI client. Otherwise, get the
    attachments straight from Ganeti's configuration file.

    @type instance: string
    @param instance: the name of the instance
    @rtype: dict
    @return: Dictionary containing the 'nics' and 'disks' of the instance.

    """
    try:
        client = cli.GetClient()
        q_fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
                    "nic.modes", "nic.links", "nic.uuids", "tags",
                    "disk.names", "disk.sizes", "disk.uuids"]
        info = client.QueryInstances([instance], q_fields, use_locking=False)
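        # The result is a single row (one queried instance) laid out in
        # q_fields order: seven per-NIC lists plus the instance tags,
        # followed by the three per-disk lists, hence the [:-3] / [-3:]
        # slicing below.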
        # Parse NICs
        names, networks, ips, macs, modes, links, uuids, tags = info[0][:-3]
        nic_keys = ["name", "network", "ip", "mac", "mode", "link", "uuid"]
        nics = zip(names, networks, ips, macs, modes, links, uuids)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
        # Parse Disks
        names, sizes, uuids = info[0][-3:]
        disk_keys = ["name", "size", "uuid"]
        disks = zip(names, sizes, uuids)
        disks = map(lambda x: dict(zip(disk_keys, x)), disks)
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(pathutils.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        # Parse NICs
        nics = []
        for index, nic in enumerate(i["nics"]):
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nic["index"] = index
            nics.append(nic)
        # Parse Disks
        disks = []
        for index, disk in enumerate(i["disks"]):
            disks.append({"name": disk.pop("name"),
                          "size": disk["size"],
                          "uuid": disk["uuid"],
                          "index": index})
        tags = i.get("tags", [])
    # Get firewall from instance Tags
    # Tags are of the form synnefo:network:N:firewall_mode
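    # For example (hypothetical NIC name and mode), a tag such as
    # "synnefo:network:nic-0:protected" marks the NIC named "nic-0" with
    # firewall mode "protected".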
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    attachments = {"nics": nics,
                   "disks": disks}
    return attachments


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))


def prefix_from_name(name):
    return name.split('-')[0]
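    # e.g. prefix_from_name("snf-1234") == "snf" (illustrative name only)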


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        handler_logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        handler_logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "result": op.result,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
                # In case an instance creation fails send the job input
                # so that the job can be retried if needed.
                msg["job_fields"] = op.Serialize()["input"]

            msg = json.dumps(msg)

            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = None
        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.  Currently snf-dispatcher can not normally handle
                # these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}
        elif op_id == "OP_INSTANCE_SNAPSHOT":
            job_fields = {"disks": get_field(input, "disks")}
            reason = get_field(input, "reason")
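            # A Ganeti "reason trail" is a list of (source, reason, timestamp)
            # entries; when the first entry comes from the user ("gnt:user"),
            # its reason text is taken as the snapshot info.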
            snapshot_info = None
            if isinstance(reason, list) and len(reason) > 0:
                reason = reason[0]
                if reason[0] == "gnt:user":
                    snapshot_info = reason[1]
            self.logger.debug("Snapshot disks: %s", job_fields["disks"][0])
            job_fields["disks"][0][1]["snapshot_info"] = snapshot_info

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
             op.status == "success") or
            (op_id == "OP_INSTANCE_SET_PARAMS" and
             op.status in ["success", "error", "cancelled"])):
                attachments = get_instance_attachments(msg["instance"],
                                                       self.logger)
                msg["instance_nics"] = attachments["nics"]
                msg["instance_disks"] = attachments["disks"]

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """ Process OP_CLUSTER_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        if op_id != "OP_CLUSTER_SET_PARAMS":
            # Publish only cluster modifications
            return None, None

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey


def find_cluster_name():
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error('Cannot get the name of the cluster: %s' % e)
        raise e

    return name


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Cannot separate command-line arguments from the actual
    # name of the executable by NUL bytes, so only show the name of the
    # executable instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :