Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ a4148a73

History | View | Annotate | Download (12.6 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import daemon.runner
55
from lockfile import LockTimeout
56
from signal import signal, SIGINT, SIGTERM
57

    
58
from ganeti import utils
59
from ganeti import jqueue
60
from ganeti import constants
61
from ganeti import serializer
62
from ganeti.cli import GetClient
63

    
64
from synnefo import settings
65
from synnefo.lib.amqp import AMQPClient
66

    
67

    
68
def get_time_from_status(op, job):
    """Pick the timestamp that identifies *op* in its current state.

    A ganeti job moves through several states, each stamped at a
    different moment; the returned timestamp is used as a unique
    message identifier for the notification of this opcode.

    Raises InvalidBackendStatus for a status we do not recognize.
    """
    op_status = op.status

    if op_status == constants.JOB_STATUS_QUEUED:
        return job.received_timestamp
    try:  # Compatibility with Ganeti version
        if op_status == constants.JOB_STATUS_WAITLOCK:
            return op.start_timestamp
    except AttributeError:
        # Newer Ganeti renamed WAITLOCK to WAITING.
        if op_status == constants.JOB_STATUS_WAITING:
            return op.start_timestamp
    if op_status == constants.JOB_STATUS_CANCELING:
        return op.start_timestamp
    if op_status == constants.JOB_STATUS_RUNNING:
        return op.exec_timestamp
    if op_status in constants.JOBS_FINALIZED:
        # Error opcodes do not always carry their own end timestamp,
        # so fall back to the job-level one.
        return op.end_timestamp or job.end_timestamp

    raise InvalidBackendStatus(op_status, job)
97

    
98

    
99
class InvalidBackendStatus(Exception):
100
    def __init__(self, status, job):
101
        self.status = status
102
        self.job = job
103

    
104
    def __str__(self):
105
        return repr("Invalid backend status: %s in job %s"
106
                    % (self.status, self.job))
107

    
108

    
109
def prefix_from_name(name):
    """Return the prefix of *name*: everything before the first dash."""
    prefix, _sep, _rest = name.partition('-')
    return prefix
111

    
112

    
113
def get_field(from_, field):
    """Return attribute *field* of *from_*, or None when it is absent.

    Fix: the original caught AttributeError and then had a bare `None`
    expression statement — a missing `return` that only behaved
    correctly because falling off the end of a function returns None
    anyway. getattr with a default makes the intent explicit.
    """
    return getattr(from_, field, None)
118

    
119

    
120
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler for the Ganeti job queue directory.

    Parses each (re)written job file, builds one notification per
    opcode and publishes it to the RabbitMQ ganeti topic exchange.
    """

    def __init__(self, logger, cluster_name):
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.cluster_name = cluster_name

        self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25)
        # Fix: log through the instance logger instead of the
        # module-level `handler_logger` global, which is None unless
        # main() has already run.
        self.logger.info("Attempting to connect to RabbitMQ hosts")

        self.client.connect()
        # Fix: "succesfully" typo in the log message.
        self.logger.info("Connected successfully")

        self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')

        # Dispatch table keyed on the second component of the opcode id,
        # e.g. "INSTANCE" from "OP_INSTANCE_STARTUP".
        self.op_handlers = {"INSTANCE": self.process_instance_op,
                            "NETWORK": self.process_network_op}
                            # "GROUP": self.process_group_op}

    def process_IN_CLOSE_WRITE(self, event):
        # An in-place rewrite of a job file is handled exactly like an
        # atomic rename into the queue directory.
        self.process_IN_MOVED_TO(event)

    def process_IN_MOVED_TO(self, event):
        """Parse the job file behind *event* and publish its opcodes."""
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            # Fix: log the actual file path; the original logged only
            # event.path (the watched directory), which hides which
            # file was ignored.
            self.logger.debug("Not a job file: %s" % jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # File vanished between the event and the read; nothing to do.
            return

        data = serializer.LoadJson(data)
        try:  # Compatibility with Ganeti version
            job = jqueue._QueuedJob.Restore(None, data, False)
        except TypeError:
            job = jqueue._QueuedJob.Restore(None, data)

        job_id = int(job.id)

        for op in job.ops:
            op_id = op.input.OP_ID

            msg = None
            # Fix: use dict.get() for the dispatch lookup. The original
            # wrapped both the lookup AND the handler call in
            # `except KeyError: pass`, so a KeyError raised *inside* a
            # handler was silently swallowed as well.
            handler_fn = self.op_handlers.get(op_id.split('_')[1])
            if handler_fn is not None:
                msg, routekey = handler_fn(op, job_id)

            if not msg:
                self.logger.debug("Ignoring job: %s: %s", job_id, op_id)
                continue

            # Generate a unique message identifier
            event_time = get_time_from_status(op, job)

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            # Add shared attributes for all operations
            msg.update({"event_time": event_time,
                        "operation": op_id,
                        "status": op.status,
                        "cluster": self.cluster_name,
                        "logmsg": logmsg,
                        "jobId": job_id})

            msg = json.dumps(msg)
            self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey)

            # Send the message to RabbitMQ
            self.client.basic_publish(settings.EXCHANGE_GANETI,
                                      routekey,
                                      msg)

    def process_instance_op(self, op, job_id):
        """Process OP_INSTANCE_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode names
        no instance or more than one.
        """
        input = op.input
        op_id = input.OP_ID

        instances = get_field(input, 'instance_name')
        if not instances:
            instances = get_field(input, 'instances')
            if not instances or len(instances) > 1:
                # Do not publish messages for jobs with no or multiple
                # instances.
                # Currently snf-dispatcher can not normally handle these
                # messages
                return None, None
            else:
                instances = instances[0]

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          instances, op.status)

        msg = {"type": "ganeti-op-status",
               "instance": instances,
               "operation": op_id}

        routekey = "ganeti.%s.event.op" % prefix_from_name(instances)

        return msg, routekey

    def process_network_op(self, op, job_id):
        """Process OP_NETWORK_* opcodes.

        Returns (msg, routekey), or (None, None) when the opcode names
        no network.
        """
        input = op.input
        op_id = input.OP_ID
        network_name = get_field(input, 'network_name')

        if not network_name:
            return None, None

        self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id,
                          network_name, op.status)

        msg = {'operation':    op_id,
               'type':         "ganeti-network-status",
               'network':      network_name,
               'subnet':       get_field(input, 'network'),
               # 'network_mode': get_field(input, 'network_mode'),
               # 'network_link': get_field(input, 'network_link'),
               'gateway':      get_field(input, 'gateway'),
               # 'reserved_ips': get_field(input, 'reserved_ips'),
               'group_name':   get_field(input, 'group_name')}

        routekey = "ganeti.%s.event.network" % prefix_from_name(network_name)

        return msg, routekey

    # def process_group_op(self, op, job_id):
    #     """ Process OP_GROUP_* opcodes.

    #     """
    #     return None, None
264

    
265

    
266

    
267
def find_cluster_name():
    """Ask the local ganeti master for the cluster name.

    Logs and re-raises any failure through the module-level
    handler_logger.
    """
    global handler_logger
    try:
        name = GetClient().QueryClusterInfo()['name']
    except Exception as e:
        handler_logger.error('Can not get the name of the Cluster: %s' % e)
        raise e
    return name
277

    
278
# Logger shared by the signal handler and find_cluster_name();
# assigned in main() once logging is configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler for SIGINT/SIGTERM: log and raise SystemExit.

    Raising SystemExit unwinds main()'s event loop so its finally
    clause can stop the inotify notifier before the process exits.
    """
    global handler_logger

    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
                        signum)
    raise SystemExit
285

    
286
def parse_arguments(args):
    """Parse the daemon's command-line options.

    Returns the (options, args) pair produced by optparse.
    """
    from optparse import OptionParser

    default_log = "/var/log/snf-ganeti-eventd.log"
    default_pid = "/var/run/snf-ganeti-eventd.pid"

    parser = OptionParser()
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
                      help="Enable debugging information")
    parser.add_option("-l", "--log", dest="log_file",
                      metavar="FILE", default=default_log,
                      help="Write log to FILE instead of %s" % default_log)
    parser.add_option('--pid-file', dest="pid_file",
                      metavar='PIDFILE', default=default_pid,
                      help="Save PID to file (default: %s)" % default_pid)

    return parser.parse_args(args)
304

    
305
def main():
    """Entry point: daemonize and monitor the Ganeti job queue.

    Order matters here: logging must be configured before
    daemonization so handler.stream can be preserved across the fork,
    and the AMQP-connecting JobFileHandler is created only after the
    process has become a daemon.
    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti.eventd")
    logger.setLevel(lvl)
    formatter = logging.Formatter(
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    handler = logging.FileHandler(opts.log_file)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Publish the configured logger for fatal_signal_handler() and
    # find_cluster_name(), which only see the module-level global.
    handler_logger = logger

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    # Remove any stale PID files, left behind by previous invocations
    if daemon.runner.is_pidfile_stale(pidf):
        logger.warning("Removing stale PID lock file %s", pidf.path)
        pidf.break_lock()

    # Become a daemon:
    # Redirect stdout and stderr to handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
            pidfile=pidf,
            umask=022,  # Python 2 octal literal (0o22)
            stdout=handler.stream,
            stderr=handler.stream,
            files_preserve=[handler.stream])
    try:
        daemon_context.open()
    except (daemon.pidlockfile.AlreadyLocked, LockTimeout):
        logger.critical("Failed to lock pidfile %s, another instance running?",
                        pidf.path)
        sys.exit(1)

    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]

    cluster_name = find_cluster_name()

    # NOTE: `handler` is rebound here from the logging FileHandler to
    # the inotify event handler; the stream reference survives inside
    # daemon_context/logger.
    handler = JobFileHandler(logger, cluster_name)
    notifier = pyinotify.Notifier(wm, handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative descriptor")

        logger.info("Now watching %s of %s" % (constants.QUEUE_DIR,
                cluster_name))

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
    except:
        logger.exception("Caught exception, terminating")
    finally:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        # Bare `raise` in a finally block re-raises the last active
        # exception (Python 2 semantics), so the SystemExit/exception
        # that brought us here still terminates the process.
        raise
387

    
388
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())

# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :