root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 90858bda

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange
"""

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils, jqueue, constants, serializer, cli
from ganeti import errors as ganeti_errors
from ganeti.ssconf import SimpleConfigReader


from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to the current state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
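    # Older Ganeti versions seem to define JOB_STATUS_WAITLOCK while newer
    # ones define JOB_STATUS_WAITING for the same state; try the old constant
    # first and fall back to the new one on AttributeError.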
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    time = time or job.end_timestamp
    if not time:
        raise InvalidBackendStatus(status, job)
    return time


def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get the instance's NICs from Ganeti configuration data. If running on
    the master, query Ganeti via the Ganeti CLI client. Otherwise, get the
    NICs from the Ganeti configuration file.

    @type instance: string
    @param instance: the name of the instance
    @rtype: List of dicts
    @return: List of dictionaries describing the instance's NICs. Each
             dictionary contains the following keys: 'network', 'ip', 'mac',
             'mode', 'link' and 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.networks", "nic.ips", "nic.macs", "nic.modes",
                  "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["network", "ip", "mac", "mode", "link"]
        nics = zip(networks, ips, macs, modes, links)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(constants.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get firewall from instance tags
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                logger.error("Malformed synnefo tag %s", tag)
                continue
            try:
                index = int(t[2])
                nics[index]['firewall'] = t[3]
            except ValueError:
                logger.error("Malformed synnefo tag %s", tag)
            except IndexError:
                logger.error("Found tag %s for non-existent NIC %d",
                             tag, index)
    return nics


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))


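# Synnefo appears to name the Ganeti instances and networks it manages as
# "<prefix>-<id>"; that per-deployment prefix is what the AMQP routing keys
# built below are keyed on.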
def prefix_from_name(name):
    return name.split('-')[0]


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}
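        # Handlers are keyed by opcode family, i.e. the token right after
        # "OP_" in the opcode name (OP_INSTANCE_CREATE -> "INSTANCE");
        # opcodes of any other family are skipped with a debug message in
        # process_IN_MOVED_TO() below.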

    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
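        # Newer Ganeti versions appear to take an extra argument in
        # _QueuedJob.Restore(); fall back to the older two-argument form
        # on TypeError.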
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            if op.status == "success":
                msg["result"] = op.result

            if ((op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_STARTUP"] and
                 op.status == "success") or
                (op_id == "OP_INSTANCE_SET_PARAMS" and
                 op.status in ["success", "error", "cancelled"])):
                nics = get_instance_nics(msg["instance"], self.logger)
                msg["nics"] = nics

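            # The message is serialized to JSON and published to the topic
            # exchange; routing keys have the form "ganeti.<prefix>.event.op"
            # for instance operations and "ganeti.<prefix>.event.network" for
            # network operations.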
            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances. Currently snf-dispatcher cannot normally
                # handle these messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        if op_id == "OP_INSTANCE_SET_PARAMS":
            beparams = get_field(input, "beparams")
            if beparams:
                msg["beparams"] = beparams

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               'group_name':   get_field(input, 'group_name')}

        if op_id == "OP_NETWORK_SET_PARAMS":
            msg["add_reserved_ips"] = get_field(input, "add_reserved_ips")
            msg["remove_reserved_ips"] = get_field(input,
                                                   "remove_reserved_ips")

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey


    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None


def find_cluster_name():
    global handler_logger
    try:
        scr = SimpleConfigReader()
        name = scr.GetClusterName()
    except Exception as e:
        handler_logger.error('Cannot get the name of the cluster: %s' % e)
        raise

    return name


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                           "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                           "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger
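    # Keep a module-level reference so that code running outside main(),
    # e.g. fatal_signal_handler() and find_cluster_name(), can log through
    # the same logger.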

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Cannot separate command-line arguments from the actual
    # name of the executable by NUL bytes, so only show the name of the
    # executable instead.
    # setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])
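    # IN_MOVED_TO catches job files that are renamed into the queue directory
    # (Ganeti appears to write job files atomically via rename), while
    # IN_CLOSE_WRITE catches files that are (re)written in place.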

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" %
                    (constants.QUEUE_DIR, cluster_name))

        while True:    # loop forever
            # process the queue of pending events
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise

if __name__ == "__main__":
    sys.exit(main())
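
# The daemon can also be run by hand for debugging, e.g. (illustrative):
#   ./eventd.py --debug --log /tmp/snf-ganeti-eventd.log \
#       --pid-file /tmp/snf-ganeti-eventd.pid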

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :