root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 168a12e2

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange
"""

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
sys.path.insert(0, "/usr/share/ganeti/")

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils, jqueue, constants, serializer, pathutils, cli
from ganeti import errors as ganeti_errors
from ganeti.ssconf import SimpleStore


from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp when the opcode carries no
    # timestamp for this status; if neither is available, the status is
    # considered invalid.
    time = time or job.end_timestamp
    if time:
        return time

    raise InvalidBackendStatus(status, job)


def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get the instance's NICs from Ganeti configuration data. If running on the
    master, query Ganeti via the Ganeti CLI client. Otherwise, read the NICs
    from the Ganeti configuration file.

    @type instance: string
    @param instance: the name of the instance
    @rtype: List of dicts
    @return: List of dictionaries describing the instance's NICs. Each
             dictionary contains the keys 'name', 'network', 'ip', 'mac',
             'mode', 'link' and 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
                  "nic.modes", "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        names, networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
        nics = zip(names, networks, ips, macs, modes, links)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get the firewall mode from the instance tags.
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics
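
# Illustrative only: a NIC dictionary returned by get_instance_nics() could
# look like the following (all values are made up):
#
#   {"name": "eth0", "network": "example-net", "ip": "192.0.2.10",
#    "mac": "aa:00:00:11:22:33", "mode": "bridged", "link": "br0",
#    "firewall": "ENABLED"}
#
# The "firewall" key is only set when a matching
# "synnefo:network:<nic name>:<firewall mode>" tag is found on the instance.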


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))


def prefix_from_name(name):
    return name.split('-')[0]
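    # e.g. a name like "vm-1234" (illustrative, not a fixed naming scheme)
    # yields the prefix "vm", which is used in the AMQP routing keys below.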


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        handler_logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        handler_logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
                # In case an instance creation fails, send the job input
                # so that the job can be retried if needed.
                msg["job_fields"] = op.Serialize()["input"]

            msg = json.dumps(msg)

            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)
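
        # The handlers below return routing keys of the form
        # "ganeti.<prefix>.event.op", "ganeti.<prefix>.event.network" and
        # "ganeti.event.cluster", so a consumer could, for instance, bind a
        # queue with the topic pattern "ganeti.#" to receive all of them
        # (the binding pattern is only an example, not something this
        # daemon configures).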

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances. snf-dispatcher cannot currently handle such
                # messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
             op.status == "success") or
            (op_id == "OP_INSTANCE_SET_PARAMS" and
             op.status in ["success", "error", "cancelled"])):
                nics = get_instance_nics(msg["instance"], self.logger)
                msg["instance_nics"] = nics

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """ Process OP_CLUSTER_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey


def find_cluster_name():
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error('Cannot get the cluster name: %s' % e)
        raise

    return name


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)
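
# Example invocation (the options and default paths are those defined above;
# the exact script or service name used in deployment may differ):
#
#   python eventd.py --debug \
#       --log /var/log/snf-ganeti-eventd.log \
#       --pid-file /var/run/snf-ganeti-eventd.pid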


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  We cannot separate the command-line arguments from the
    # actual name of the executable with NUL bytes, so only show the name
    # of the executable instead.
    # setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :