#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange
"""

import sys
import os

# Add the parent of the current working directory to the module search
# path, so that the synnefo packages imported below can be found.
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils
from ganeti import jqueue
from ganeti import constants
from ganeti import serializer
from ganeti.ssconf import SimpleConfigReader

from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    time = None
    status = op.status
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility across Ganeti versions
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job's end timestamp; if no timestamp is available
    # at all, the status is not one we know how to handle.
    time = time or job.end_timestamp
    if time:
        return time

    raise InvalidBackendStatus(status, job)


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))


def prefix_from_name(name):
    return name.split('-')[0]
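
# For a name like "prod-vm42" (illustrative), prefix_from_name() returns
# "prod"; the prefix is used below to build the AMQP routing keys.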


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        # The field does not exist in this Ganeti version; treat it as unset.
        return None


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0, logger=logger)

        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

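        # Dispatch table keyed by the opcode family, i.e. the second token
        # of OP_ID (e.g. "INSTANCE" for an OP_INSTANCE_STARTUP opcode).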
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility across Ganeti versions
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances, since snf-dispatcher currently cannot handle
                # such messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               'group_name':   get_field(input, 'group_name')}

        if op_id == "OP_NETWORK_SET_PARAMS":
            msg.update(
                {'add_reserved_ips':    get_field(input, 'add_reserved_ips'),
                 'remove_reserved_ips': get_field(input, 'remove_reserved_ips')
                })

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None


def find_cluster_name():
    global handler_logger
    try:
        scr = SimpleConfigReader()
        name = scr.GetClusterName()
    except Exception as e:
        handler_logger.error("Cannot get the name of the cluster: %s" % e)
        raise

    return name


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                          "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                          "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)
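
# Example invocation (illustrative paths; the defaults are shown above):
#   python eventd.py --debug --log /tmp/eventd.log --pid-file /tmp/eventd.pid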


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  We cannot separate the command-line arguments from the
    # actual name of the executable by NUL bytes, so only show the name of
    # the executable instead.
    # setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation],
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
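
    # Watch both event types so a job file is picked up whether it is
    # written in place (IN_CLOSE_WRITE) or renamed into the queue
    # directory (IN_MOVED_TO); JobFileHandler treats the two identically.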

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of inotify events
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :