snf-cyclades-gtools/synnefo/ganeti/eventd.py @ e66a7e34

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
#   1. Redistributions of source code must retain the above
#      copyright notice, this list of conditions and the following
#      disclaimer.
#
#   2. Redistributions in binary form must reproduce the above
#      copyright notice, this list of conditions and the following
#      disclaimer in the documentation and/or other materials
#      provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# The views and conclusions contained in the software and
# documentation are those of the authors and should not be
# interpreted as representing official policies, either expressed
# or implied, of GRNET S.A.
#

"""Ganeti notification daemon with AMQP support

A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications to the ganeti exchange.

"""

import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
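# The two lines above put the parent directory of the working directory on
# sys.path, so that the bundled synnefo package resolves when the daemon is
# run from a source checkout rather than an installed package.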

import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
import daemon.runner
from lockfile import LockTimeout
from signal import signal, SIGINT, SIGTERM
import setproctitle

from ganeti import utils
from ganeti import jqueue
from ganeti import constants
from ganeti import serializer
from ganeti.ssconf import SimpleConfigReader

from synnefo import settings
from synnefo.lib.amqp import AMQPClient


def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes through multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status
    time = None
    if status == constants.JOB_STATUS_QUEUED:
        time = job.received_timestamp
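    # Ganeti renamed JOB_STATUS_WAITLOCK to JOB_STATUS_WAITING at some
    # point; probe for whichever constant this Ganeti version provides.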
    try:  # Compatibility with Ganeti version
        if status == constants.JOB_STATUS_WAITLOCK:
            time = op.start_timestamp
    except AttributeError:
        if status == constants.JOB_STATUS_WAITING:
            time = op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        time = op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        time = op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        time = op.end_timestamp

    time = time or job.end_timestamp
    if time:
        return time

    raise InvalidBackendStatus(status, job)


class InvalidBackendStatus(Exception):
    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        return repr("Invalid backend status: %s in job %s"
                    % (self.status, self.job))
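
# Synnefo object names look like "<prefix>-<id>"; the prefix identifies the
# deployment and is embedded in the AMQP routing keys built below.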


def prefix_from_name(name):
    return name.split('-')[0]
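
# get_field() tolerates opcode fields that are absent in the running Ganeti
# version and reports them as None instead of raising AttributeError.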


def get_field(from_, field):
    try:
        return getattr(from_, field)
    except AttributeError:
        return None


class JobFileHandler(pyinotify.ProcessEvent):
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        # Set max_retries to 0 for unlimited retries.
        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
                                 max_retries=0)

        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}
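
    # pyinotify delivers events to methods named process_<EVENT>, so the
    # handlers below fire on IN_CLOSE_WRITE and IN_MOVED_TO events in the
    # watched job queue directory.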
    def process_IN_CLOSE_WRITE(self, event):
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)
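
        # The try/except above handles _QueuedJob.Restore() growing a third
        # argument in newer Ganeti versions; older versions accept only two.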

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            try:
                handler_fn = self.op_handlers[op_id.split('_')[1]]
                msg, routekey = handler_fn(op, job_id)
            except KeyError:
                pass

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)
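
            # A delivered message looks roughly like this (illustrative
            # values, for an instance operation):
            #   {"type": "ganeti-op-status", "instance": "prefix-42",
            #    "operation": "OP_INSTANCE_STARTUP", "status": "success",
            #    "cluster": "example-cluster", "logmsg": "...",
            #    "jobId": 1234, "event_time": [1325000000, 123456]}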

    def process_instance_op(self, op, job_id):
        """ Process OP_INSTANCE_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances: snf-dispatcher cannot currently handle such
                # messages.
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey
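
    # Note: depending on the opcode, the instance appears either as a single
    # 'instance_name' field or as an 'instances' list, hence the double
    # lookup in process_instance_op() above.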

    def process_network_op(self, op, job_id):
        """ Process OP_NETWORK_* opcodes.

        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               # 'reserved_ips': get_field(input, 'reserved_ips'),
               'group_name':   get_field(input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None


def find_cluster_name():
    global handler_logger
    try:
        scr = SimpleConfigReader()
        name = scr.GetClusterName()
    except Exception as e:
        handler_logger.error('Cannot get the name of the cluster: %s' % e)
        raise

    return name
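
# The cluster name is read from the local Ganeti configuration via
# ganeti.ssconf, so the daemon is expected to run on a Ganeti node.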


handler_logger = None


def fatal_signal_handler(signum, frame):
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit


def parse_arguments(args):
    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default="/var/log/snf-ganeti-eventd.log",
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                          "/var/log/snf-ganeti-eventd.log")
    parser.add_option('--pid-file', dest="pid_file",
                      default="/var/run/snf-ganeti-eventd.pid",
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                          "/var/run/snf-ganeti-eventd.pid")

    return parser.parse_args(args)


def main():
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Rename this process so 'ps' output looks like this is a native
    # executable.  Cannot separate command-line arguments from the actual
    # name of the executable by NUL bytes, so only show the name of the
    # executable instead.
    # setproctitle.setproctitle("\x00".join(sys.argv))
    setproctitle.setproctitle(sys.argv[0])

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
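    # Ganeti updates job files either by writing them in place or by renaming
    # a temporary file into the queue directory (depending on the code path
    # and version), so both event types are watched.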

    cluster_name = find_cluster_name()

    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                    cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        raise
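
    # The bare raise in the finally block re-raises whatever ended the loop
    # (SystemExit from a signal handler, or an unexpected exception) after
    # the inotify watch has been torn down.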


if __name__ == "__main__":
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :