Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 449787d3

History | View | Annotate | Download (14.8 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57
import setproctitle
58

    
59
from ganeti import utils, jqueue, constants, serializer, cli
60
from ganeti.ssconf import SimpleConfigReader
61

    
62

    
63
from synnefo import settings
64
from synnefo.lib.amqp import AMQPClient
65

    
66

    
67
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    @param op: the opcode whose status selects the relevant timestamp
    @param job: the queued job the opcode belongs to
    @return: the timestamp matching the opcode's current status, falling
             back to the job's end timestamp
    @raise InvalidBackendStatus: if no usable timestamp can be determined

    """
    status = op.status
    # Start from None so an unrecognized status is detected below and
    # raises InvalidBackendStatus instead of a NameError on `time`.
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        # Newer Ganeti renamed WAITLOCK to WAITING
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job-level end timestamp when the per-status one
    # is missing.  The original code returned unconditionally, leaving
    # the raise below unreachable; make it reachable again.
    time = time or job.end_timestamp
    if time:
        return time

    raise InvalidBackendStatus(status, job)
94

    
95

    
96
def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    @type instance: string
    @param instance: the name of the instance
    @type logger: logging.Logger
    @param logger: logger used to report malformed firewall tags
    @rtype: List of dicts
    @return: List of dictionaries describing the instance's NICs. Each
             dictionary contains the following keys: 'network', 'ip',
             'mac', 'mode', 'link' and 'firewall'

    """
    fields = ["nic.networks", "nic.ips", "nic.macs", "nic.modes", "nic.links",
              "tags"]
    # Get Ganeti client
    client = cli.GetClient()
    info = client.QueryInstances([instance], fields, use_locking=False)
    networks, ips, macs, modes, links, tags = info[0]
    nic_keys = ["network", "ip", "mac", "mode", "link"]
    nics = zip(networks, ips, macs, modes, links)
    nics = map(lambda x: dict(zip(nic_keys, x)), nics)
    # Get firewall from instance Tags
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                # Fixed typo: "synefo" -> "synnefo" (matches the
                # message logged for the ValueError case below)
                logger.error("Malformed synnefo tag %s", tag)
                continue
            try:
                index = int(t[2])
                nics[index]['firewall'] = t[3]
            except ValueError:
                logger.error("Malformed synnefo tag %s", tag)
            except IndexError:
                # `index` is always bound here: IndexError can only come
                # from nics[index] after int(t[2]) succeeded.
                logger.error("Found tag %s for non-existent NIC %d",
                             tag, index)
    return nics
133

    
134

    
135
class InvalidBackendStatus(Exception):
    """Raised when a job's status yields no usable timestamp."""

    def __init__(self, status, job):
        # Keep both pieces of context for callers/loggers.
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
143

    
144

    
145
def prefix_from_name(name):
    """Return the part of *name* before the first dash.

    If *name* contains no dash, the whole string is returned.
    """
    prefix, _sep, _rest = name.partition('-')
    return prefix
147

    
148

    
149
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None if it is missing.

    The original body had a bare ``None`` expression in the except
    branch where ``return None`` was intended; it worked only via the
    implicit None return.  Use getattr's default form to make the
    fallback explicit.
    """
    return getattr(from_, field, None)
154

    
155

    
156
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler that publishes Ganeti job progress to AMQP.

    Watches the Ganeti job queue directory; every job file that is
    moved-to or closed-for-write is parsed and translated into one
    message per opcode, published to the ganeti topic exchange.
    """

    def __init__(self, logger, cluster_name):
        """Connect to RabbitMQ and declare the ganeti exchange.

        @type logger: logging.Logger
        @param logger: logger for per-job debug/error output
        @type cluster_name: string
        @param cluster_name: cluster name included in every message
        """
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        # NOTE(review): logs via the module-global handler_logger rather
        # than self.logger; works because main() sets the global before
        # constructing this handler — consider unifying.
        handler_logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        handler_logger.info("Connected succesfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table keyed by the opcode family (the token after
        # "OP_" in e.g. "OP_INSTANCE_CREATE").
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        """Treat a closed-for-write job file exactly like a moved one."""
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse a job file and publish one AMQP message per opcode.

        Silently ignores non-job files and files that vanish before
        they can be read (the queue archiver may move them away).
        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # File already archived/removed; nothing to publish.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            # Older Ganeti: Restore() takes only (queue, state).
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                # Map e.g. "OP_INSTANCE_CREATE" -> "INSTANCE" handler.
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                # No handler registered for this opcode family.
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            # NIC state may have changed: attach the fresh NIC list for
            # successfully finished instance-modifying opcodes.
            if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
                         "OP_INSTANCE_STARTUP"]:
                if op.status == "success":
                    nics = get_instance_nics(msg["instance"], self.logger)
                    msg["nics"] = nics

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        @return: (message dict, routing key) for single-instance jobs,
                 or (None, None) for jobs with no or multiple instances.
        """
        # NOTE: `input` shadows the builtin; kept for byte-compatibility.
        input = op.input
        op_id = input.OP_ID

        instances = None
        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.  Currently snf-dispatcher can not normally handle
                # these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        # Route by the instance-name prefix (the synnefo backend id).
        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        @return: (message dict, routing key), or (None, None) when the
                 opcode carries no network name.
        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               'group_name':   get_field(input, 'group_name')}

        # SET_PARAMS additionally reports reserved-IP pool changes.
        if op_id == "OP_NETWORK_SET_PARAMS":
            msg["add_reserved_ips"] = get_field(input, "add_reserved_ips")
            msg["remove_reserved_ips"] = get_field(input,
                                                   "remove_reserved_ips")
        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey


    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
312

    
313

    
314
def find_cluster_name():
    """Return the Ganeti cluster name read from ssconf.

    Logs and re-raises any failure coming from SimpleConfigReader.
    Uses a bare ``raise`` (instead of the original ``raise e``) so the
    original traceback is preserved.
    """
    global handler_logger
    try:
        scr = SimpleConfigReader()
        name = scr.GetClusterName()
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        raise

    return name
324

    
325

    
326
handler_logger = None
327

    
328

    
329
def fatal_signal_handler(signum, frame):
    """Translate a fatal signal (SIGINT/SIGTERM) into SystemExit.

    Raising SystemExit lets main()'s try/finally perform a graceful
    shutdown instead of dying abruptly.
    """
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    # sys.exit() raises SystemExit, matching the original behavior.
    sys.exit()
335

    
336

    
337
def parse_arguments(args):
    """Parse the daemon's command-line arguments.

    @param args: argument list (typically sys.argv[1:])
    @return: the (options, positional-args) pair from optparse
    """
    from optparse import OptionParser

    # Name the defaults once; they also appear in the help texts.
    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" % default_log)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" % default_pid)

    return parser.parse_args(args)
355

    
356

    
357
def main():
    """Entry point: daemonize and watch the Ganeti job queue.

    Sets up logging, becomes a daemon with a PID lockfile, installs
    signal handlers, then loops on pyinotify events forever, delegating
    job-file processing to JobFileHandler.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Publish the logger for helpers that use the module global
    # (find_cluster_name, fatal_signal_handler, JobFileHandler.__init__).
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not seperate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=022,
        stdout=handler.stream,
        stderr=handler.stream,
        files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = (pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] |
            pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"])

    cluster_name = find_cluster_name()

    # NOTE(review): `handler` is rebound here from the log FileHandler
    # above to the pyinotify handler — intentional but easy to misread.
    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" %
                    (constants.QUEUE_DIR, cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqeue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        # NOTE(review): the bare `raise` here re-raises even after the
        # handled SystemExit path — presumably intentional so the
        # process always exits; confirm before changing.
        notifier.stop()
        raise
444

    
445
if __name__ == "__main__":
446
    sys.exit(main())
447

    
448
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :