Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 0e1f3323

History | View | Annotate | Download (14.8 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57
import setproctitle
58

    
59
from ganeti import utils, jqueue, constants, serializer, cli
60
from ganeti.ssconf import SimpleConfigReader
61

    
62

    
63
from synnefo import settings
64
from synnefo.lib.amqp import AMQPClient
65

    
66

    
67
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    @param op: the opcode whose status selects the relevant timestamp
    @param job: the job the opcode belongs to
    @return: the state-specific timestamp, falling back to the job's
             end_timestamp when the state-specific one is unset
    @raise InvalidBackendStatus: if no timestamp can be determined

    """
    status = op.status
    # Initialize explicitly: the original code left `time` unbound for an
    # unrecognized status, crashing with NameError instead of raising
    # InvalidBackendStatus as intended.
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version: older releases name the
          # lock-waiting state WAITLOCK, newer ones WAITING.
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job-level end timestamp; if that is also unset the
    # status cannot be mapped to a time, which is a backend error.  (The
    # raise below was unreachable dead code in the original.)
    time = time or job.end_timestamp
    if time:
        return time

    raise InvalidBackendStatus(status, job)
94

    
95

    
96
def get_instance_nics(instance, logger):
    """Query Ganeti to get the instance's NICs.

    @type instance: string
    @param instance: the name of the instance
    @type logger: logging.Logger
    @param logger: logger used to report malformed firewall tags
    @rtype: list of dicts
    @return: the instance's NICs. Each dictionary contains the keys
             'network', 'ip', 'mac', 'mode', 'link' and, when a firewall
             tag is present, 'firewall'

    """
    fields = ["nic.networks", "nic.ips", "nic.macs", "nic.modes", "nic.links",
              "tags"]
    # Get Ganeti client
    client = cli.GetClient()
    info = client.QueryInstances([instance], fields, use_locking=False)
    networks, ips, macs, modes, links, tags = info[0]
    nic_keys = ["network", "ip", "mac", "mode", "link"]
    nics = zip(networks, ips, macs, modes, links)
    nics = map(lambda x: dict(zip(nic_keys, x)), nics)
    # Get firewall from instance Tags
    # Tags are of the form synnefo:network:N:firewall_mode
    for tag in tags:
        t = tag.split(":")
        if t[0:2] == ["synnefo", "network"]:
            if len(t) != 4:
                # Fixed typo ("synefo") so both malformed-tag messages match.
                logger.error("Malformed synnefo tag %s", tag)
                continue
            try:
                index = int(t[2])
                nics[index]['firewall'] = t[3]
            except ValueError:
                logger.error("Malformed synnefo tag %s", tag)
            except IndexError:
                logger.error("Found tag %s for non-existent NIC %d",
                             tag, index)
    return nics
133

    
134

    
135
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job status cannot be mapped to a timestamp."""

    def __init__(self, status, job):
        # Keep both pieces of context for the caller and for __str__.
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
143

    
144

    
145
def prefix_from_name(name):
    """Return the part of *name* before the first '-' (the synnefo prefix)."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
147

    
148

    
149
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None when it is missing.

    The original wrote a bare ``None`` expression in the except clause
    instead of ``return None``; it only worked because Python functions
    implicitly return None.  Made the return explicit.
    """
    try:
        return getattr(from_, field)
    except AttributeError:
        return None
154

    
155

    
156
class JobFileHandler(pyinotify.ProcessEvent):
    """Process ganeti job-queue file events and publish them to AMQP.

    For every job file written to or moved into the queue directory, the
    job is parsed, one message per opcode is built and published to the
    ganeti exchange with a routing key derived from the instance/network
    name prefix.
    """
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        # Use the logger argument directly instead of the module-level
        # handler_logger global, which is only valid if main() ran first.
        logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        logger.info("Connected succesfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table keyed on the opcode family, i.e. the second
        # component of OP_ID ("INSTANCE" from "OP_INSTANCE_CREATE").
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # A job file finished being written; treat it like a move-in.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse a job file and publish one message per opcode."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # File vanished between the inotify event and the read.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            # Look up the handler with dict.get() so that a KeyError
            # raised *inside* a handler is not silently swallowed (the
            # original wrapped the handler call in `except KeyError`).
            msg = routekey = None
            handler_fn = self.op_handlers.get(op_id.split('_')[1])
            if handler_fn is not None:
                msg, routekey = handler_fn(op, job_id)

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            # Attach the instance's NICs for operations that may have
            # (re)configured networking and completed successfully.
            if op_id in ["OP_INSTANCE_CREATE", "OP_INSTANCE_SET_PARAMS",
                         "OP_INSTANCE_STARTUP"]:
                if op.status == "success":
                    nics = get_instance_nics(msg["instance"], self.logger)
                    msg["nics"] = nics

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        @return: (message dict, routing key) or (None, None) when the
                 opcode targets no instance or multiple instances.
        """
        input = op.input
        op_id = input.OP_ID

        instances = None
        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        @return: (message dict, routing key) or (None, None) when the
                 opcode carries no network name.
        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               'group_name':   get_field(input, 'group_name')}

        if op_id == "OP_NETWORK_SET_PARAMS":
            msg.update(
                {'add_reserved_ips':    get_field(input, 'add_reserved_ips'),
                 'remove_reserved_ips': get_field(input, 'remove_reserved_ips')
                })

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
314

    
315

    
316

    
317
def find_cluster_name():
    """Return the ganeti cluster name read from ssconf.

    Logs and re-raises any error from SimpleConfigReader.
    """
    global handler_logger
    try:
        scr = SimpleConfigReader()
        name = scr.GetClusterName()
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        # Bare `raise` preserves the original traceback; `raise e`
        # truncated it at this frame.
        raise

    return name
327

    
328
# Module-level logger; set by main() before any signal handler can fire.
handler_logger = None
def fatal_signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM: log the signal and raise SystemExit.

    Raising SystemExit (rather than exiting directly) lets main()'s
    cleanup path run (notifier.stop()).
    """
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
335

    
336
def parse_arguments(args):
    """Parse the daemon's command-line options.

    @param args: argument list, typically sys.argv[1:]
    @return: (options, remaining_args) as returned by OptionParser
    """
    from optparse import OptionParser

    # Hoist the default paths so each appears exactly once.
    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    opt_parser = OptionParser()
    opt_parser.add_option("-d", "--debug", action="store_true", dest="debug",
                          help="Enable debugging information")
    opt_parser.add_option("-l", "--log", dest="log_file",
                          default=default_log,
                          metavar="FILE",
                          help="Write log to FILE instead of %s" % default_log)
    opt_parser.add_option("--pid-file", dest="pid_file",
                          default=default_pid,
                          metavar="PIDFILE",
                          help="Save PID to file (default: %s)" % default_pid)

    return opt_parser.parse_args(args)
354

    
355
def main():
    """Daemon entry point.

    Sets up file logging, daemonizes with a PID lock file, installs
    signal handlers, and then watches the Ganeti job queue directory
    with inotify, publishing notifications via JobFileHandler.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Publish the logger for fatal_signal_handler/find_cluster_name.
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not seperate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqeue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare `raise` in finally relies on Python 2
        # semantics (re-raises the last active exception even after it
        # was handled above) so the daemon exits with that exception.
        raise
443

    
444
# Script entry point.
if __name__ == "__main__":
    sys.exit(main())
446

    
447
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :