Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ ffc4b474

History | View | Annotate | Download (16 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48
# Since Ganeti 2.7, debian package ships the majority of the python code in
49
# a private module under '/usr/share/ganeti'. Add this directory to path
50
# in order to be able to import ganeti. Also, add it to the start of path
51
# to allow conflicts with Ganeti RAPI client.
52
sys.path.insert(0, "/usr/share/ganeti")
53

    
54
import json
55
import logging
56
import pyinotify
57
import daemon
58
import daemon.pidlockfile
59
import daemon.runner
60
from lockfile import LockTimeout
61
from signal import signal, SIGINT, SIGTERM
62
import setproctitle
63

    
64
from ganeti import utils, jqueue, constants, serializer, pathutils, cli
65
from ganeti import errors as ganeti_errors
66
from ganeti.ssconf import SimpleStore
67

    
68

    
69
from synnefo import settings
70
from synnefo.lib.amqp import AMQPClient
71

    
72

    
73
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    @raise InvalidBackendStatus: if no timestamp can be determined for
                                 the job's current status

    """
    # Initialize so an unknown status cannot trigger UnboundLocalError
    # below; it falls through to the job-level timestamp / raise instead.
    time = None
    status = op.status
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp when the per-op one is unset.
    time = time or job.end_timestamp
    if time:
        return time

    # Previously unreachable (it followed a return); now raised when no
    # usable timestamp exists for this status.
    raise InvalidBackendStatus(status, job)
100

    
101

    
102
def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    Get instance's NICs from Ganeti configuration data. If running on master,
    query Ganeti via Ganeti CLI client. Otherwise, get the nics from Ganeti
    configuration file.

    @type instance: string
    @param instance: the name of the instance
    @type logger: logging.Logger
    @param logger: logger used to report malformed firewall tags
    @rtype: List of dicts
    @return: List of dictionaries describing the instance's NICs. Each
             dictionary contains the following keys: 'network', 'ip',
             'mac', 'mode', 'link' and (if tagged) 'firewall'

    """
    try:
        client = cli.GetClient()
        fields = ["nic.networks.names", "nic.ips", "nic.macs", "nic.modes",
                  "nic.links", "tags"]
        info = client.QueryInstances([instance], fields, use_locking=False)
        networks, ips, macs, modes, links, tags = info[0]
        nic_keys = ["network", "ip", "mac", "mode", "link"]
        nics = zip(networks, ips, macs, modes, links)
        nics = map(lambda x: dict(zip(nic_keys, x)), nics)
    except ganeti_errors.OpPrereqError:
        # Not running on master! Load the conf file
        raw_data = utils.ReadFile(pathutils.CLUSTER_CONF_FILE)
        config = serializer.LoadJson(raw_data)
        i = config["instances"][instance]
        nics = []
        for nic in i["nics"]:
            # Flatten 'nicparams' into the NIC dict itself.
            params = nic.pop("nicparams")
            nic["mode"] = params["mode"]
            nic["link"] = params["link"]
            nics.append(nic)
        tags = i.get("tags", [])
    # Get firewall from instance Tags
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                # Fixed typo: message previously read "synefo".
                logger.error("Malformed synnefo tag %s", tag)
                continue
            try:
                index = int(t[2])
                nics[index]['firewall'] = t[3]
            except ValueError:
                logger.error("Malformed synnefo tag %s", tag)
            except IndexError:
                # 'index' is always bound here: IndexError can only come
                # from the nics[index] subscript, after int() succeeded.
                logger.error("Found tag %s for non-existent NIC %d",
                             tag, index)
    return nics
155

    
156

    
157
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job carries a status with no usable timestamp."""

    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
165

    
166

    
167
def prefix_from_name(name):
    """Return the part of *name* before the first dash (or all of it)."""
    head, _sep, _rest = name.partition('-')
    return head
169

    
170

    
171
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None when it is missing.

    The original except-branch contained a bare ``None`` expression with
    no ``return`` and only returned None by falling off the end; the
    three-argument getattr expresses the intent directly.
    """
    return getattr(from_, field, None)
176

    
177

    
178
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler turning Ganeti job-file events into AMQP messages.

    For every job file written or moved into the Ganeti queue directory,
    the job's opcodes are parsed and a progress notification is published
    to the ganeti exchange for each supported opcode family.
    """

    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        # Use the logger we were handed rather than the module-level
        # 'handler_logger' global, which is only assigned inside main().
        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table: opcode family (second token of OP_ID) -> handler.
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # A completed write is handled exactly like a file moved into
        # the queue directory.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse a job file and publish one message per supported opcode."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The job file may already have been archived/renamed; skip.
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data, False, False)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                # NOTE(review): this except also swallows a KeyError
                # raised from inside handler_fn itself — confirm intended.
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "result": op.result,
                        "jobId": job_id})

            if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
                         "OP_INSTANCE_STARTUP"]:
                if op.status == "success":
                    # Attach the instance's NICs so the dispatcher does
                    # not have to query Ganeti itself.
                    nics = get_instance_nics(msg["instance"], self.logger)
                    msg["nics"] = nics

            if op_id == "OP_INSTANCE_CREATE" and op.status == "error":
                # In case an instance creation fails send the job input
                # so that the job can be retried if needed.
                msg["job_fields"] = op.Serialize()["input"]

            msg = json.dumps(msg)

            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        Returns (msg, routekey), or (None, None) when the job has no or
        multiple instances.
        """
        input = op.input
        op_id = input.OP_ID

        instances = None
        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.  Currently snf-dispatcher can not normally handle
                # these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode carries
        no network name.
        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               'group_name':   get_field(input, 'group_name')}

        if op_id == "OP_NETWORK_SET_PARAMS":
            msg["add_reserved_ips"] = get_field(input, "add_reserved_ips")
            msg["remove_reserved_ips"] = get_field(input,
                                                   "remove_reserved_ips")
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey


    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
338

    
339

    
340
def find_cluster_name():
    """Return the Ganeti cluster name read from the ssconf SimpleStore.

    Logs and re-raises any failure (e.g. missing ssconf files).
    """
    global handler_logger
    try:
        ss = SimpleStore()
        name = ss.GetClusterName()
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        # Bare raise keeps the original traceback; 'raise e' would
        # discard it under Python 2.
        raise

    return name
350

    
351

    
352
handler_logger = None  # Module-wide logger; assigned in main(), read by helpers.
353

    
354

    
355
def fatal_signal_handler(signum, frame):
    """SIGINT/SIGTERM handler: log the signal and shut down via SystemExit."""
    # Read-only access to the module-level logger (no 'global' needed).
    log = handler_logger
    log.info("Caught fatal signal %d, will raise SystemExit",
             signum)
    raise SystemExit
361

    
362

    
363
def parse_arguments(args):
    """Parse command-line options.

    Returns the (options, remaining_args) pair produced by OptionParser.
    """
    from optparse import OptionParser

    # Single source of truth for the defaults shown in --help.
    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" % default_log)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" % default_pid)

    return parser.parse_args(args)
381

    
382

    
383
def main():
    """Entry point: daemonize and watch the Ganeti job queue.

    Parses options, sets up file logging, becomes a daemon (python-daemon),
    installs signal handlers and then loops forever feeding pyinotify
    events from the Ganeti queue directory to a JobFileHandler.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Publish the logger module-wide for the signal handler and helpers.
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Cannot separate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    # NOTE: 'handler' is rebound here from the log handler to the
    # pyinotify handler; handler.stream is no longer reachable below.
    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(pathutils.QUEUE_DIR, mask)
        if res[pathutils.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (pathutils.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqeue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare raise in 'finally' relies on Python 2
        # keeping sys.exc_info() after a handled exception; under
        # Python 3 this would be a RuntimeError when no exception is
        # active -- confirm Python 2 only.
        raise
470

    
471
if __name__ == "__main__":
    # Run the daemon; main() normally never returns (loops forever).
    sys.exit(main())
473

    
474
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :