Revision 78e2d194 db/db_controller.py

--- a/db/db_controller.py
+++ b/db/db_controller.py
 from synnefo.db.models import VirtualMachine
 from synnefo.logic import utils, backend
 
-logger = None
+class Dispatcher:
 
-def update_db(message):
-    try:
-        msg = json.loads(message.body)
+    logger = None
+    chan = None
+
+    def __init__(self, debug, logger):
+        self.logger = logger
+        self._init_queues(debug)
 
-        if msg["type"] != "ganeti-op-status":
-            logging.error("Message is of uknown type %s." % (msg["type"],))
+    def update_db(self, message):
+        try:
+            msg = json.loads(message.body)
+
+            if msg["type"] != "ganeti-op-status":
+                self.logger.error("Message is of uknown type %s." % (msg["type"],))
+                return
+
+            vmid = utils.id_from_instance_name(msg["instance"])
+            vm = VirtualMachine.objects.get(id=vmid)
+
+            self.logger.debug("Processing msg: %s" % (msg,))
+            backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
+            self.logger.debug("Done processing msg for vm %s." % (msg["instance"]))
+
+        except KeyError:
+            self.logger.error("Malformed incoming JSON, missing attributes: " + message.body)
+        except VirtualMachine.InvalidBackendIdError:
+            self.logger.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
+        except VirtualMachine.DoesNotExist:
+            self.logger.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
+        except Exception as e:
+            self.logger.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
             return
+        finally:
+            message.channel.basic_ack(message.delivery_tag)
 
-        vmid = utils.id_from_instance_name(msg["instance"])
-        vm = VirtualMachine.objects.get(id=vmid)
-
-        logging.debug("Processing msg: %s" % (msg,))
-        backend.process_backend_msg(vm, msg["jobId"], msg["operation"], msg["status"], msg["logmsg"])
-        logging.debug("Done processing msg for vm %s." % (msg["instance"]))
-
-    except KeyError:
-        logging.error("Malformed incoming JSON, missing attributes: " + message.body)
-    except VirtualMachine.InvalidBackendIdError:
-        logging.debug("Ignoring msg for unknown instance %s." % (msg["instance"],))
-    except VirtualMachine.DoesNotExist:
-        logging.error("VM for instance %s with id %d not found in DB." % (msg["instance"], vmid))
-    except Exception as e:
-        logging.error("Unexpected error:\n" + "".join(traceback.format_exception(*sys.exc_info())))
-        return
-    finally:
+    def send_email(self, message):
+        self.logger.debug("Request to send email message")
         message.channel.basic_ack(message.delivery_tag)
 
-def send_email(message):
-    logger.debug("Request to send email message")
-    message.channel.basic_ack(message.delivery_tag)
-
-def update_credits(message):
-    logger.debug("Request to update credits")
-    message.channel.basic_ack(message.delivery_tag)
-
-def declare_queues(chan):
-    chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="topic", durable=True, auto_delete=False)
-    chan.exchange_declare(exchange=settings.EXCHANGE_CRON, type="topic", durable=True, auto_delete=False)
-    chan.exchange_declare(exchange=settings.EXCHANGE_API, type="topic", durable=True, auto_delete=False)
-
-    chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False)
-    chan.queue_declare(queue=settings.QUEUE_CRON_CREDITS, durable=True, exclusive=False, auto_delete=False)
-    chan.queue_declare(queue=settings.QUEUE_API_EMAIL, durable=True, exclusive=False, auto_delete=False)
-    chan.queue_declare(queue=settings.QUEUE_CRON_EMAIL, durable=True, exclusive=False, auto_delete=False)
-
-def init_devel():
-    chan = open_channel()
-    declare_queues(chan)
-    chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
-    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
-    return chan
-
-def init():
-    chan = open_channel()
-    declare_queues(chan)
-    chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
-    chan.basic_consume(queue="events", callback=update_db, consumer_tag="dbupdater")
-    return chan
-
-def parse_arguments(args):
-    from optparse import OptionParser
+    def update_credits(self, message):
+        self.logger.debug("Request to update credits")
+        message.channel.basic_ack(message.delivery_tag)
 
-    parser = OptionParser()
-    parser.add_option("-d", "--debug", action="store_true", dest="debug",
-            help="Enable debug mode")
-    parser.add_option("-l", "--log", dest="log_file",
-            default=settings.DISPATCHER_LOG_FILE,
-            metavar="FILE",
-            help="Write log to FILE instead of %s" %
-            settings.DISPATCHER_LOG_FILE)
+    def wait(self):
+        while True:
+            try:
+                self.chan.wait()
+            except SystemExit:
+                break
+
+        self.chan.basic_cancel("dbupdater")
+        self.chan.close()
+        self.chan.connection.close()
+
+    def _declare_queues(self):
+        self.chan.exchange_declare(exchange=settings.EXCHANGE_GANETI, type="direct", durable=True, auto_delete=False)
+        self.chan.exchange_declare(exchange=settings.EXCHANGE_CRON, type="topic", durable=True, auto_delete=False)
+        self.chan.exchange_declare(exchange=settings.EXCHANGE_API, type="topic", durable=True, auto_delete=False)
+
+        self.chan.queue_declare(queue=settings.QUEUE_GANETI_EVENTS, durable=True, exclusive=False, auto_delete=False)
+        self.chan.queue_declare(queue=settings.QUEUE_CRON_CREDITS, durable=True, exclusive=False, auto_delete=False)
+        self.chan.queue_declare(queue=settings.QUEUE_API_EMAIL, durable=True, exclusive=False, auto_delete=False)
+        self.chan.queue_declare(queue=settings.QUEUE_CRON_EMAIL, durable=True, exclusive=False, auto_delete=False)
+
+    def _init_queues(self,debug):
+        self._open_channel()
+        if debug:
+            self._init_devel()
+        else:
+            self._init()
+
+    def _init_devel(self):
+        self._declare_queues()
+        self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
+        self.chan.basic_consume(queue="events", callback=self.update_db, consumer_tag="dbupdater")
+
+    def _init(self):
+        self._declare_queues()
+        self.chan.queue_bind(queue=settings.QUEUE_GANETI_EVENTS, exchange=settings.EXCHANGE_GANETI, routing_key="event.*")
+        self.chan.basic_consume(queue="events", callback=self.update_db, consumer_tag="dbupdater")
+
+    def _open_channel(self):
+        conn = None
+        while conn == None:
+            self.logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
+            try:
+                conn = amqp.Connection( host=settings.RABBIT_HOST,
+                                    userid=settings.RABBIT_USERNAME,
+                                    password=settings.RABBIT_PASSWORD,
+                                    virtual_host=settings.RABBIT_VHOST)
+            except socket.error:
+                time.sleep(1)
+                pass
 
-    return parser.parse_args(args)
+        self.logger.info("Connection succesful, opening channel")
+        self.chan = conn.channel()
 
 def exit_handler(signum, frame):
     global handler_logger
...
     handler_logger.info("Caught fatal signal %d, will raise SystemExit", signum)
     raise SystemExit
 
-def init_queues(debug):
-    chan = None
-    if debug:
-        chan = init_devel()
-    else:
-        chan = init()
-    return chan
-
-def open_channel():
-    conn = None
-    while conn == None:
-        logger.info("Attempting to connect to %s", settings.RABBIT_HOST)
-        try:
-            conn = amqp.Connection( host=settings.RABBIT_HOST,
-                                    userid=settings.RABBIT_USERNAME,
-                                    password=settings.RABBIT_PASSWORD,
-                                    virtual_host=settings.RABBIT_VHOST)
-        except socket.error:
-            time.sleep(1)
-            pass
-
-    logger.info("Connection succesful, opening channel")
-    return conn.channel()
-
-def main():
-    global logger
-    (opts, args) = parse_arguments(sys.argv[1:])
+def child(cmdline):
+    #Cmd line argument parsing
+    (opts, args) = parse_arguments(cmdline)
 
     # Initialize logger
     lvl = logging.DEBUG if opts.debug else logging.INFO
...
     handler.setFormatter(formatter)
     logger.addHandler(handler)
 
-    #Init the queues
-    chan = init_queues(opts.debug)
+    d = Dispatcher(debug = True, logger = logger)
+
+    d.wait()
+
+def parse_arguments(args):
+    from optparse import OptionParser
+
+    parser = OptionParser()
+    parser.add_option("-d", "--debug", action="store_true", dest="debug",
+            help="Enable debug mode")
+    parser.add_option("-l", "--log", dest="log_file",
+            default=settings.DISPATCHER_LOG_FILE,
+            metavar="FILE",
+            help="Write log to FILE instead of %s" %
+            settings.DISPATCHER_LOG_FILE)
+
+    return parser.parse_args(args)
+
+def main():
+    global logger
+    (opts, args) = parse_arguments(sys.argv[1:])
+
+    #newpid = os.fork()
+    #if newpid == 0:
+    child(sys.argv[1:])
+    #else:
+    #    pids = (os.getpid(), newpid)
+    #    print "parent: %d, child: %d" % pids
 
     # Become a daemon:
     # Redirect stdout and stderr to handler.stream to catch
     # early errors in the daemonization process [e.g., pidfile creation]
     # which will otherwise go to /dev/null.
-    daemon_context = daemon.DaemonContext(
-            umask=022,
-            stdout=handler.stream,
-            stderr=handler.stream,
-            files_preserve=[handler.stream])
-    daemon_context.open()
-    logger.info("Became a daemon")
+    #daemon_context = daemon.DaemonContext(
+    #        umask=022,
+    #        stdout=handler.stream,
+    #        stderr=handler.stream,
+    #        files_preserve=[handler.stream])
+    #daemon_context.open()
+    #logger.info("Became a daemon")
 
     # Catch signals to ensure graceful shutdown
-    signal(SIGINT, exit_handler)
-    signal(SIGTERM, exit_handler)
-    signal(SIGKILL, exit_handler)
-
-    while True:
-        try:
-            chan.wait()
-        except SystemExit:
-            break
-
-    chan.basic_cancel("dbupdater")
-    chan.close()
-    chan.connection.close()
+    #signal(SIGINT, exit_handler)
+    #signal(SIGTERM, exit_handler)
+    #signal(SIGKILL, exit_handler)
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
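
For reference, the message format handled by the new Dispatcher.update_db can be read off the handler in the diff: a JSON body with type "ganeti-op-status" plus the instance, jobId, operation, status and logmsg fields, delivered on an object exposing body, channel and delivery_tag. The sketch below illustrates such a payload; the FakeMessage/FakeChannel stand-ins and the field values are hypothetical, shown only to clarify what the handler reads and acks, not part of this revision.

import json

class FakeChannel(object):
    # Stand-in for an amqplib channel; update_db only calls basic_ack() on it.
    def basic_ack(self, delivery_tag):
        print("acked %s" % delivery_tag)

class FakeMessage(object):
    # Stand-in exposing the three attributes update_db touches.
    def __init__(self, body):
        self.body = body
        self.channel = FakeChannel()
        self.delivery_tag = 1

# Keys taken from the handler above; the values are made up.
payload = {
    "type": "ganeti-op-status",
    "instance": "instance-name",        # resolved via utils.id_from_instance_name()
    "jobId": 1234,
    "operation": "OP_INSTANCE_STARTUP",
    "status": "success",
    "logmsg": "example log message",
}

# With a configured Django environment and a logger, the flow wired up in
# child() above would be roughly:
#   d = Dispatcher(debug=True, logger=logger)
#   d.update_db(FakeMessage(json.dumps(payload)))
#   d.wait()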
