Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ 51fc0054

History | View | Annotate | Download (12.5 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57

    
58
from ganeti import utils
59
from ganeti import jqueue
60
from ganeti import constants
61
from ganeti import serializer
62
from ganeti.cli import GetClient
63

    
64
from synnefo import settings
65
from synnefo.lib.amqp import AMQPClient
66

    
67

    
68
def get_time_from_status(op, job):
    """Generate a unique message identifier for a ganeti job.

    The identifier is based on the timestamp of the job. Since a ganeti
    job passes from multiple states, we need to pick the timestamp that
    corresponds to each state.

    """
    status = op.status

    if status == constants.JOB_STATUS_QUEUED:
        return job.received_timestamp

    # Ganeti renamed JOB_STATUS_WAITLOCK to JOB_STATUS_WAITING at some
    # point; resolve whichever constant this Ganeti version provides.
    try:
        waiting_status = constants.JOB_STATUS_WAITLOCK
    except AttributeError:
        waiting_status = constants.JOB_STATUS_WAITING
    if status == waiting_status:
        return op.start_timestamp

    if status == constants.JOB_STATUS_CANCELING:
        return op.start_timestamp

    if status == constants.JOB_STATUS_RUNNING:
        return op.exec_timestamp

    if status in constants.JOBS_FINALIZED:
        # Finalized states: success, canceled, error
        return op.end_timestamp

    raise InvalidBackendStatus(status, job)
94

    
95

    
96
class InvalidBackendStatus(Exception):
    """Raised when a Ganeti job reports a status we cannot timestamp."""

    def __init__(self, status, job):
        self.status = status
        self.job = job

    def __str__(self):
        message = "Invalid backend status: %s in job %s" % (self.status,
                                                            self.job)
        return repr(message)
104

    
105

    
106
def prefix_from_name(name):
    """Return the part of *name* before the first dash (the type prefix)."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
108

    
109

    
110
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None if it is missing.

    The previous version had a bare ``None`` expression (a no-op) instead
    of an explicit ``return None`` in its except branch; ``getattr`` with
    a default expresses the intended behavior directly.
    """
    return getattr(from_, field, None)
115

    
116

    
117
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler that publishes Ganeti job progress over AMQP.

    Watches job files written to or moved into the Ganeti queue
    directory, parses each job and publishes one message per supported
    opcode to the ganeti exchange.
    """
    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25)
        # Log through the instance logger: the module-level handler_logger
        # global is only set by main() and may still be None here.
        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table: opcode family (e.g. "INSTANCE") -> handler method
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # A job file written in place is handled the same as one moved in.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse a new/updated job file and publish one message per opcode."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", event.path)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # File vanished between the event and the read; nothing to do.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            routekey = None
            # Look the handler up explicitly: the old broad
            # `except KeyError` also swallowed KeyErrors raised *inside*
            # the handler, silently dropping those jobs.
            handler_fn = self.op_handlers.get(op_id.split('_')[1])
            if handler_fn is not None:
                msg, routekey = handler_fn(op, job_id)

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        Returns (msg, routekey), or (None, None) when the job targets no
        instance or more than one instance.
        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode carries
        no network name.
        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               # 'reserved_ips': get_field(input, 'reserved_ips'),
               'group_name':   get_field(input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey


    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
261

    
262

    
263

    
264
def find_cluster_name():
    """Return the name of the local Ganeti cluster.

    Queries the Ganeti master via the LUXI client; logs and re-raises
    any failure.
    """
    global handler_logger
    try:
        cl = GetClient()
        name = cl.QueryClusterInfo()['name']
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        # Bare `raise` preserves the original traceback; `raise e`
        # discards it under Python 2.
        raise

    return name
274

    
275
# Module-level logger shared with the signal handler; set by main().
handler_logger = None
def fatal_signal_handler(signum, frame):
    """Signal handler for SIGINT/SIGTERM: log and raise SystemExit.

    SystemExit propagates out of main()'s event loop, triggering the
    cleanup in its finally clause.
    """
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
282

    
283
def parse_arguments(args):
    """Parse command-line arguments; return optparse's (options, args)."""
    from optparse import OptionParser

    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      default=default_log,
                      metavar="FILE",
                      help="Write log to FILE instead of %s" % default_log)
    parser.add_option('--pid-file', dest="pid_file",
                      default=default_pid,
                      metavar='PIDFILE',
                      help="Save PID to file (default: %s)" % default_pid)

    return parser.parse_args(args)
301

    
302
def main():
    """Daemonize and run the Ganeti event daemon.

    Sets up file logging, acquires the PID lock, daemonizes, installs
    signal handlers, then watches the Ganeti job queue with inotify and
    publishes job notifications through a JobFileHandler.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    handler_logger = logger

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=0o022,  # 0o literal works on both Python 2.6+ and 3
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]

    cluster_name = find_cluster_name()

    # Distinct name: do not shadow the logging `handler` above, whose
    # stream is still referenced by daemon_context.
    job_handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        # Last-resort boundary for a daemon: log whatever escaped, then
        # re-raise below so the process terminates.
        logger.exception("Caught exception, terminating")
    finally:
        # Destroy the inotify instance (stop monitoring), then re-raise
        # the pending exception so the daemon actually exits.
        notifier.stop()
        raise
384

    
385
# Entry point: run the daemon; main()'s return value becomes the exit code.
if __name__ == "__main__":
    sys.exit(main())
387

    
388
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :