Revision 2cd99e7a logic/dispatcher.py

b/logic/dispatcher.py
2 2
#
3 3
# Copyright (c) 2011 Greek Research and Technology Network
4 4
#
5
"""Connect to a queue 
6

  
7
This daemon receives job notifications from a number of queues
5
""" Message queue setup and dispatch
8 6

  
7
This program sets up connections to the queues configured in settings.py
8
and implements the message wait and dispatch loops. Actual messages are
9
handled in the dispatched functions.
9 10

  
10 11
"""
11 12

  
......
20 21
setup_environ(settings)
21 22

  
22 23
from amqplib import client_0_8 as amqp
23

  
24 24
from signal import signal, SIGINT, SIGTERM
25 25

  
26 26
import logging
......
29 29

  
30 30
from synnefo.logic import dispatcher_callbacks
31 31

  
32
# List of worker ids
33
global children
34 32

  
35 33
class Dispatcher:
36 34

  
......
53 51
            except socket.error:
54 52
                self.logger.error("Server went away, reconnecting...")
55 53
                self._init()
56
                pass
57 54

  
58 55
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
59 56
        self.chan.close()
......
73 70
                                       virtual_host=settings.RABBIT_VHOST)
74 71
            except socket.error:
75 72
                time.sleep(1)
76
                pass
77 73

  
78 74
        self.logger.info("Connection succesful, opening channel")
79 75
        self.chan = conn.channel()
......
98 94
        # Bind queues to handler methods
99 95
        for binding in bindings:
100 96
            try:
101
                cb = getattr(dispatcher_callbacks, binding[3])
97
                callback = getattr(dispatcher_callbacks, binding[3])
102 98
            except AttributeError:
103 99
                self.logger.error("Cannot find callback %s" % binding[3])
104 100
                continue
105 101

  
106 102
            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
107 103
                                 routing_key=binding[2])
108
            tag = self.chan.basic_consume(queue=binding[0], callback=cb)
104
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
109 105
            self.logger.debug("Binding %s(%s) to queue %s with handler %s" %
110 106
                              (binding[1], binding[2], binding[0], binding[3]))
111 107
            self.clienttags.append(tag)
......
113 109

  
114 110
def _exit_handler(signum, frame):
115 111
    """"Catch exit signal in children processes."""
116
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(),signum)
112
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(), signum)
117 113
    raise SystemExit
118 114

  
119 115

  
......
129 125

  
130 126
    # Cmd line argument parsing
131 127
    (opts, args) = parse_arguments(cmdline)
132
    d = Dispatcher(debug = opts.debug, logger = logger)
128
    disp = Dispatcher(debug = opts.debug, logger = logger)
133 129

  
134 130
    # Start the event loop
135
    d.wait()
131
    disp.wait()
136 132

  
137 133

  
138 134
def parse_arguments(args):
......
140 136

  
141 137
    parser = OptionParser()
142 138
    parser.add_option("-d", "--debug", action="store_true", default=False,
143
                      dest="debug",
144
            help="Enable debug mode")
139
                      dest="debug", help="Enable debug mode")
145 140
    parser.add_option("-l", "--log", dest="log_file",
146
            default=settings.DISPATCHER_LOG_FILE,
147
            metavar="FILE",
148
            help="Write log to FILE instead of %s" %
149
            settings.DISPATCHER_LOG_FILE)
141
                      default=settings.DISPATCHER_LOG_FILE, metavar="FILE",
142
                      help="Write log to FILE instead of %s" %
143
                           settings.DISPATCHER_LOG_FILE)
150 144
    parser.add_option("-c", "--cleanup-queues", action="store_true",
151 145
                      default=False, dest="cleanup_queues",
152
            help="Remove all queues declared in settings.py (DANGEROUS!)")
146
                      help="Remove all queues declared in settings.py (DANGEROUS!)")
153 147
    parser.add_option("-w", "--workers", default=2, dest="workers",
154
            help="Number of workers to spawn", type="int")
148
                      help="Number of workers to spawn", type="int")
155 149
    
156 150
    return parser.parse_args(args)
157 151

  
......
197 191
    logger = logging.getLogger("synnefo.dispatcher")
198 192
    logger.setLevel(lvl)
199 193
    formatter = logging.Formatter(
200
            "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
201
            "%Y-%m-%d %H:%M:%S")
194
        "%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
195
        "%Y-%m-%d %H:%M:%S")
202 196
    handler = logging.FileHandler(opts.log_file)
203 197
    handler.setFormatter(formatter)
204 198
    logger.addHandler(handler)
......
231 225
    signal(SIGTERM, _parent_handler)
232 226

  
233 227
    # Wait for all children process to die, one by one
234
    for c in children:
228
    for pid in children:
235 229
        try:
236
            os.wait()
230
            os.waitpid(pid)
237 231
        except Exception:
238 232
            pass
239 233

  

Also available in: Unified diff