Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-gtools / synnefo / ganeti / eventd.py @ c4e55622

History | View | Annotate | Download (9.2 kB)

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
#
4
# Copyright 2011 GRNET S.A. All rights reserved.
5
#
6
# Redistribution and use in source and binary forms, with or
7
# without modification, are permitted provided that the following
8
# conditions are met:
9
#
10
#   1. Redistributions of source code must retain the above
11
#      copyright notice, this list of conditions and the following
12
#      disclaimer.
13
#
14
#   2. Redistributions in binary form must reproduce the above
15
#      copyright notice, this list of conditions and the following
16
#      disclaimer in the documentation and/or other materials
17
#      provided with the distribution.
18
#
19
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
20
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
23
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
# POSSIBILITY OF SUCH DAMAGE.
31
#
32
# The views and conclusions contained in the software and
33
# documentation are those of the authors and should not be
34
# interpreted as representing official policies, either expressed
35
# or implied, of GRNET S.A.
36
#
37

    
38
"""Ganeti notification daemon with AMQP support
39

40
A daemon to monitor the Ganeti job queue and publish job progress
41
and Ganeti VM state notifications to the ganeti exchange
42
"""
43

    
44
import sys
45
import os
46
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
47
sys.path.append(path)
48

    
49
import json
50
import logging
51
import pyinotify
52
import daemon
53
import daemon.pidlockfile
54
import socket
55
from signal import signal, SIGINT, SIGTERM
56

    
57
from ganeti import utils
58
from ganeti import jqueue
59
from ganeti import constants
60
from ganeti import serializer
61

    
62
from synnefo import settings
63
from synnefo.lib.amqp import AMQPClient
64

    
65
def get_time_from_status(op, job):
66
    """Generate a unique message identifier for a ganeti job.
67

68
    The identifier is based on the timestamp of the job. Since a ganeti
69
    job passes from multiple states, we need to pick the timestamp that
70
    corresponds to each state.
71

72
    """
73
    status = op.status
74
    if status == constants.JOB_STATUS_QUEUED:
75
        return job.received_timestamp
76
    if status == constants.JOB_STATUS_WAITLOCK:
77
    #if status == constants.JOB_STATUS_WAITING:
78
        return op.start_timestamp
79
    if status == constants.JOB_STATUS_CANCELING:
80
        return op.start_timestamp
81
    if status == constants.JOB_STATUS_RUNNING:
82
        return op.exec_timestamp
83
    if status in constants.JOBS_FINALIZED:
84
        # success, canceled, error
85
        return op.end_timestamp
86

    
87
    raise InvalidBackendState(status, job)
88

    
89

    
90
class InvalidBackendStatus(Exception):
91
    def __init__(self, status, job):
92
        self.status = status
93
        self.job = job
94

    
95
    def __str__(self):
96
        return repr("Invalid backend status: %s in job %s"
97
                    % (self.status, self.job))
98

    
99

    
100
class JobFileHandler(pyinotify.ProcessEvent):
101
    def __init__(self, logger):
102
        pyinotify.ProcessEvent.__init__(self)
103
        self.logger = logger
104
        self.client = AMQPClient()
105
        handler_logger.info("Attempting to connect to RabbitMQ hosts")
106
        self.client.connect()
107
        handler_logger.info("Connected succesfully")
108

    
109
    def process_IN_CLOSE_WRITE(self, event):
110
        self.process_IN_MOVED_TO(event)
111

    
112
    def process_IN_MOVED_TO(self, event):
113
        jobfile = os.path.join(event.path, event.name)
114
        if not event.name.startswith("job-"):
115
            self.logger.debug("Not a job file: %s" % event.path)
116
            return
117

    
118
        try:
119
            data = utils.ReadFile(jobfile)
120
        except IOError:
121
            return
122

    
123
        data = serializer.LoadJson(data)
124
        try: # Version compatibility issue with Ganeti
125
            job = jqueue._QueuedJob.Restore(None, data, False)
126
        except TypeError:
127
            job = jqueue._QueuedJob.Restore(None, data)
128

    
129

    
130
        for op in job.ops:
131
            instances = ""
132
            try:
133
                instances = " ".join(op.input.instances)
134
            except AttributeError:
135
                pass
136

    
137
            try:
138
                instances = op.input.instance_name
139
            except AttributeError:
140
                pass
141

    
142
            # Get the last line of the op log as message
143
            try:
144
                logmsg = op.log[-1][-1]
145
            except IndexError:
146
                logmsg = None
147

    
148
            # Generate a unique message identifier
149
            event_time = get_time_from_status(op, job)
150

    
151
            self.logger.debug("Job: %d: %s(%s) %s %s",
152
                    int(job.id), op.input.OP_ID, instances, op.status, logmsg)
153

    
154
            # Construct message
155
            msg = {
156
                    "event_time" : event_time,
157
                    "type": "ganeti-op-status",
158
                    "instance": instances,
159
                    "operation": op.input.OP_ID,
160
                    "jobId": int(job.id),
161
                    "status": op.status,
162
                    "logmsg": logmsg
163
                    }
164

    
165
            instance_prefix = instances.split('-')[0]
166
            routekey = "ganeti.%s.event.op" % instance_prefix
167

    
168
            self.logger.debug("Delivering msg: %s (key=%s)", json.dumps(msg),
169
                              routekey)
170
            msg = json.dumps(msg)
171

    
172
            # Send the message to RabbitMQ
173
            self.client.basic_publish(exchange=settings.EXCHANGE_GANETI,
174
                                      routing_key=routekey,
175
                                      body=msg)
176

    
177
handler_logger = None
178
def fatal_signal_handler(signum, frame):
179
    global handler_logger
180

    
181
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
182
                        signum)
183
    raise SystemExit
184

    
185
def parse_arguments(args):
186
    from optparse import OptionParser
187

    
188
    parser = OptionParser()
189
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
190
                      help="Enable debugging information")
191
    parser.add_option("-l", "--log", dest="log_file",
192
                      default="/var/log/snf-ganeti-eventd.log",
193
                      metavar="FILE",
194
                      help="Write log to FILE instead of %s" %
195
                          "/var/log/snf-ganeti-eventd.log")
196
    parser.add_option('--pid-file', dest="pid_file",
197
                      default="/var/run/snf-ganeti-eventd.pid",
198
                      metavar='PIDFILE',
199
                      help="Save PID to file (default: %s)" %
200
                          "/var/run/snf-ganeti-eventd.pid")
201

    
202
    return parser.parse_args(args)
203

    
204
def main():
205
    global handler_logger
206

    
207
    (opts, args) = parse_arguments(sys.argv[1:])
208

    
209
    # Create pidfile
210
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
211

    
212
    # Initialize logger
213
    lvl = logging.DEBUG if opts.debug else logging.INFO
214
    logger = logging.getLogger("ganeti.eventd")
215
    logger.setLevel(lvl)
216
    formatter = logging.Formatter(
217
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
218
        "%Y-%m-%d %H:%M:%S")
219
    handler = logging.FileHandler(opts.log_file)
220
    handler.setFormatter(formatter)
221
    logger.addHandler(handler)
222
    handler_logger = logger
223

    
224
    # Become a daemon:
225
    # Redirect stdout and stderr to handler.stream to catch
226
    # early errors in the daemonization process [e.g., pidfile creation]
227
    # which will otherwise go to /dev/null.
228
    daemon_context = daemon.DaemonContext(
229
            pidfile=pidf,
230
            umask=022,
231
            stdout=handler.stream,
232
            stderr=handler.stream,
233
            files_preserve=[handler.stream])
234
    daemon_context.open()
235
    logger.info("Became a daemon")
236

    
237
    # Catch signals to ensure graceful shutdown
238
    signal(SIGINT, fatal_signal_handler)
239
    signal(SIGTERM, fatal_signal_handler)
240

    
241
    # Monitor the Ganeti job queue, create and push notifications
242
    wm = pyinotify.WatchManager()
243
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \
244
           pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
245
    handler = JobFileHandler(logger)
246
    notifier = pyinotify.Notifier(wm, handler)
247

    
248
    try:
249
        # Fail if adding the inotify() watch fails for any reason
250
        res = wm.add_watch(constants.QUEUE_DIR, mask)
251
        if res[constants.QUEUE_DIR] < 0:
252
            raise Exception("pyinotify add_watch returned negative descriptor")
253

    
254
        logger.info("Now watching %s" % constants.QUEUE_DIR)
255

    
256
        while True:    # loop forever
257
            # process the queue of events as explained above
258
            notifier.process_events()
259
            if notifier.check_events():
260
                # read notified events and enqeue them
261
                notifier.read_events()
262
    except SystemExit:
263
        logger.info("SystemExit")
264
    except:
265
        logger.exception("Caught exception, terminating")
266
    finally:
267
        # destroy the inotify's instance on this interrupt (stop monitoring)
268
        notifier.stop()
269
        raise
270

    
271
if __name__ == "__main__":
272
    sys.exit(main())
273

    
274
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :