Revision dac67c0a ganeti/ganeti-0mqd.py

b/ganeti/ganeti-0mqd.py
2 2
#
3 3
# Copyright (c) 2010 Greek Research and Technology Network
4 4
#
5
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
5
"""Ganeti notification daemon for 0mqd
6

  
7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications over a 0mq PUB endpoint.
9

  
10
"""
11

  
12
from django.core.management import setup_environ
6 13

  
7
import os
8 14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

  
20
setup_environ(settings)
21

  
9 22
import zmq
10 23
import time
11 24
import json
12 25
import logging
13 26
import pyinotify
27
import daemon
28
import daemon.pidlockfile
29
from signal import signal, SIGINT, SIGTERM
30

  
31
from threading import Thread, Event, currentThread
14 32

  
15 33
from ganeti import utils
16 34
from ganeti import jqueue
17 35
from ganeti import constants
18 36
from ganeti import serializer
19 37

  
20
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
38

  
39
class StoppableThread(Thread):
40
    """Thread class with a stop() method.
41

  
42
    The thread needs to check regularly for the stopped() condition.
43
    When it does, it exits, so that another thread may .join() it.
44

  
45
    """
46
    def __init__(self, *args, **kwargs):
47
        Thread.__init__(self, *args, **kwargs)
48
        self._stop = Event()
49

  
50
    def stop(self):
51
        self._stop.set()
52

  
53
    def stopped(self):
54
        return self._stop.isSet()
55

  
56

  
57
class GanetiZMQThread(StoppableThread):
58
    """The 0mq processing thread: PULLs and then PUBlishes notifications.
59

  
60
    This thread runs until stopped, receiving notifications over a
61
    0mq PULL socket, and publishing them over a 0mq PUB socket.
62

  
63
    The are currently two sources of notifications:
64
    a. ganeti-0mqd itself, monitoring the Ganeti job queue
65
    b. hooks running in the context of Ganeti
66

  
67
    """
68
    def __init__(self, logger, puller, publisher):
69
        StoppableThread.__init__(self)
70
        self.logger = logger
71
        self.puller = puller
72
        self.publisher = publisher
73

  
74
    def run(self):
75
        self.logger.debug("0mq thread ready")
76
        try:
77
            while True:
78
                # Pull
79
                self.logger.debug("Waiting on the 0mq PULL socket")
80
                data = self.puller.recv()
81
                self.logger.debug("Received message on 0mq PULL socket")
82
                if currentThread().stopped():
83
                    self.logger.debug("Thread has been stopped, leaving request loop")
84
                    return
85
                try:
86
                    msg = json.loads(data)
87
                    if msg['type'] not in ('ganeti-op-status'):
88
                        self.logger.error("Not forwarding message of unknown type: %s", msg.dumps(data))
89
                        continue
90
                except Exception, e:
91
                    self.logger.exception("Unexpected Exception decoding msg: %s", data)
92
                    continue
93

  
94
                # Publish
95
                self.logger.debug("PUBlishing msg: %s", json.dumps(msg))
96
                self.publisher.send_json(msg)
97

  
98
        except:
99
            self.logger.exception("Caught exception, terminating")
100
            os.kill(os.getpid(), SIGTERM)
21 101

  
22 102

  
23 103
class JobFileHandler(pyinotify.ProcessEvent):
24
    def __init__(self, publisher):
104
    def __init__(self, logger, pusher):
25 105
            pyinotify.ProcessEvent.__init__(self)
26
            self.publisher = publisher
106
            self.logger = logger
107
            self.pusher = pusher
27 108
                  
28 109
    def process_IN_CLOSE_WRITE(self, event):
29 110
        jobfile = os.path.join(event.path, event.name)
30 111
        if not event.name.startswith("job-"):
31
            logging.debug("Not a job file: %s" % event.path)
112
            self.logger.debug("Not a job file: %s" % event.path)
32 113
            return
33 114

  
34 115
        try:
......
58 139
            except IndexError:
59 140
                logmsg = None
60 141
            
61
            logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
62
            if op.status in constants.JOBS_FINALIZED:
63
                logging.info("%d: %s" % (int(job.id), op.status))
142
            self.logger.debug("%d: %s(%s) %s %s",
143
                int(job.id), op.input.OP_ID, instances, op.status, logmsg)
64 144

  
65 145
            # Construct message
66 146
            msg = {
......
69 149
                "operation": op.input.OP_ID,
70 150
                "jobId": int(job.id),
71 151
                "status": op.status,
72
		"logmsg": logmsg
152
                "logmsg": logmsg
73 153
            }
74 154
            if logmsg:
75 155
                msg["message"] = logmsg
76 156
            
77
            # Output as JSON
78
            print json.dumps(msg)
79
            
80
            self.publisher.send_json(msg)
157
            # Push to the 0mq thread for PUBlication
158
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
159
            self.pusher.send_json(msg)
160

  
161

  
162
handler_logger = None
163
def fatal_signal_handler(signum, frame):
164
    global handler_logger
81 165

  
166
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
167
        signum)
168
    raise SystemExit
169

  
170
def parse_arguments(args):
171
    from optparse import OptionParser
172

  
173
    parser = OptionParser()
174
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
175
                      help="Enable debugging information")
176
    parser.add_option("-l", "--log", dest="log_file",
177
                      default=settings.GANETI_0MQD_LOG_FILE,
178
                      metavar="FILE",
179
                      help="Write log to FILE instead of %s" %
180
                      settings.GANETI_0MQD_LOG_FILE),
181
    parser.add_option('--pid-file', dest="pid_file",
182
                      default=settings.GANETI_0MQD_PID_FILE,
183
                      metavar='PIDFILE',
184
                      help="Save PID to file (default: %s)" %
185
                      settings.GANETI_0MQD_PID_FILE)
186
    parser.add_option("-p", "--pull-port", dest="pull_port",
187
                      default=settings.GANETI_0MQD_PULL_PORT, type="int", metavar="PULL_PORT",
188
                      help="The TCP port number to use for the 0mq PULL endpoint")
189
    parser.add_option("-P", "--pub-port", dest="pub_port",
190
                      default=settings.GANETI_0MQD_PUB_PORT, type="int", metavar="PUB_PORT",
191
                      help="The TCP port number to use for the 0mq PUB endpoint")
192

  
193
    return parser.parse_args(args)
82 194

  
83 195
def main():
196
    global handler_logger
197

  
198
    (opts, args) = parse_arguments(sys.argv[1:])
199

  
200
    # The 0mq endpoints to use for receiving and publishing notifications.
201
    GANETI_0MQD_PUB_ENDPOINT = "tcp://*:%d" % int(opts.pub_port)
202
    GANETI_0MQD_PULL_ENDPOINT = "tcp://*:%d" % int(opts.pull_port)
203
    GANETI_0MQD_INPROC_ENDPOINT = "inproc://ganeti-0mqd"
204

  
205
    # Create pidfile
206
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(
207
        opts.pid_file, 10)
208

  
209
    # Initialize logger
210
    lvl = logging.DEBUG if opts.debug else logging.INFO
211
    logger = logging.getLogger("ganeti-0mqd")
212
    logger.setLevel(lvl)
213
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
214
        "%Y-%m-%d %H:%M:%S")
215
    handler = logging.FileHandler(opts.log_file)
216
    handler.setFormatter(formatter)
217
    logger.addHandler(handler)
218
    handler_logger = logger
219

  
220
    # Become a daemon:
221
    # Redirect stdout and stderr to handler.stream to catch
222
    # early errors in the daemonization process [e.g., pidfile creation]
223
    # which will otherwise go to /dev/null.
224
    daemon_context = daemon.DaemonContext(
225
        pidfile=pidf,
226
        umask=022,
227
        stdout=handler.stream,
228
        stderr=handler.stream,
229
        files_preserve=[handler.stream])
230
    daemon_context.open()
231
    logger.info("Became a daemon")
232

  
233
    # Catch signals to ensure graceful shutdown
234
    signal(SIGINT, fatal_signal_handler)
235
    signal(SIGTERM, fatal_signal_handler)
236

  
237
    # Create 0mq sockets: One for the PUBlisher, one for the PULLer,
238
    # one inproc PUSHer for inter-thread communication.
84 239
    zmqc = zmq.Context()
85
    publisher = zmqc.socket(zmq.PUB)
86
    publisher.bind(GANETI_ZMQ_PUBLISHER)
87
    logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
240
    puller = zmqc.socket(zmq.PULL)
241
    puller.bind(GANETI_0MQD_PULL_ENDPOINT)
242
    puller.bind(GANETI_0MQD_INPROC_ENDPOINT)
88 243
    
244
    publisher = zmqc.socket(zmq.PUB)
245
    publisher.bind(GANETI_0MQD_PUB_ENDPOINT)
246
  
247
    pusher = zmqc.socket(zmq.PUSH)
248
    pusher.connect(GANETI_0MQD_INPROC_ENDPOINT)
249
    logger.info("PUSHing to %s", GANETI_0MQD_INPROC_ENDPOINT)
250
    logger.info("PULLing from (%s, %s)",
251
        GANETI_0MQD_PULL_ENDPOINT, GANETI_0MQD_INPROC_ENDPOINT)
252
    logger.info("PUBlishing on %s", GANETI_0MQD_PUB_ENDPOINT)
253

  
254
    # Use a separate thread for 0mq processing,
255
    # needed because the Python runtime interacts badly with 0mq's blocking semantics.
256
    zmqt = GanetiZMQThread(logger, puller, publisher)
257
    zmqt.start()
258

  
259
    # Monitor the Ganeti job queue, create and push notifications
89 260
    wm = pyinotify.WatchManager()
90 261
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
91
    handler = JobFileHandler(publisher)
262
    handler = JobFileHandler(logger, pusher)
92 263
    notifier = pyinotify.Notifier(wm, handler)
93
    wm.add_watch(constants.QUEUE_DIR, mask)
94 264

  
95
    logging.info("Now watching %s" % constants.QUEUE_DIR)
265
    try:
266
        # Fail if adding the inotify() watch fails for any reason
267
        res = wm.add_watch(constants.QUEUE_DIR, mask)
268
        if res[constants.QUEUE_DIR] < 0:
269
            raise Exception("pyinotify add_watch returned negative watch descriptor")
270
        
271
        logger.info("Now watching %s" % constants.QUEUE_DIR)
96 272

  
97
    while True:    # loop forever
98
        try:
99
            # process the queue of events as explained above
273
        while True:    # loop forever
274
        # process the queue of events as explained above
100 275
            notifier.process_events()
101 276
            if notifier.check_events():
102 277
                # read notified events and enqeue them
103 278
                notifier.read_events()
104
        except KeyboardInterrupt:
105
            # destroy the inotify's instance on this interrupt (stop monitoring)
106
            notifier.stop()
107
            break
279
    except SystemExit:
280
        logger.info("SystemExit")
281
    except:
282
        logger.exception("Caught exception, terminating")
283
    finally:
284
        # destroy the inotify's instance on this interrupt (stop monitoring)
285
        notifier.stop()
286
        # mark the 0mq thread as stopped, wake it up so that it notices
287
        zmqt.stop()
288
        pusher.send_json({'type': 'null'})
289
        raise
108 290

  
109 291

  
110 292
if __name__ == "__main__":
111
    logging.basicConfig(level=logging.DEBUG)
112 293
    sys.exit(main())
113 294

  
114 295
# vim: set ts=4 sts=4 sw=4 et ai :

Also available in: Unified diff