Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 765ff3ff

History | View | Annotate | Download (12.8 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57
import setproctitle
58

    
59
from ganeti import utils
60
from ganeti import jqueue
61
from ganeti import constants
62
from ganeti import serializer
63
from ganeti.cli import GetClient
64

    
65
from synnefo import settings
66
from synnefo.lib.amqp import AMQPClient
67

    
68

    
69
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    Raises InvalidBackendStatus when no timestamp can be determined for
    the job's current status.

    """
    status = op.status
    # Initialize so that an unrecognized status falls through to the
    # failure path below instead of raising NameError.
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    if time:
        return time
    # Fall back to the job's own end timestamp, as before.
    if job.end_timestamp:
        return job.end_timestamp

    # This raise was previously placed after the return statement and was
    # therefore unreachable; it is now the explicit failure path.
    raise InvalidBackendStatus(status, job)
96

    
97

    
98
class InvalidBackendStatus(Exception):
    """Signals a ganeti job status for which no timestamp exists."""

    def __init__(self, status, job):
        # Keep the offending status and job around for callers to inspect.
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
106

    
107

    
108
def prefix_from_name(name):
    """Return the portion of *name* before the first dash."""
    return name.partition('-')[0]
110

    
111

    
112
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None if it is missing.

    The original implementation had a bare ``None`` expression statement in
    the ``except AttributeError`` branch instead of ``return None``; it only
    worked via the implicit None return.  ``getattr`` with a default does
    the same thing directly.
    """
    return getattr(from_, field, None)
117

    
118

    
119
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler bridging the Ganeti job queue to AMQP.

    Whenever a job file is written or moved into the watched queue
    directory, the job is parsed and one JSON status message per supported
    opcode is published to the ganeti exchange.
    """

    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25)
        # Log through the instance logger rather than the module-global
        # handler_logger the original used: the global is only guaranteed
        # to be set once main() has configured logging.
        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected succesfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table: opcode family ("OP_<FAMILY>_...") -> handler
        # returning a (msg, routekey) pair, or (None, None) to skip.
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # A rewritten job file is handled exactly like a newly moved one.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse the job file behind *event*, publish one message per op."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The file may already have been moved away again; nothing to do.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                # OP_ID looks like "OP_INSTANCE_...": the second token
                # selects the handler family.
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                # Unsupported opcode family; skipped via "not msg" below.
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        Returns a (msg, routekey) pair, or (None, None) when the job
        should not be published.
        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        Returns a (msg, routekey) pair, or (None, None) when the opcode
        carries no network name.
        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               # 'reserved_ips': get_field(input, 'reserved_ips'),
               'group_name':   get_field(input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
263

    
264

    
265

    
266
def find_cluster_name():
    """Return the name of the Ganeti cluster, querying the master daemon.

    Logs and re-raises any error (e.g. master daemon not reachable).
    """
    global handler_logger
    try:
        cl = GetClient()
        name = cl.QueryClusterInfo()['name']
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        # Bare raise preserves the original traceback; 'raise e' discards
        # it on Python 2.
        raise

    return name
276

    
277
# Module-level logger shared with the signal handler; set up by main().
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler for SIGINT/SIGTERM: log the signal and exit cleanly."""
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit()
284

    
285
def parse_arguments(args):
    """Parse command-line arguments; return (options, leftover_args).

    Recognized options: -d/--debug, -l/--log FILE, --pid-file PIDFILE.
    """
    from optparse import OptionParser

    # Hoisted so the default and the help text cannot drift apart
    # (each path was previously duplicated as two separate literals).
    default_log_file = "/var/log/snf-ganeti-eventd.log"
    default_pid_file = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log_file,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                          default_log_file)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid_file,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                          default_pid_file)

    return parser.parse_args(args)
303

    
304
def main():
    """Entry point: daemonize and watch the Ganeti job queue.

    Sets up file logging, becomes a daemon holding a PID lock file,
    installs SIGINT/SIGTERM handlers for graceful shutdown, and then loops
    forever on inotify events for the Ganeti queue directory, delegating
    each job file to JobFileHandler.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])


    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Publish the logger for module-level helpers (fatal_signal_handler,
    # find_cluster_name) that log through the global.
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Can not seperate command-line arguments from actual name of
    # the executable by NUL bytes, so only show the name of the executable
    # instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]

    cluster_name = find_cluster_name()

    # NOTE: 'handler' is rebound here from the logging handler to the
    # inotify handler; the FileHandler stays reachable only via the logger.
    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqeue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): bare 'raise' in a finally block re-raises the last
        # exception even after the except clauses above handled it (Python 2
        # semantics) — presumably intentional so the daemon exits non-zero;
        # confirm before porting to Python 3, where this raises RuntimeError
        # when no exception is active.
        raise
392

    
393
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())
395

    
396
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :