Revision c183005e logic/dispatcher.py

b/logic/dispatcher.py
1 1
#!/usr/bin/env python
2 2
#
3
# Copyright (c) 201! Greek Research and Technology Network
3
# Copyright (c) 2011 Greek Research and Technology Network
4 4
#
5 5
"""Connect to a queue 
6 6

  
7
This daemon receives job notifications from ganeti-amqpd
8
and updates VM state in the DB accordingly.
7
This daemon receives job notifications from a number of queues
8

  
9 9

  
10 10
"""
11 11

  
......
29 29

  
30 30
from synnefo.logic import dispatcher_callbacks
31 31

  
32
#List of worker ids
32
# List of worker ids
33 33
global children
34 34

  
35 35
class Dispatcher:
......
61 61
        sys.exit()
62 62

  
63 63
    def _init(self):
64
        # Connect to RabbitMQ
64 65
        conn = None
65 66
        while conn == None:
66
            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
67
            self.logger.info("Attempting to connect to %s",
68
                             settings.RABBIT_HOST)
67 69
            try:
68 70
                conn = amqp.Connection( host=settings.RABBIT_HOST,
69 71
                                    userid=settings.RABBIT_USERNAME,
......
76 78
        self.logger.info("Connection succesful, opening channel")
77 79
        self.chan = conn.channel()
78 80

  
79
        #Declare queues and exchanges
81
        # Declare queues and exchanges
80 82
        for exchange in settings.EXCHANGES:
81
            self.chan.exchange_declare(exchange=exchange, type="topic", durable=True, auto_delete=False)
83
            self.chan.exchange_declare(exchange=exchange, type="topic",
84
                                       durable=True, auto_delete=False)
82 85

  
83 86
        for queue in settings.QUEUES:
84
            self.chan.queue_declare(queue=queue, durable=True, exclusive=False, auto_delete=False)
87
            self.chan.queue_declare(queue=queue, durable=True,
88
                                    exclusive=False, auto_delete=False)
85 89

  
86 90
        bindings = settings.BINDINGS
87 91

  
92
        # Special queue for debugging, should not appear in production
88 93
        if self.debug:
89
            #Special queue handling, should not appear in production
90
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True, exclusive=False, auto_delete=False)
94
            self.chan.queue_declare(queue=settings.QUEUE_DEBUG, durable=True,
95
                                    exclusive=False, auto_delete=False)
91 96
            bindings += settings.BINDINGS_DEBUG
92 97

  
93
        #Bind queues to handler methods
98
        # Bind queues to handler methods
94 99
        for binding in bindings:
95 100
            try:
96 101
                cb = getattr(dispatcher_callbacks, binding[3])
97 102
            except AttributeError:
98 103
                self.logger.error("Cannot find callback %s" % binding[3])
104
                continue
99 105

  
100
            self.chan.queue_bind(queue=binding[0], exchange=binding[1], routing_key=binding[2])
106
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
107
                                 routing_key=binding[2])
101 108
            tag = self.chan.basic_consume(queue=binding[0], callback=cb)
102 109
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
103 110
                              (binding[1], binding[2], binding[0], binding[3]))
104 111
            self.clienttags.append(tag)
105 112

  
106
def exit_handler(signum, frame):
113

  
114
def _exit_handler(signum, frame):
115
    """"Catch exit signal in children processes."""
107 116
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(),signum)
108 117
    raise SystemExit
109 118

  
110
def parent_handler(signum, frame):
119

  
120
def _parent_handler(signum, frame):
121
    """"Catch exit signal in parent process and forward it to children."""
111 122
    global children
112 123
    print "Caught signal %d, sending kill signal to children" % signum
113 124
    [os.kill(pid, SIGTERM) for pid in children]
114 125

  
126

  
115 127
def child(cmdline, logger):
116
    #Cmd line argument parsing
128
    """The context of the child process"""
129

  
130
    # Cmd line argument parsing
