snf-cyclades-gtools/synnefo/ganeti/eventd.py @ 4b7fbdc6

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange.

"""

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
# Since Ganeti 2.7, the Debian package ships the majority of the Python
# code in a private module under '/usr/share/ganeti'. Add this directory
# to the path in order to be able to import ganeti. Also, add it to the
# start of the path so that it takes precedence in case of conflicts with
# the Ganeti RAPI client.
sys.path.insert(0, "/usr/share/ganeti")

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils, jqueue, constants, serializer, pathutils, cli
from ganeti import errors as ganeti_errors
from ganeti.ssconf import SimpleStore

from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    time = None
    status = op.status
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:
        # Compatibility across Ganeti versions: older versions define
        # JOB_STATUS_WAITLOCK, which later versions rename to
        # JOB_STATUS_WAITING.
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp if the op carries no timestamp
    # for its current status; fail if no timestamp is available at all
    if time:
        return time
    if job.end_timestamp:
        return job.end_timestamp
    raise InvalidBackendStatus(status, job)


def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get the instance's NICs from the Ganeti configuration data. If running
    on the master, query Ganeti via the Ganeti CLI client. Otherwise, get
    the NICs from the Ganeti configuration file.

    @type instance: string
    @param instance: the name of the instance
    @rtype: list of dicts
    @return: list of dictionaries describing the instance's NICs. Each
             dictionary contains the following keys: 'name', 'network',
             'ip', 'mac', 'mode', 'link' and 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
                  "nic.modes", "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        names, networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
        nics = zip(names, networks, ips, macs, modes, links)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
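        # Each NIC becomes a dict, e.g. (illustrative values only):
        #   {"name": "nic-0", "network": "net-1", "ip": "192.0.2.10",
        #    "mac": "aa:00:00:00:00:01", "mode": "bridged", "link": "br0"}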
    except ganeti_errors.OpPrereqError:
        # Not running on the master! Load the conf file
        raw_data = utils.ReadFile(pathutils.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get firewall from instance tags
    # Tags are of the form synnefo:network:N:firewall_mode
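    # (e.g. a hypothetical tag "synnefo:network:nic-0:protected" would set
    # firewall mode "protected" on the NIC named "nic-0")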
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))


def prefix_from_name(name):
    return name.split('-')[0]
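    # e.g. prefix_from_name("snf-1234") == "snf" (hypothetical instance
    # name); the prefix is used to build the AMQP routing keys below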


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

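    # A job file may be written in place (IN_CLOSE_WRITE) or renamed into
    # the queue directory (IN_MOVED_TO); both events are handled the same
    # way.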
    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                # Map e.g. "OP_INSTANCE_CREATE" to the "INSTANCE" handler
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as the message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add the attributes shared by all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "result": op.result,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
                # In case an instance creation fails, send the job input
                # so that the job can be retried if needed.
                msg["job_fields"] = op.Serialize()["input"]

            msg = json.dumps(msg)

            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances. Currently snf-dispatcher cannot normally
                # handle these messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
             op.status == "success") or
            (op_id == "OP_INSTANCE_SET_PARAMS" and
             op.status in ["success", "error", "cancelled"])):
                nics = get_instance_nics(msg["instance"], self.logger)
                msg["instance_nics"] = nics

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)
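        # e.g. a hypothetical instance "snf-1234" yields the routing key
        # "ganeti.snf.event.op"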

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """Process OP_CLUSTER_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        if op_id != "OP_CLUSTER_SET_PARAMS":
            # Send notifications only for modifications of the cluster
            return None, None

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey


def find_cluster_name():
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error('Cannot get the name of the cluster: %s' % e)
        raise

    return name


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable. We cannot separate the command-line arguments from the
    # actual name of the executable by NUL bytes, so only show the name
    # of the executable instead.
    # setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :