Statistics
| Branch: | Tag: | Revision:

root / ganeti / ganeti-0mqd.py @ dac67c0a

History | View | Annotate | Download (9.7 kB)

1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon for 0mqd
6

7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications over a 0mq PUB endpoint.
9

10
"""
11

    
12
from django.core.management import setup_environ
13

    
14
import sys
15
import os
16
# Append the parent of the current working directory to the module search
# path (presumably so that the `synnefo` package imported below can be
# found -- confirm against the deployment layout).
path = os.path.normpath(
    os.path.join(os.getcwd(), os.pardir))
sys.path.append(path)
18
import synnefo.settings as settings
19

    
20
setup_environ(settings)
21

    
22
import zmq
23
import time
24
import json
25
import logging
26
import pyinotify
27
import daemon
28
import daemon.pidlockfile
29
from signal import signal, SIGINT, SIGTERM
30

    
31
from threading import Thread, Event, currentThread
32

    
33
from ganeti import utils
34
from ganeti import jqueue
35
from ganeti import constants
36
from ganeti import serializer
37

    
38

    
39
class StoppableThread(Thread):
    """Thread class with a stop() method.

    The thread needs to check regularly for the stopped() condition.
    When it does, it exits, so that another thread may .join() it.

    Calling stop() only sets a flag; it does not interrupt a blocking
    call the thread may currently be waiting in.

    """
    def __init__(self, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)
        # Deliberately not named `_stop`: CPython 3 versions of
        # threading.Thread use a private `_stop()` method internally, and
        # shadowing it with an Event object makes join() fail.
        self._stop_event = Event()

    def stop(self):
        """Flag the thread as stopped."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called, False otherwise."""
        return self._stop_event.is_set()
55

    
56

    
57
class GanetiZMQThread(StoppableThread):
    """The 0mq processing thread: PULLs and then PUBlishes notifications.

    This thread runs until stopped, receiving notifications over a
    0mq PULL socket, and publishing them over a 0mq PUB socket.

    There are currently two sources of notifications:
    a. ganeti-0mqd itself, monitoring the Ganeti job queue
    b. hooks running in the context of Ganeti

    """
    # Message types that are forwarded to subscribers. This must be a real
    # tuple (note the trailing comma): testing membership against a plain
    # string would perform a substring match instead.
    FORWARDED_TYPES = ('ganeti-op-status',)

    def __init__(self, logger, puller, publisher):
        """Store the logger and the 0mq sockets used by run().

        logger    -- object providing debug()/error()/exception()
        puller    -- socket messages are received from, via recv()
        publisher -- socket accepted messages are sent on, via send_json()

        """
        StoppableThread.__init__(self)
        self.logger = logger
        self.puller = puller
        self.publisher = publisher

    def run(self):
        """Receive, validate and republish messages until stopped.

        The stopped() flag is only checked after a message has been
        received, so whoever calls stop() must also send a message to
        the PULL socket to wake this thread up.

        Messages that cannot be decoded, lack a 'type' key, or carry a
        type not listed in FORWARDED_TYPES are logged and dropped.
        Any other failure is logged and the whole process is sent SIGTERM.

        """
        self.logger.debug("0mq thread ready")
        try:
            while True:
                # Pull
                self.logger.debug("Waiting on the 0mq PULL socket")
                data = self.puller.recv()
                self.logger.debug("Received message on 0mq PULL socket")
                if self.stopped():
                    self.logger.debug("Thread has been stopped, leaving request loop")
                    return

                try:
                    msg = json.loads(data)
                    msg_type = msg['type']
                except Exception:
                    # Malformed JSON, a non-dict payload or a missing
                    # 'type' key: drop the message but keep running.
                    self.logger.exception("Unexpected Exception decoding msg: %s", data)
                    continue

                if msg_type not in self.FORWARDED_TYPES:
                    self.logger.error("Not forwarding message of unknown type: %s", data)
                    continue

                # Publish
                self.logger.debug("PUBlishing msg: %s", json.dumps(msg))
                self.publisher.send_json(msg)

        except:
            # Deliberately catches everything: whatever goes wrong in this
            # thread, the process is signalled so that it does not keep
            # running without its 0mq forwarding loop.
            self.logger.exception("Caught exception, terminating")
            os.kill(os.getpid(), SIGTERM)
101

    
102

    
103
class JobFileHandler(pyinotify.ProcessEvent):
    """pyinotify handler that turns job file writes into notifications.

    Whenever a file whose name starts with "job-" is closed after being
    written, the job is loaded from it and one "ganeti-op-status" message
    per opcode of the job is sent on the pusher socket.

    """
    def __init__(self, logger, pusher):
        """Store the logger and the socket messages are sent on.

        logger -- object providing debug()
        pusher -- socket providing send_json()

        """
        pyinotify.ProcessEvent.__init__(self)
        self.logger = logger
        self.pusher = pusher

    def process_IN_CLOSE_WRITE(self, event):
        """Handle an IN_CLOSE_WRITE event for a file in the job queue.

        Files that are not job files, or that can no longer be read,
        are skipped. Errors while decoding the job are not caught here
        and propagate to the caller.

        """
        jobfile = os.path.join(event.path, event.name)
        if not event.name.startswith("job-"):
            self.logger.debug("Not a job file: %s", jobfile)
            return

        try:
            data = utils.ReadFile(jobfile)
        except IOError:
            # Best effort: the file could not be read (it may already be
            # gone again), so there is nothing to report for this event.
            self.logger.debug("Could not read job file %s, skipping", jobfile)
            return

        data = serializer.LoadJson(data)
        job = jqueue._QueuedJob.Restore(None, data)

        for op in job.ops:
            # Opcodes may carry a list of instances, a single instance
            # name, or neither; a single instance name takes precedence.
            instances = ""
            try:
                instances = " ".join(op.input.instances)
            except AttributeError:
                pass

            try:
                instances = op.input.instance_name
            except AttributeError:
                pass

            # Get the last line of the op log as message
            try:
                logmsg = op.log[-1][-1]
            except IndexError:
                logmsg = None

            self.logger.debug("%d: %s(%s) %s %s",
                int(job.id), op.input.OP_ID, instances, op.status, logmsg)

            # Construct message
            msg = {
                "type": "ganeti-op-status",
                "instance": instances,
                "operation": op.input.OP_ID,
                "jobId": int(job.id),
                "status": op.status,
                "logmsg": logmsg
            }
            # "message" duplicates "logmsg" but is only present when there
            # actually is a log line; both keys are kept for consumers.
            if logmsg:
                msg["message"] = logmsg

            # Push to the 0mq thread for PUBlication
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
            self.pusher.send_json(msg)
160

    
161

    
162
# Logger used by fatal_signal_handler(); main() assigns it once logging
# has been configured.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler: log the signal number, then raise SystemExit.

    signum -- number of the signal that was caught
    frame  -- current stack frame (unused)

    """
    notice = "Caught fatal signal %d, will raise SystemExit"
    handler_logger.info(notice, signum)
    raise SystemExit()
169

    
170
def parse_arguments(args):
    """Parse the daemon's command-line arguments.

    args -- list of argument strings, without the program name

    Returns the (options, positional_arguments) pair produced by
    optparse. Defaults for the log file, PID file and both port
    numbers are read from the settings module.

    """
    from optparse import OptionParser

    default_log = settings.GANETI_0MQD_LOG_FILE
    default_pid = settings.GANETI_0MQD_PID_FILE

    # One (flags, keyword-arguments) entry per option, in help order.
    option_table = (
        (("-d", "--debug"),
         dict(action="store_true", dest="debug",
              help="Enable debugging information")),
        (("-l", "--log"),
         dict(dest="log_file", default=default_log, metavar="FILE",
              help="Write log to FILE instead of %s" % default_log)),
        (("--pid-file",),
         dict(dest="pid_file", default=default_pid, metavar="PIDFILE",
              help="Save PID to file (default: %s)" % default_pid)),
        (("-p", "--pull-port"),
         dict(dest="pull_port", default=settings.GANETI_0MQD_PULL_PORT,
              type="int", metavar="PULL_PORT",
              help="The TCP port number to use for the 0mq PULL endpoint")),
        (("-P", "--pub-port"),
         dict(dest="pub_port", default=settings.GANETI_0MQD_PUB_PORT,
              type="int", metavar="PUB_PORT",
              help="The TCP port number to use for the 0mq PUB endpoint")),
    )

    option_parser = OptionParser()
    for flags, keywords in option_table:
        option_parser.add_option(*flags, **keywords)

    return option_parser.parse_args(args)
194

    
195
def main():
    """Daemonize and run the ganeti-0mqd notification loop.

    Parses the command line, sets up file logging, becomes a daemon,
    installs SIGINT/SIGTERM handlers, creates the 0mq sockets, starts
    the GanetiZMQThread and finally watches the Ganeti job queue
    directory with inotify, letting JobFileHandler push notifications.

    This function does not return normally: the inotify loop runs until
    an exception propagates (including the SystemExit raised by
    fatal_signal_handler). The exception is logged, the inotify notifier
    and the 0mq thread are stopped, and the exception is re-raised.

    """
    global handler_logger

    (opts, args) = parse_arguments(sys.argv[1:])

    # The 0mq endpoints to use for receiving and publishing notifications.
    GANETI_0MQD_PUB_ENDPOINT = "tcp://*:%d" % int(opts.pub_port)
    GANETI_0MQD_PULL_ENDPOINT = "tcp://*:%d" % int(opts.pull_port)
    GANETI_0MQD_INPROC_ENDPOINT = "inproc://ganeti-0mqd"

    # Create pidfile
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(
        opts.pid_file, 10)

    # Initialize logger
    lvl = logging.DEBUG if opts.debug else logging.INFO
    logger = logging.getLogger("ganeti-0mqd")
    logger.setLevel(lvl)
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
        "%Y-%m-%d %H:%M:%S")
    log_handler = logging.FileHandler(opts.log_file)
    log_handler.setFormatter(formatter)
    logger.addHandler(log_handler)
    handler_logger = logger

    # Become a daemon:
    # Redirect stdout and stderr to log_handler.stream to catch
    # early errors in the daemonization process [e.g., pidfile creation]
    # which will otherwise go to /dev/null.
    daemon_context = daemon.DaemonContext(
        pidfile=pidf,
        umask=0o22,
        stdout=log_handler.stream,
        stderr=log_handler.stream,
        files_preserve=[log_handler.stream])
    daemon_context.open()
    logger.info("Became a daemon")

    # Catch signals to ensure graceful shutdown
    signal(SIGINT, fatal_signal_handler)
    signal(SIGTERM, fatal_signal_handler)

    # Create 0mq sockets: One for the PUBlisher, one for the PULLer,
    # one inproc PUSHer for inter-thread communication.
    zmqc = zmq.Context()
    puller = zmqc.socket(zmq.PULL)
    puller.bind(GANETI_0MQD_PULL_ENDPOINT)
    puller.bind(GANETI_0MQD_INPROC_ENDPOINT)

    publisher = zmqc.socket(zmq.PUB)
    publisher.bind(GANETI_0MQD_PUB_ENDPOINT)

    pusher = zmqc.socket(zmq.PUSH)
    pusher.connect(GANETI_0MQD_INPROC_ENDPOINT)
    logger.info("PUSHing to %s", GANETI_0MQD_INPROC_ENDPOINT)
    logger.info("PULLing from (%s, %s)",
        GANETI_0MQD_PULL_ENDPOINT, GANETI_0MQD_INPROC_ENDPOINT)
    logger.info("PUBlishing on %s", GANETI_0MQD_PUB_ENDPOINT)

    # Use a separate thread for 0mq processing,
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
    zmqt = GanetiZMQThread(logger, puller, publisher)
    zmqt.start()

    # Monitor the Ganeti job queue, create and push notifications
    wm = pyinotify.WatchManager()
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
    job_handler = JobFileHandler(logger, pusher)
    notifier = pyinotify.Notifier(wm, job_handler)

    try:
        # Fail if adding the inotify() watch fails for any reason
        res = wm.add_watch(constants.QUEUE_DIR, mask)
        if res[constants.QUEUE_DIR] < 0:
            raise Exception("pyinotify add_watch returned negative watch descriptor")

        logger.info("Now watching %s", constants.QUEUE_DIR)

        while True:    # loop forever
            # process the queue of events as explained above
            notifier.process_events()
            if notifier.check_events():
                # read notified events and enqueue them
                notifier.read_events()
    except SystemExit:
        logger.info("SystemExit")
        # Re-raise explicitly here; a bare `raise` inside `finally` has
        # no active exception to re-raise on Python 3.
        raise
    except:
        logger.exception("Caught exception, terminating")
        raise
    finally:
        try:
            # destroy the inotify's instance on this interrupt (stop monitoring)
            notifier.stop()
        finally:
            # mark the 0mq thread as stopped, wake it up so that it notices;
            # done even if stopping the notifier failed, as the thread only
            # checks its stop flag after receiving a message.
            zmqt.stop()
            pusher.send_json({'type': 'null'})
290

    
291

    
292
if __name__ == "__main__":
    # Script entry point: whatever main() returns becomes the argument
    # of sys.exit().
    exit_status = main()
    sys.exit(exit_status)
294

    
295
# vim: set ts=4 sts=4 sw=4 et ai :