Revision 8861126f

b/logic/dispatcher.py
1 1
#!/usr/bin/env python
2 2
#
3
# Copyright (c) 2010 Greek Research and Technology Network
3
# Copyright (c) 201! Greek Research and Technology Network
4 4
#
5
"""Receive Ganeti events over RabbitMQ, update VM state in DB.
5
"""Connect to a queue 
6 6

  
7 7
This daemon receives job notifications from ganeti-amqpd
8 8
and updates VM state in the DB accordingly.
......
21 21

  
22 22
from amqplib import client_0_8 as amqp
23 23

  
24
import daemon
25
from signal import signal, SIGINT, SIGTERM, SIGKILL
24
from signal import signal, SIGINT, SIGTERM
26 25

  
27 26
import logging
28 27
import time
......
30 29

  
31 30
from synnefo.logic import dispatcher_callbacks
32 31

  
32
#List of worker ids
33
global children
34

  
33 35
class Dispatcher:
34 36

  
35 37
    logger = None
......
56 58
        [self.chan.basic_cancel(clienttag) for clienttag in self.clienttags]
57 59
        self.chan.close()
58 60
        self.chan.connection.close()
61
        sys.exit()
59 62

  
60 63
    def _init(self):
61 64
        conn = None
......
101 104
            self.clienttags.append(tag)
102 105

  
103 106
def exit_handler(signum, frame):
104
    global handler_logger
105

  
106
    handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
107
    print "%d: Caught signal %d, will raise SystemExit" % (os.getpid(),signum)
107 108
    raise SystemExit
108 109

  
109
def child(cmdline):
110
    global logger
110
def parent_handler(signum, frame):
111
    global children
112
    print "Caught signal %d, sending kill signal to children" % signum
113
    [os.kill(pid, SIGTERM) for pid in children]
114

  
115
def child(cmdline, logger):
111 116
    #Cmd line argument parsing
112 117
    (opts, args) = parse_arguments(cmdline)
113

  
114
    # Initialize logger
115
    lvl = logging.DEBUG if opts.debug else logging.INFO
116
    logger = logging.getLogger("synnefo.dispatcher")
117
    logger.setLevel(lvl)
118
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
119
            "%Y-%m-%d %H:%M:%S")
120
    handler = logging.FileHandler(opts.log_file)
121
    handler.setFormatter(formatter)
122
    logger.addHandler(handler)
123

  
124 118
    d = Dispatcher(debug = opts.debug, logger = logger)
125 119

  
126 120
    d.wait()
......
138 132
            settings.DISPATCHER_LOG_FILE)
139 133
    parser.add_option("-c", "--cleanup-queues", action="store_true", default=False, dest="cleanup_queues",
140 134
            help="Remove from RabbitMQ all queues declared in settings.py (DANGEROUS!)")
135
    parser.add_option("-w", "--workers", default=1, dest="workers",
136
            help="Number of workers to spawn")
141 137
    
142 138
    return parser.parse_args(args)
143 139

  
......
169 165
            chan.queue_delete(queue=queue)
170 166
        except amqp.exceptions.AMQPChannelException as e:
171 167
            print e.amqp_reply_code, " ", e.amqp_reply_text
168
    chan.close()
169
    chan.connection.close()
172 170

  
173 171
def main():
172
    global children, logger
174 173
    (opts, args) = parse_arguments(sys.argv[1:])
175 174

  
175
    # Initialize logger
176
    lvl = logging.DEBUG if opts.debug else logging.INFO
177
    logger = logging.getLogger("synnefo.dispatcher")
178
    logger.setLevel(lvl)
179
    formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
180
            "%Y-%m-%d %H:%M:%S")
181
    handler = logging.FileHandler(opts.log_file)
182
    handler.setFormatter(formatter)
183
    logger.addHandler(handler)
184

  
185
    #Special case for the clean up queues action
176 186
    if opts.cleanup_queues:
177 187
        cleanup_queues()
178 188
        return
179 189

  
180
    #newpid = os.fork()
181
    #if newpid == 0:
182
    child(sys.argv[1:])
183
    #else:
184
    #    pids = (os.getpid(), newpid)
185
    #    print "parent: %d, child: %d" % pids
186

  
187
    # Become a daemon:
188
    # Redirect stdout and stderr to handler.stream to catch
189
    # early errors in the daemonization process [e.g., pidfile creation]
190
    # which will otherwise go to /dev/null.
191
    #daemon_context = daemon.DaemonContext(
192
    #        umask=022,
193
    #        stdout=handler.stream,
194
    #        stderr=handler.stream,
195
    #        files_preserve=[handler.stream])
196
    #daemon_context.open()
197
    #logger.info("Became a daemon")
198
    
190
    #Fork workers
191
    children = []
192

  
193
    i = 0
194
    while i < opts.workers:
195
        newpid = os.fork()
196

  
197
        if newpid == 0:
198
            signal(SIGINT, exit_handler)
199
            signal(SIGTERM, exit_handler)
200
            #child(sys.argv[1:], logger)
201
            time.sleep(5)
202
            sys.exit(0)
203
        else:
204
            pids = (os.getpid(), newpid)
205
            logger.debug("%d, forked child: %d" % pids)
206
            children.append(pids[1])
207
        i += 1
208

  
199 209
    # Catch signals to ensure graceful shutdown
200
    #signal(SIGINT, exit_handler)
201
    #signal(SIGTERM, exit_handler)
202
    #signal(SIGKILL, exit_handler)
210
    signal(SIGINT,  parent_handler)
211
    signal(SIGTERM, parent_handler)
212

  
213
    try:
214
        os.wait()
215
    except Exception :
216
        pass
203 217

  
204 218
if __name__ == "__main__":
205 219
    logging.basicConfig(level=logging.DEBUG)

Also available in: Unified diff