117 131
    (opts, args) = parse_arguments(cmdline)
118 132
    d = Dispatcher(debug = opts.debug, logger = logger)
119 133

  
134
    # Start the event loop
120 135
    d.wait()
121 136

  
137

  
122 138
def parse_arguments(args):
123 139
    from optparse import OptionParser
124 140

  
125 141
    parser = OptionParser()
126
    parser.add_option("-d", "--debug", action="store_true", default=False, dest="debug",
142
    parser.add_option("-d", "--debug", action="store_true", default=False,
143
                      dest="debug",
127 144
            help="Enable debug mode")
128 145
    parser.add_option("-l", "--log", dest="log_file",
129 146
            default=settings.DISPATCHER_LOG_FILE,
130 147
            metavar="FILE",
131 148
            help="Write log to FILE instead of %s" %
132 149
            settings.DISPATCHER_LOG_FILE)
133
    parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
134
            help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
135
    parser.add_option("-w", "--workers", default=1, dest="workers",
136
            help="Number of workers to spawn")
150
    parser.add_option("-c", "--cleanup-queues", action="store_true",
151
                      default=False, dest="cleanup_queues",
152
            help="Remove all queues declared in settings.py (DANGEROUS!)")
153
    parser.add_option("-w", "--workers", default=2, dest="workers",
154
            help="Number of workers to spawn", type="int")
137 155
    
138 156
    return parser.parse_args(args)
139 157

  
140
def cleanup_queues() :
141 158

  
159
def cleanup_queues() :
160
    """Delete declared queues from RabbitMQ. Use with care!"""
142 161
    conn = amqp.Connection( host=settings.RABBIT_HOST,
143 162
                            userid=settings.RABBIT_USERNAME,
144 163
                            password=settings.RABBIT_PASSWORD,
......
168 187
    chan.close()
169 188
    chan.connection.close()
170 189

  
190

  
171 191
def main():
172 192
    global children, logger
173 193
    (opts, args) = parse_arguments(sys.argv[1:])
......
176 196
    lvl = logging.DEBUG if opts.debug else logging.INFO
177 197
    logger = logging.getLogger("synnefo.dispatcher")
178 198
    logger.setLevel(lvl)
179
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
199
    formatter = logging.Formatter(
200
            "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
180 201
            "%Y-%m-%d %H:%M:%S")
181 202
    handler = logging.FileHandler(opts.log_file)
182 203
    handler.setFormatter(formatter)
183 204
    logger.addHandler(handler)
184 205

  
185
    #Special case for the clean up queues action
206
    # Special case for the clean up queues action
186 207
    if opts.cleanup_queues:
187 208
        cleanup_queues()
188 209
        return
189 210

  
190
    #Fork workers
211
    # Fork workers
191 212
    children = []
192 213

  
193 214
    i = 0
......
195 216
        newpid = os.fork()
196 217

  
197 218
        if newpid == 0:
198
            signal(SIGINT, exit_handler)
199
            signal(SIGTERM, exit_handler)
200
            #child(sys.argv[1:], logger)
219
            signal(SIGINT, _exit_handler)
220
            signal(SIGTERM, _exit_handler)
221
            child(sys.argv[1:], logger)
201 222
            time.sleep(5)
202 223
            sys.exit(0)
203 224
        else:
......
207 228
        i += 1
208 229

  
209 230
    # Catch signals to ensure graceful shutdown
210
    signal(SIGINT,  parent_handler)
211
    signal(SIGTERM, parent_handler)
231
    signal(SIGINT,  _parent_handler)
232
    signal(SIGTERM, _parent_handler)
212 233

  
234
    # Wait for the last child process to exit
213 235
    try:
214 236
        os.wait()
215 237
    except Exception :
216 238
        pass
217 239

  
240

  
218 241
if __name__ == "__main__":
219 242
    logging.basicConfig(level=logging.DEBUG)
220 243
    sys.exit(main())

Also available in: Unified diff