Revision b024b97a

b/ganeti/ganeti-eventd.py
1
#!/usr/bin/env python
2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
4
#
5
"""Ganeti notification daemon with ampq
6

  
7
A daemon to monitor the Ganeti job queue and publish job progress
8
and Ganeti VM state notifications over a 0mq PUB endpoint.
9

  
10
"""
11

  
12
from django.core.management import setup_environ
13

  
14
import sys
15
import os
16
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
17
sys.path.append(path)
18
import synnefo.settings as settings
19

  
20
setup_environ(settings)
21

  
22
import time
23
import json
24
import logging
25
import pyinotify
26
import daemon
27
import daemon.pidlockfile
28
from signal import signal, SIGINT, SIGTERM
29

  
30
from carrot.connection import BrokerConnection
31
from carrot.messaging import Publisher
32

  
33
from threading import Thread, Event, currentThread
34

  
35
from ganeti import utils
36
from ganeti import jqueue
37
from ganeti import constants
38
from ganeti import serializer
39

  
40
class JobFileHandler(pyinotify.ProcessEvent):
41
    def __init__(self, logger, amqpd):
42
            pyinotify.ProcessEvent.__init__(self)
43
            self.logger = logger
44
            self.amqpd = amqpd
45

  
46
    def process_IN_CLOSE_WRITE(self, event):
47
        jobfile = os.path.join(event.path, event.name)
48
        if not event.name.startswith("job-"):
49
            self.logger.debug("Not a job file: %s" % event.path)
50
            return
51

  
52
        try:
53
            data = utils.ReadFile(jobfile)
54
        except IOError:
55

  
56
            return
57

  
58
        data = serializer.LoadJson(data)
59
        job = jqueue._QueuedJob.Restore(None, data)
60

  
61
        for op in job.ops:
62
            instances = ""
63
            try:
64
                instances = " ".join(op.input.instances)
65
            except AttributeError:
66
                pass
67

  
68
            try:
69
                instances = op.input.instance_name
70
            except AttributeError:
71
                pass
72

  
73
            # Get the last line of the op log as message
74
            try:
75
                logmsg = op.log[-1][-1]
76
            except IndexError:
77
                logmsg = None
78
            
79
            self.logger.debug("%d: %s(%s) %s %s",
80
                int(job.id), op.input.OP_ID, instances, op.status, logmsg)
81

  
82
            # Construct message
83
            msg = {
84
                "type": "ganeti-op-status",
85
                "instance": instances,
86
                "operation": op.input.OP_ID,
87
                "jobId": int(job.id),
88
                "status": op.status,
89
                "logmsg": logmsg
90
            }
91
            if logmsg:
92
                msg["message"] = logmsg
93
            
94
            self.logger.debug("PUSHing msg: %s", json.dumps(msg))
95
            amqpd.send(json.dumps(msg))
96

  
97

  
98
handler_logger = None
99
def fatal_signal_handler(signum, frame):
100
    global handler_logger
101

  
102
    handler_logger.info("Caught fatal signal %d, will raise SystemExit",
103
        signum)
104
    raise SystemExit
105

  
106
def parse_arguments(args):
107
    from optparse import OptionParser
108

  
109
    parser = OptionParser()
110
    parser.add_option("-d", "--debug", action="store_true", dest="debug",
111
                      help="Enable debugging information")
112
    parser.add_option("-l", "--log", dest="log_file",
113
                      default=settings.GANETI_EVENTD_LOG_FILE,
114
                      metavar="FILE",
115
                      help="Write log to FILE instead of %s" %
116
                      settings.GANETI_EVENTD_LOG_FILE),
117
    parser.add_option('--pid-file', dest="pid_file",
118
                      default=settings.GANETI_EVENTD_PID_FILE,
119
                      metavar='PIDFILE',
120
                      help="Save PID to file (default: %s)" %
121
                      settings.GANETI_EVENTD_PID_FILE)
122

  
123
    return parser.parse_args(args)
124

  
125
def main():
126
    global handler_logger
127

  
128
    (opts, args) = parse_arguments(sys.argv[1:])
129

  
130
    # Create pidfile
131
    pidf = daemon.pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)
132

  
133
    # Initialize logger
134
    lvl = logging.DEBUG if opts.debug else logging.INFO
135
    logger = logging.getLogger("ganeti-amqpd")
136
    logger.setLevel(lvl)
137
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
138
        "%Y-%m-%d %H:%M:%S")
139
    handler = logging.FileHandler(opts.log_file)
140
    handler.setFormatter(formatter)
141
    logger.addHandler(handler)
142
    handler_logger = logger
143

  
144
#    # Become a daemon:
145
#    # Redirect stdout and stderr to handler.stream to catch
146
#    # early errors in the daemonization process [e.g., pidfile creation]
147
#    # which will otherwise go to /dev/null.
148
#    daemon_context = daemon.DaemonContext(
149
#        pidfile=pidf,
150
#        umask=022,
151
#        stdout=handler.stream,
152
#        stderr=handler.stream,
153
#        files_preserve=[handler.stream])
154
#    daemon_context.open()
155
#    logger.info("Became a daemon")
156
#
157
#    # Catch signals to ensure graceful shutdown
158
#    signal(SIGINT, fatal_signal_handler)
159
#    signal(SIGTERM, fatal_signal_handler)
160

  
161
    #Init connection to RabbitMQ
162
    conn = BrokerConnection(hostname="localhost", port=5672,userid="guest",
163
                            password="guest",virtual_host="/")
164
    publisher = Publisher(connection=conn, exchange="ganeti",
165
                          routing_key="importer")
166

  
167

  
168
    # Monitor the Ganeti job queue, create and push notifications
169
    wm = pyinotify.WatchManager()
170
    mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
171
    handler = JobFileHandler(logger, publisher)
172
    notifier = pyinotify.Notifier(wm, handler)
173

  
174
    try:
175
        # Fail if adding the inotify() watch fails for any reason
176
        res = wm.add_watch(constants.QUEUE_DIR, mask)
177
        if res[constants.QUEUE_DIR] < 0:
178
            raise Exception("pyinotify add_watch returned negative watch descriptor")
179
        
180
        logger.info("Now watching %s" % constants.QUEUE_DIR)
181

  
182
        while True:    # loop forever
183
        # process the queue of events as explained above
184
            notifier.process_events()
185
            if notifier.check_events():
186
                # read notified events and enqeue them
187
                notifier.read_events()
188
    except SystemExit:
189
        logger.info("SystemExit")
190
    except:
191
        logger.exception("Caught exception, terminating")
192
    finally:
193
        # destroy the inotify's instance on this interrupt (stop monitoring)
194
        notifier.stop()
195
        # mark the 0mq thread as stopped, wake it up so that it notices
196
        raise
197

  
198
if __name__ == "__main__":
199
    sys.exit(main())
200

  
201
# vim: set ts=4 sts=4 sw=4 et ai :
b/settings.py.dist
162 162
# parameter refers to a point in time more than POLL_LIMIT seconds ago.
163 163
POLL_LIMIT = 3600
164 164

  
165
# Configuration for ganeti-0mqd, the Ganeti notification daemon
166
#
167
# ganeti-0mqd uses two 0mqd endpoints:
168
#   *  A PULL endpoint for receiving job updates from code running inside
169
#      Ganeti hooks, running on TCP port GANETI_0MQD_PULL_PORT on the
170
#      Ganeti master IP,
171
#   *  A PUB endpoint for publishing notifications to the rest of the
172
#      infrastructure, running on TCP port GANETI_0MQD_PUB_PORT on the
173
#      Ganeti master IP.
174
#
175
GANETI_0MQD_PUB_PORT = "5801"
176
GANETI_0MQD_PULL_PORT = "5802"
177
GANETI_0MQD_LOG_FILE = "/var/log/synnefo/ganeti-0mqd.log"
178
GANETI_0MQD_PID_FILE = "/var/run/synnefo/ganeti-0mqd.pid"
165
# Configuration for ganeti-eventd, the Ganeti notification daemon
166
GANETI_EVENTD_LOG_FILE = "/var/log/synnefo/ganeti-eventd.log"
167
GANETI_EVENTD_PID_FILE = "/var/run/synnefo/ganeti-eventd.pid"
168

  
169
#Rabbit work queue end point
170
RABBIT_HOST=""
171
RABBIT_PORT="5672"
172
RABBIT_USERNAME="guest"
173
RABBIT_PASSWORD="guest"
174
RABBIT_QUEUE="/"
179 175

  
180 176
# The API implementation needs to accept and return absolute references
181 177
# to its resources. Thus, it needs to know its public URL.

Also available in: Unified diff