#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support.

A daemon that monitors the Ganeti job queue and publishes job progress
and Ganeti VM state notifications to the ganeti exchange.
"""
import sys
import os
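# The parent of the current working directory is added to sys.path below,
# presumably so the sibling synnefo package resolves when the daemon is
# started from a source checkout.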
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils, jqueue, constants, serializer, pathutils, cli
from ganeti import errors as ganeti_errors
from ganeti.ssconf import SimpleStore


from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp
    that corresponds to the current state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility across Ganeti versions: older releases define
          # JOB_STATUS_WAITLOCK, newer ones JOB_STATUS_WAITING.
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp; a status that maps to no
    # timestamp at all is invalid.
    time = time or job.end_timestamp
    if time is None:
        raise InvalidBackendStatus(status, job)
    return time


def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get the instance's NICs from Ganeti configuration data. If running on
    the master, query Ganeti via the Ganeti CLI client. Otherwise, get the
    NICs from the Ganeti configuration file.

    @type instance: string
    @param instance: the name of the instance
    @rtype: list of dicts
    @return: list of dictionaries describing the instance's NICs. Each
             dictionary contains the following keys: 'name', 'network',
             'ip', 'mac', 'mode', 'link' and 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
                  "nic.modes", "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        names, networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
        nics = zip(names, networks, ips, macs, modes, links)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get the firewall mode from the instance's tags.
    # Tags are of the form synnefo:network:N:firewall_mode, where N is
    # the NIC name.
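    # e.g. a (hypothetical) tag "synnefo:network:eth0:protected" sets
    # firewall mode "protected" on the NIC named "eth0".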
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return ("Invalid backend status: %s in job %s"
                % (self.status, self.job))


def prefix_from_name(name):
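    # Return the dash-separated prefix of a name: hypothetically, an
    # instance named "prefix-1234" yields "prefix". Both call sites below
    # use this prefix to build the AMQP routing key.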
    return name.split('-')[0]


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None


class JobFileHandler(pyinotify.ProcessEvent):
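    """Process inotify events for the Ganeti job queue.

    Parse each job file written or moved into the queue directory and
    publish one JSON message per handled opcode to the AMQP exchange.
    """
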
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances; snf-dispatcher currently cannot handle these
                # messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
             op.status == "success") or
            (op_id == "OP_INSTANCE_SET_PARAMS" and
             op.status in ["success", "error", "cancelled"])):
            nics = get_instance_nics(msg["instance"], self.logger)
            msg["instance_nics"] = nics

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """ Process OP_CLUSTER_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey


def find_cluster_name():
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error("Cannot get the cluster name: %s", e)
        raise

    return name


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)
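
# Typical invocation, assuming the script is installed as "snf-ganeti-eventd":
#   snf-ganeti-eventd --debug --log /var/log/snf-ganeti-eventd.log \
#       --pid-file /var/run/snf-ganeti-eventd.pid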


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  We cannot separate command-line arguments from the
    # actual name of the executable by NUL bytes, so only show the name
    # of the executable instead:
    # setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
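    # IN_MOVED_TO presumably catches job files renamed into the queue
    # directory; IN_CLOSE_WRITE catches files (re)written in place.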

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :