Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 22ee6892

History | View | Annotate | Download (12.3 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57

    
58
from ganeti import utils
59
from ganeti import jqueue
60
from ganeti import constants
61
from ganeti import serializer
62

    
63
from synnefo import settings
64
from synnefo.lib.amqp import AMQPClient
65

    
66

    
67
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status

    if status == constants.JOB_STATUS_QUEUED:
        return job.received_timestamp

    # Older and newer Ganeti releases expose different constants for the
    # "waiting for locks" state; probe for whichever one exists.
    try:
        waiting_status = constants.JOB_STATUS_WAITLOCK
    except AttributeError:
        waiting_status = constants.JOB_STATUS_WAITING

    if status == waiting_status:
        return op.start_timestamp
    if status == constants.JOB_STATUS_CANCELING:
        return op.start_timestamp
    if status == constants.JOB_STATUS_RUNNING:
        return op.exec_timestamp
    if status in constants.JOBS_FINALIZED:
        # success, canceled, error
        return op.end_timestamp

    raise InvalidBackendStatus(status, job)
93

    
94

    
95
class InvalidBackendStatus(Exception):
    """Raised when a ganeti job reports a status we cannot timestamp."""

    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
103

    
104

    
105
def prefix_from_name(name):
    """Return the part of *name* that precedes the first dash."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
107

    
108

    
109
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None if it is missing.

    The original try/except branch contained a bare ``None`` expression
    instead of ``return None`` and only worked through the implicit
    return; ``getattr`` with a default makes the contract explicit.
    """
    return getattr(from_, field, None)
114

    
115

    
116
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler that turns Ganeti job files into AMQP messages.

    Each time a file is written or moved into the watched job queue
    directory, the job is parsed and one message per opcode is published
    to the ganeti exchange.
    """

    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        self.client = AMQPClient(confirm_buffer=25)
        # Log through the logger we were handed, not the module-level
        # handler_logger global, which may not have been set yet.
        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table: opcode family -> handler returning
        # (msg, routekey), or (None, None) when the op must be ignored.
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # A finished write is handled exactly like a rename into place.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse the job file behind *event* and publish its opcodes."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s" % event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # The file vanished between the notification and the read;
            # nothing to publish.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            # Opcode IDs look like OP_INSTANCE_CREATE; dispatch on the
            # family ("INSTANCE").  Using .get() instead of catching
            # KeyError avoids swallowing a KeyError raised *inside* the
            # handler itself, as the original try/except did.
            msg, routekey = None, None
            handler_fn = self.op_handlers.get(op_id.split('_')[1])
            if handler_fn is not None:
                msg, routekey = handler_fn(op, job_id)

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        Returns (msg, routekey) for a single-instance job, or
        (None, None) if the job should be ignored.
        """
        op_input = op.input  # renamed from `input` to stop shadowing the builtin
        op_id = op_input.OP_ID

        instances = get_field(op_input, 'instance_name')
        if not instances:
            instances = get_field(op_input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these
                # messages
                return None, None
            instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode does
        not name a network.
        """
        op_input = op.input
        op_id = op_input.OP_ID
        network_name = get_field(op_input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(op_input, 'network'),
               # 'network_mode': get_field(op_input, 'network_mode'),
               # 'network_link': get_field(op_input, 'network_link'),
               'gateway':      get_field(op_input, 'gateway'),
               # 'reserved_ips': get_field(op_input, 'reserved_ips'),
               'group_name':   get_field(op_input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
260

    
261

    
262

    
263

    
264
def find_cluster_name():
    """Read the cluster name from the Ganeti ssconf data file.

    Returns the first line of the file, stripped of trailing whitespace.
    """
    path = constants.DATA_DIR + "/ssconf_" + constants.SS_CLUSTER_NAME
    # with-statement guarantees the file is closed even if readline()
    # raises; the original leaked the handle on error.
    with open(path) as f:
        return f.readline().rstrip()
270

    
271
# Module-level logger shared with the signal handlers; assigned in main().
handler_logger = None
272
def fatal_signal_handler(signum, frame):
    """Log the fatal signal and raise SystemExit to unwind the main loop.

    Installed for SIGINT/SIGTERM so the daemon shuts down gracefully.
    """
    # handler_logger is only read here, so no `global` statement is needed.
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
278

    
279
def parse_arguments(args):
    """Parse the command line; returns the (options, args) pair."""
    from optparse import OptionParser

    default_log_file = "/var/log/snf-ganeti-eventd.log"
    default_pid_file = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log_file,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" %
                          default_log_file)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid_file,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" %
                          default_pid_file)

    return parser.parse_args(args)
297

    
298
def main():
    """Daemonize and publish Ganeti job queue events until terminated."""
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])


    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Expose the logger to fatal_signal_handler() via the module global
    handler_logger = logger

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    cluster_name = find_cluster_name()
    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # NOTE(review): this bare `raise` re-raises the exception being
        # handled (Python 2 semantics), so the process still terminates
        # after the notifier has been cleaned up.
        raise
378

    
379
# Entry point: run the daemon when executed as a script.
if __name__ == "__main__":
    sys.exit(main())
381

    
382
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :