Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ d0545590

History | View | Annotate | Download (16.3 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57
import setproctitle
58

    
59
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
60
from ganeti import errors as ganeti_errors
61
from ganeti.ssconf import SimpleStore
62

    
63

    
64
from synnefo import settings
65
from synnefo.lib.amqp import AMQPClient
66

    
67

    
68
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    @param op: a ganeti opcode object (provides status and timestamps)
    @param job: the enclosing ganeti job (fallback end timestamp)
    @return: the timestamp matching the opcode's current status
    @raise InvalidBackendStatus: if no timestamp can be determined

    """
    status = op.status
    # Initialize explicitly: an unrecognized status must fall through to
    # the raise below instead of triggering a NameError.
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        # Newer Ganeti renamed WAITLOCK to WAITING
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp when the op carries no
    # timestamp for this state yet; if neither exists, the status is
    # invalid.  (The original code returned before this raise could ever
    # execute, and could NameError on an unknown status.)
    time = time or job.end_timestamp
    if time:
        return time

    raise InvalidBackendStatus(status, job)
95

    
96

    
97
def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get instance's NICs from Ganeti configuration data. If running on master,
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
    configuration file.

    @type instance: string
    @param instance: the name of the instance
    @param logger: logger used to report malformed firewall tags
    @rtype: List of dicts
    @return: Dictionary containing the instance's NICs. Each dictionary
             contains the following keys: 'name', 'network', 'ip', 'mac',
             'mode', 'link' and (when a firewall tag is present) 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.names", "nic.networks.names", "nic.ips", "nic.macs",
                  "nic.modes", "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        names, networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["name", "network", "ip", "mac", "mode", "link"]
        # Pair up the parallel per-NIC field lists into one dict per NIC
        nics = [dict(zip(nic_keys, nic_values))
                for nic_values in zip(names, networks, ips, macs, modes,
                                      links)]
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            # Flatten the nested "nicparams" into the top-level NIC dict
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get firewall from instance Tags
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            nic_name = t[2]
            firewall = t[3]
            # Attach the firewall mode to the matching NIC(s); setdefault
            # keeps a firewall already set by an earlier tag.
            for nic in nics:
                if nic["name"] == nic_name:
                    nic.setdefault("firewall", firewall)
    return nics
146

    
147

    
148
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job carries a status we cannot timestamp."""

    def __init__(self, status, job):
        # Keep both pieces of context for callers that inspect them
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
156

    
157

    
158
def prefix_from_name(name):
    """Return the part of *name* that precedes the first dash."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
160

    
161

    
162
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None if it is missing.

    The original body had ``except AttributeError: None`` -- the bare
    ``None`` expression was a no-op and only the implicit return made it
    behave; ``getattr`` with a default says the same thing explicitly.
    """
    return getattr(from_, field, None)
167

    
168

    
169
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler for the Ganeti job queue directory.

    For every updated job file, parse the job, translate each opcode
    into a JSON notification and publish it to the ganeti AMQP exchange.
    """

    def __init__(self, logger, cluster_name):
        """Connect to RabbitMQ and declare the ganeti exchange.

        @param logger: logger instance used for all reporting
        @param cluster_name: name of this Ganeti cluster, attached to
                             every outgoing message

        """
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        # Use the instance logger here; the module-level 'handler_logger'
        # global is only initialized by main() and may still be None.
        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table: opcode family (middle token of OP_ID) -> handler
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op,
                            "CLUSTER": self.process_cluster_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        """Treat an in-place write to a job file like a move into the queue."""
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse an updated job file and publish one message per opcode."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Log the full file path, not just the queue directory
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The job file may already have been archived or removed
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                # OP_IDs look like "OP_INSTANCE_CREATE": dispatch on family
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                # Unknown opcode family: fall through and skip below
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            # These opcodes may change the instance's NICs, so re-read
            # them from Ganeti and attach them to the message.
            if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
                 op.status == "success") or
                (op_id == "OP_INSTANCE_SET_PARAMS" and
                 op.status in ["success", "error", "cancelled"])):
                    nics = get_instance_nics(msg["instance"], self.logger)
                    msg["nics"] = nics

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        @return: (message dict, AMQP routing key), or (None, None) for
                 jobs with no or multiple instances

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.  Currently snf-dispatcher can not normally handle
                # these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        job_fields = {}
        if op_id in ["OP_INSTANCE_SET_PARAMS", "OP_INSTANCE_CREATE"]:
            job_fields = {"nics": get_field(input, "nics"),
                          "disks": get_field(input, "disks"),
                          "beparams": get_field(input, "beparams")}

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id,
               "job_fields": job_fields}

        if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
                     "OP_INSTANCE_STARTUP"]:
            if op.status == "success":
                nics = get_instance_nics(msg["instance"], self.logger)
                msg["instance_nics"] = nics

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        @return: (message dict, AMQP routing key), or (None, None) when
                 the opcode carries no network name

        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        job_fields = {
            'subnet': get_field(input, 'network'),
            'gateway': get_field(input, 'gateway'),
            "add_reserved_ips": get_field(input, "add_reserved_ips"),
            "remove_reserved_ips": get_field(input, "remove_reserved_ips"),
            # 'network_mode': get_field(input, 'network_mode'),
            # 'network_link': get_field(input, 'network_link'),
            'group_name': get_field(input, 'group_name')}

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'job_fields':   job_fields}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    def process_cluster_op(self, op, job_id):
        """Process OP_CLUSTER_* opcodes.

        @return: (message dict, AMQP routing key)

        """

        input = op.input
        op_id = input.OP_ID

        self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-cluster-status"}

        routekey = "ganeti.event.cluster"

        return msg, routekey
351

    
352

    
353
def find_cluster_name():
    """Return the cluster name from Ganeti's ssconf SimpleStore.

    Logs and re-raises any error coming from ssconf.

    """
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        # Lazy %-args for logging; bare 'raise' preserves the original
        # traceback (unlike 'raise e').
        handler_logger.error('Can not get the name of the Cluster: %s', e)
        raise

    return name
363

    
364

    
365
handler_logger = None
366

    
367

    
368
def fatal_signal_handler(signum, frame):
    """Log receipt of a fatal signal and shut down via SystemExit.

    @param signum: number of the signal that was caught
    @param frame: current stack frame (unused)

    """
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
374

    
375

    
376
def parse_arguments(args):
    """Parse the daemon's command-line arguments.

    @param args: argument list, usually sys.argv[1:]
    @return: the (options, positional args) tuple from optparse

    """
    from optparse import OptionParser

    # Single source of truth for the defaults echoed in the help texts
    # (previously each path literal was duplicated and could drift).
    default_log_file = "/var/log/snf-ganeti-eventd.log"
    default_pid_file = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log_file,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           default_log_file)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid_file,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           default_pid_file)

    return parser.parse_args(args)
394

    
395

    
396
def main():
    """Entry point: daemonize and publish Ganeti job notifications.

    Sets up file logging, detaches into a daemon, installs signal
    handlers, then watches the Ganeti job queue directory with inotify
    and publishes a message for every job file update.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Expose the logger to module-level helpers (fatal_signal_handler,
    # find_cluster_name) that use the handler_logger global.
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not separate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile (second argument is the lock acquisition timeout in
    # seconds -- confirm against python-daemon's TimeoutPIDLockFile)
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare 'raise' after a *handled* exception relies on
        # Python 2 semantics to re-raise the last exception; confirm this
        # is the intended shutdown path before porting to Python 3.
        raise
483

    
484
# Run the daemon and propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
486

    
487
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :