snf-cyclades-gtools / synnefo / ganeti / eventd.py @ fd2bdbb2

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange
"""

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils
from ganeti import jqueue
from ganeti import constants
from ganeti import serializer
from ganeti.ssconf import SimpleConfigReader


from synnefo import settings
from synnefo.lib.amqp import AMQPClient
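
# Overview: the daemon registers an inotify watch on Ganeti's job queue
# directory (constants.QUEUE_DIR). Whenever a job file is written or moved
# into the queue, JobFileHandler parses it, dispatches each opcode to an
# INSTANCE or NETWORK handler, and publishes a JSON notification to the
# EXCHANGE_GANETI topic exchange, routed by the instance/network name prefix.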


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    # Fall back to the job-level timestamp; a status with no usable
    # timestamp at all is an error.
    time = time or job.end_timestamp
    if time is None:
        raise InvalidBackendStatus(status, job)
    return time
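# Note: Ganeti job timestamps are (seconds, microseconds) pairs, so the
# "event_time" attached below serializes to e.g. [1362573509, 864284]
# (illustrative values).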


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return ("Invalid backend status: %s in job %s"
                % (self.status, self.job))


def prefix_from_name(name):
    return name.split('-')[0]


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None
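# Illustrative usage (values are hypothetical): for an instance named
# "snf-42", prefix_from_name() yields "snf", which parameterizes the routing
# key; get_field(op.input, 'instance_name') returns None instead of raising
# when an opcode lacks that attribute.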


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0)

        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}
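
        # Dispatch is keyed on the opcode family: an op_id such as
        # "OP_INSTANCE_STARTUP" yields "INSTANCE" via op_id.split('_')[1];
        # unknown families fall through the KeyError path in
        # process_IN_MOVED_TO() below and are ignored.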

    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)
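
    # An illustrative message as delivered to the exchange (all values
    # hypothetical):
    #
    #   routing key: "ganeti.snf.event.op"
    #   body: {"type": "ganeti-op-status", "instance": "snf-42",
    #          "operation": "OP_INSTANCE_STARTUP",
    #          "event_time": [1362573509, 864284], "status": "success",
    #          "cluster": "ganeti.example.com", "logmsg": null,
    #          "jobId": 4321}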

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances. Currently snf-dispatcher cannot handle such
                # messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """

        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               'group_name':   get_field(input, 'group_name')}

        if op_id == "OP_NETWORK_SET_PARAMS":
            msg.update(
                {'add_reserved_ips':    get_field(input, 'add_reserved_ips'),
                 'remove_reserved_ips': get_field(input, 'remove_reserved_ips')
                })

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey


    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None


def find_cluster_name():
    global handler_logger
    try:
        scr = SimpleConfigReader()
        name = scr.GetClusterName()
    except Exception as e:
        handler_logger.error("Cannot get the cluster name: %s", e)
        # Re-raise without arguments to preserve the original traceback
        raise

    return name

handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit

def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                          "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                          "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)
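
# Example invocation, assuming the installed entry point is named
# snf-ganeti-eventd (the paths shown are the defaults defined above):
#
#   snf-ganeti-eventd --debug \
#       --log /var/log/snf-ganeti-eventd.log \
#       --pid-file /var/run/snf-ganeti-eventd.pid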

def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Cannot separate command-line arguments from the actual
    # name of the executable by NUL bytes, so only show the name of the
    # executable instead.  setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
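
    # IN_MOVED_TO catches job files renamed into the queue directory (Ganeti
    # typically writes to a temporary file and renames it into place), while
    # IN_CLOSE_WRITE covers files written there directly.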

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of inotify events
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on exit (stop monitoring)
        notifier.stop()
        raise


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :