#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48
# Since Ganeti 2.7, debian package ships the majority of the python code in
49
# a private module under '/usr/share/ganeti'. Add this directory to path
50
# in order to be able to import ganeti. Also, add it to the start of path
51
# to allow conflicts with Ganeti RAPI client.
52
sys.path.insert(0, "/usr/share/ganeti")
53

    
54
import json
55
import logging
56
import pyinotify
57
import daemon
58
import daemon.pidlockfile
59
import daemon.runner
60
from lockfile import LockTimeout
61
from signal import signal, SIGINT, SIGTERM
62
import setproctitle
63

    
64
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
65
from ganeti import errors as ganeti_errors
66
from ganeti.ssconf import SimpleStore
67

    
68

    
69
from synnefo import settings
70
from synnefo.lib.amqp import AMQPClient
71

    
72

    
73
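# Rough flow, as implemented below: pyinotify watches Ganeti's job queue
# directory; each job file written or moved there is parsed with
# ganeti.jqueue, one JSON notification is built per opcode, and the result
# is published to the EXCHANGE_GANETI topic exchange through AMQPClient.
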
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:
        # Compatibility across Ganeti versions: JOB_STATUS_WAITLOCK was
        # later renamed to JOB_STATUS_WAITING.
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    time = time or job.end_timestamp
    if not time:
        raise InvalidBackendStatus(status, job)
    return time

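# Note: Ganeti job and opcode timestamps are (seconds, microseconds) pairs,
# so the identifier returned above is such a pair rather than a float.
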
def get_instance_attachments(instance, logger):
    """Query Ganeti to get the instance's attachments (NICs and Disks).

    Get the instance's attachments from Ganeti configuration data. If running
    on the master, query Ganeti via the Ganeti CLI client. Otherwise, get the
    attachments straight from Ganeti's configuration file.

    @type instance: string
    @param instance: the name of the instance
    @rtype: dict
    @return: Dictionary containing the 'nics' and 'disks' of the instance.

    """
    try:
        client = cli.GetClient()
        q_fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
                    "nic.modes", "nic.links", "nic.uuids", "tags",
                    "disk.names", "disk.sizes", "disk.uuids"]
        info = client.QueryInstances([instance], q_fields, use_locking=False)
        # Parse NICs
        names, networks, ips, macs, modes, links, uuids, tags = info[0][:-3]
        nic_keys = ["name", "network", "ip", "mac", "mode", "link", "uuid"]
        nics = zip(names, networks, ips, macs, modes, links, uuids)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
        # Parse Disks
        names, sizes, uuids = info[0][-3:]
        disk_keys = ["name", "size", "uuid"]
        disks = zip(names, sizes, uuids)
        disks = map(lambda x: dict(zip(disk_keys, x)), disks)
    except ganeti_errors.OpPrereqError:
        # Not running on the master! Load the configuration file instead.
        raw_data = utils.ReadFile(pathutils.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        # Parse NICs
        nics = []
        for index, nic in enumerate(i["nics"]):
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nic["index"] = index
            nics.append(nic)
        # Parse Disks
        disks = []
        for index, disk in enumerate(i["disks"]):
            disks.append({"name": disk.pop("name"),
                          "size": disk["size"],
                          "uuid": disk["uuid"],
                          "index": index})
        tags = i.get("tags", [])
    # Get the firewall mode from the instance's tags.
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    attachments = {"nics": nics,
                   "disks": disks}
    return attachments

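# Illustrative shape of the returned value (hypothetical values; "firewall"
# is only present for NICs matched by a synnefo:network:* tag):
#
#   {"nics": [{"name": "eth0", "network": "net-1", "ip": "10.0.0.2",
#              "mac": "aa:00:00:11:22:33", "mode": "bridged", "link": "br0",
#              "uuid": "...", "firewall": "ENABLED"}],
#    "disks": [{"name": "disk0", "size": 10240, "uuid": "..."}]}
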
class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return ("Invalid backend status: %s in job %s"
                % (self.status, self.job))

def prefix_from_name(name):
    return name.split('-')[0]

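# For example, with Synnefo-style names such as "snf-42" (an assumption for
# illustration), prefix_from_name("snf-42") returns "snf".
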
def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None

class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

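    # op_handlers maps the middle token of an opcode ID (for example,
    # "INSTANCE" in "OP_INSTANCE_STARTUP") to its processing method; opcode
    # kinds without a handler are silently ignored in process_IN_MOVED_TO.
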
    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The job file may have been archived or removed in the meantime.
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                # No handler for this kind of opcode.
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations. Note that "result"
            # is included unconditionally here.
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "result": op.result,
                        "jobId": job_id})

            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
                # In case an instance creation fails, send the job input
                # so that the job can be retried if needed.
                msg["job_fields"] = op.Serialize()["input"]

            msg = json.dumps(msg)

            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

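    # Illustrative payload (hypothetical values) as published above with
    # routing key "ganeti.<prefix>.event.op":
    #
    #   {"type": "ganeti-op-status", "instance": "snf-42",
    #    "operation": "OP_INSTANCE_STARTUP", "status": "success",
    #    "cluster": "ganeti.example.com", "jobId": 1234,
    #    "event_time": [1317060374, 633113], "logmsg": null,
    #    "result": null, "job_fields": {}}
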
    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances. Currently, snf-dispatcher cannot normally
                # handle these messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
             op.status == "success") or
            (op_id == "OP_INSTANCE_SET_PARAMS" and
             op.status in ["success", "error", "cancelled"])):
                attachments = get_instance_attachments(msg["instance"],
                                                       self.logger)
                msg["instance_nics"] = attachments["nics"]
                msg["instance_disks"] = attachments["disks"]

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """ Process OP_CLUSTER_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        if op_id != "OP_CLUSTER_SET_PARAMS":
            # Only publish modifications of the cluster.
            return None, None

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey


def find_cluster_name():
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error("Cannot get the cluster name: %s", e)
        raise

    return name

handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit

def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)

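# Example invocation (hypothetical; option names are those defined above):
#   python eventd.py --debug \
#       --log /var/log/snf-ganeti-eventd.log \
#       --pid-file /var/run/snf-ganeti-eventd.pid
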
def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable. We cannot separate the command-line arguments from the
    # actual name of the executable with NUL bytes, so only show the name
    # of the executable instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation],
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s", pathutils.QUEUE_DIR,
                    cluster_name)

        while True:    # loop forever
            # Process the queue of pending inotify events
            notifier.process_events()
            if notifier.check_events():
                # Read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # Destroy the inotify instance on exit (stop monitoring)
        notifier.stop()
        raise

if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :