Revision 00b01ca9

b/.gitignore
*.db
*.pyc
*~
bin/
share/
build/
include/
*.pt.py
*.installed.cfg
*.sqlite
.Python
.idea
.DS_Store
selenium-server-standalone-2.0b2.jar
.project
.pydevproject
.settings/
settings.d/*-local.conf
*.egg-info
dist
_build

# version modules created automatically from setup.py
*/synnefo/versions/*.py
!*/synnefo/versions/__init__.py
snf-okeanos-site/okeanos_site/version.py
b/kalamari.py
1
#!/usr/bin/env python
2
# Copyright 2012 GRNET S.A. All rights reserved.
3
#
4
# Redistribution and use in source and binary forms, with or without
5
# modification, are permitted provided that the following conditions
6
# are met:
7
#
8
#   1. Redistributions of source code must retain the above copyright
9
#      notice, this list of conditions and the following disclaimer.
10
#
11
#  2. Redistributions in binary form must reproduce the above copyright
12
#     notice, this list of conditions and the following disclaimer in the
13
#     documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
#
27
# The views and conclusions contained in the software and documentation are
28
# those of the authors and should not be interpreted as representing official
29
# policies, either expressed or implied, of GRNET S.A.
30

  
31

  
32
""" Transfer messages from an AMQP queue to a MongoDB collection.
33

  
34
Kalamari 
35

  
36
"""
37
from django.core.management import setup_environ
38

  
39
import sys
40
import os
41
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
42
sys.path.append(path)
43

  
44
from synnefo import settings
45
setup_environ(settings)
46

  
47
from amqplib import client_0_8 as amqp
48
from signal import signal, SIGINT, SIGTERM
49

  
50
import logging
51
import time
52
import socket
53
from daemon import daemon
54

  
55
__author__ = 'Georgios Gousios <gousiosg@gmail.com>'
56

  
57

  
58
# Take care of differences between python-daemon versions.
59
try:
60
    from daemon import pidfile
61
except:
62
    from daemon import pidlockfile
63

  
64
from synnefo.logic import callbacks
65
from synnefo.util.dictconfig import dictConfig
66

  
67

  
68
log = logging.getLogger()
69

  
70

  
71
# Queue names
72
QUEUES = []
73

  
74
# Queue bindings to exchanges
75
BINDINGS = []
76

  
77

  
78
class Dispatcher:
    """Consume AMQP messages and dispatch them to the callback handlers
    configured in the global BINDINGS table.

    The dispatcher transparently reconnects (re-declaring all queues,
    exchanges and bindings) whenever the broker goes away.
    """
    chan = None
    debug = False
    clienttags = []

    def __init__(self, debug=False):
        self.debug = debug
        # Use a per-instance list; the class attribute above was shared
        # across all instances, accumulating stale consumer tags.
        self.clienttags = []
        self._init()

    def wait(self):
        """Run the consume loop until SystemExit is raised (by a signal
        handler), then cancel all consumers and close the channel."""
        while True:
            try:
                self.chan.wait()
            except SystemExit:
                break
            except amqp.exceptions.AMQPConnectionException:
                log.error("Server went away, reconnecting...")
                self._init()
            except socket.error:
                log.error("Server went away, reconnecting...")
                self._init()
            except Exception:
                # Unexpected handler errors are logged but must not kill
                # the message loop.
                log.exception("Caught unexpected exception")

        for clienttag in self.clienttags:
            self.chan.basic_cancel(clienttag)
        self.chan.connection.close()
        self.chan.close()

    def _init(self):
        """(Re)connect to RabbitMQ and declare exchanges, queues and
        queue-to-handler bindings."""
        global QUEUES, BINDINGS
        log.info("Initializing")

        # Connect to RabbitMQ, retrying forever with a fixed 10s backoff.
        conn = None
        while conn is None:
            log.info("Attempting to connect to %s", settings.RABBIT_HOST)
            try:
                conn = amqp.Connection(host=settings.RABBIT_HOST,
                                       userid=settings.RABBIT_USERNAME,
                                       password=settings.RABBIT_PASSWORD,
                                       virtual_host=settings.RABBIT_VHOST)
            except socket.error:
                log.error("Failed to connect to %s, retrying in 10s",
                          settings.RABBIT_HOST)
                time.sleep(10)

        log.info("Connection successful, opening channel")
        self.chan = conn.channel()

        # Declare queues and exchanges
        for exchange in settings.EXCHANGES:
            self.chan.exchange_declare(exchange=exchange, type="topic",
                                       durable=True, auto_delete=False)

        for queue in QUEUES:
            self.chan.queue_declare(queue=queue, durable=True,
                                    exclusive=False, auto_delete=False)

        # Bind queues to handler methods; a binding is the tuple
        # (queue, exchange, routing_key, handler_name).
        for binding in BINDINGS:
            try:
                callback = getattr(callbacks, binding[3])
            except AttributeError:
                log.error("Cannot find callback %s", binding[3])
                raise SystemExit(1)

            self.chan.queue_bind(queue=binding[0], exchange=binding[1],
                                 routing_key=binding[2])
            tag = self.chan.basic_consume(queue=binding[0], callback=callback)
            log.debug("Binding %s(%s) to queue %s with handler %s",
                      binding[1], binding[2], binding[0], binding[3])
            self.clienttags.append(tag)
152

  
153

  
154
def _init_queues():
    """Build the global QUEUES and BINDINGS tables from settings.

    Queue names share a prefix derived from BACKEND_PREFIX_ID; when
    DEBUG is enabled, an extra catch-all "debug" queue is bound to every
    exchange with the dummy_proc handler.
    """
    global QUEUES, BINDINGS

    # All queue names carry the backend-specific prefix.
    prefix = settings.BACKEND_PREFIX_ID.split('-')[0]

    queue_events_op = "%s-events-op" % prefix
    queue_events_net = "%s-events-net" % prefix
    queue_build_progress = "%s-events-progress" % prefix
    queue_cron_credits = "%s-credits" % prefix
    queue_email = "%s-email" % prefix
    queue_reconciliation = "%s-reconciliation" % prefix
    if settings.DEBUG is True:
        queue_debug = "debug"       # Debug queue, retrieves all messages

    QUEUES = (queue_events_op, queue_events_net, queue_cron_credits,
              queue_email, queue_reconciliation, queue_build_progress)

    # Routing keys, one per notification type.
    key_op = 'ganeti.%s.event.op' % prefix            # "ganeti-op-status"
    key_net = 'ganeti.%s.event.net' % prefix          # "ganeti-net-status"
    key_progress = 'ganeti.%s.event.progress' % prefix  # "ganeti-create-progress"
    key_email = 'logic.%s.email.*' % prefix           # email
    key_reconc = 'reconciliation.%s.*' % prefix       # reconciliation

    BINDINGS = [
        # Queue                  # Exchange                # RouteKey      # Handler
        (queue_events_op,        settings.EXCHANGE_GANETI, key_op,         'update_db'),
        (queue_events_net,       settings.EXCHANGE_GANETI, key_net,        'update_net'),
        (queue_build_progress,   settings.EXCHANGE_GANETI, key_progress,   'update_build_progress'),
        (queue_cron_credits,     settings.EXCHANGE_CRON,   '*.credits.*',  'update_credits'),
        (queue_email,            settings.EXCHANGE_API,    key_email,      'send_email'),
        (queue_email,            settings.EXCHANGE_CRON,   key_email,      'send_email'),
        (queue_reconciliation,   settings.EXCHANGE_CRON,   key_reconc,     'trigger_status_update'),
    ]

    if settings.DEBUG is True:
        BINDINGS += [
            # Queue         # Exchange                # RouteKey  # Handler
            (queue_debug,   settings.EXCHANGE_GANETI, '#',        'dummy_proc'),
            (queue_debug,   settings.EXCHANGE_CRON,   '#',        'dummy_proc'),
            (queue_debug,   settings.EXCHANGE_API,    '#',        'dummy_proc'),
        ]
        QUEUES += (queue_debug,)
203

  
204

  
205
def _exit_handler(signum, frame):
    """Catch exit signal in children processes and raise SystemExit so
    the consume loop terminates cleanly."""
    # The original docstring opened with four quotes, leaking a stray
    # '"' into the docstring text.
    log.info("Caught signal %d, will raise SystemExit", signum)
    raise SystemExit
209

  
210

  
211
def _parent_handler(signum, frame):
    """Catch exit signal in parent process and forward it to children."""
    global children
    log.info("Caught signal %d, sending SIGTERM to children %s",
             signum, children)
    # Plain loop instead of a list comprehension used only for its
    # side effects.
    for pid in children:
        os.kill(pid, SIGTERM)
217

  
218

  
219
def child(cmdline):
    """Entry point of a forked worker: parse its command line, build a
    Dispatcher and consume messages until told to exit."""
    # Cmd line argument parsing
    opts, _ = parse_arguments(cmdline)

    # Start the event loop
    Dispatcher(debug=opts.debug).wait()
228

  
229

  
230
def parse_arguments(args):
    """Parse the dispatcher command line.

    Returns the (options, positional_args) pair produced by optparse.
    """
    from optparse import OptionParser

    pid_file_default = os.path.join("var", "run", "synnefo", "dispatcher.pid")

    opt_parser = OptionParser()
    opt_parser.add_option("-d", "--debug", action="store_true", default=False,
                          dest="debug", help="Enable debug mode")
    opt_parser.add_option("-w", "--workers", default=2, dest="workers",
                          help="Number of workers to spawn", type="int")
    opt_parser.add_option("-p", "--pid-file", dest="pid_file",
                          default=pid_file_default,
                          help="Save PID to file (default: %s)" % pid_file_default)
    opt_parser.add_option("--purge-queues", action="store_true",
                          default=False, dest="purge_queues",
                          help="Remove all declared queues (DANGEROUS!)")
    opt_parser.add_option("--purge-exchanges", action="store_true",
                          default=False, dest="purge_exchanges",
                          help="Remove all exchanges. Implies deleting all queues \
                           first (DANGEROUS!)")
    opt_parser.add_option("--drain-queue", dest="drain_queue",
                          help="Strips a queue from all outstanding messages")

    return opt_parser.parse_args(args)
253

  
254

  
255
def purge_queues():
256
    """
257
        Delete declared queues from RabbitMQ. Use with care!
258
    """
259
    global QUEUES, BINDINGS
260
    conn = get_connection()
261
    chan = conn.channel()
262

  
263
    print "Queues to be deleted: ", QUEUES
264

  
265
    if not get_user_confirmation():
266
        return
267

  
268
    for queue in QUEUES:
269
        try:
270
            chan.queue_delete(queue=queue)
271
            print "Deleting queue %s" % queue
272
        except amqp.exceptions.AMQPChannelException as e:
273
            print e.amqp_reply_code, " ", e.amqp_reply_text
274
            chan = conn.channel()
275

  
276
    chan.connection.close()
277

  
278

  
279
def purge_exchanges():
280
    """Delete declared exchanges from RabbitMQ, after removing all queues"""
281
    global QUEUES, BINDINGS
282
    purge_queues()
283

  
284
    conn = get_connection()
285
    chan = conn.channel()
286

  
287
    print "Exchanges to be deleted: ", settings.EXCHANGES
288

  
289
    if not get_user_confirmation():
290
        return
291

  
292
    for exchange in settings.EXCHANGES:
293
        try:
294
            chan.exchange_delete(exchange=exchange)
295
        except amqp.exceptions.AMQPChannelException as e:
296
            print e.amqp_reply_code, " ", e.amqp_reply_text
297

  
298
    chan.connection.close()
299

  
300

  
301
def drain_queue(queue):
302
    """Strip a (declared) queue from all outstanding messages"""
303
    global QUEUES, BINDINGS
304
    if not queue:
305
        return
306

  
307
    if not queue in QUEUES:
308
        print "Queue %s not configured" % queue
309
        return
310

  
311
    print "Queue to be drained: %s" % queue
312

  
313
    if not get_user_confirmation():
314
        return
315
    conn = get_connection()
316
    chan = conn.channel()
317

  
318
    # Register a temporary queue binding
319
    for binding in BINDINGS:
320
        if binding[0] == queue:
321
            exch = binding[1]
322

  
323
    if not exch:
324
        print "Queue not bound to any exchange: %s" % queue
325
        return
326

  
327
    chan.queue_bind(queue=queue, exchange=exch, routing_key='#')
328
    tag = chan.basic_consume(queue=queue, callback=callbacks.dummy_proc)
329

  
330
    print "Queue draining about to start, hit Ctrl+c when done"
331
    time.sleep(2)
332
    print "Queue draining starting"
333

  
334
    signal(SIGTERM, _exit_handler)
335
    signal(SIGINT, _exit_handler)
336

  
337
    num_processed = 0
338
    while True:
339
        chan.wait()
340
        num_processed += 1
341
        sys.stderr.write("Ignored %d messages\r" % num_processed)
342

  
343
    chan.basic_cancel(tag)
344
    chan.connection.close()
345

  
346

  
347
def get_connection():
    """Open and return a fresh AMQP connection to the configured broker."""
    return amqp.Connection(host=settings.RABBIT_HOST,
                           userid=settings.RABBIT_USERNAME,
                           password=settings.RABBIT_PASSWORD,
                           virtual_host=settings.RABBIT_VHOST)
353

  
354

  
355
def get_user_confirmation():
    """Prompt the operator; return True only on an explicit 'y'/'Y'.

    Any other answer -- including an empty one -- counts as "no".
    """
    answer = raw_input("Are you sure (N/y):")
    return answer in ['Y', 'y']
363

  
364

  
365
def debug_mode():
    """Run a single dispatcher in the foreground, without forking or
    daemonizing -- used for interactive debugging."""
    dispatcher = Dispatcher(debug=True)
    for sig in (SIGINT, SIGTERM):
        signal(sig, _exit_handler)

    dispatcher.wait()
371

  
372

  
373
def daemon_mode(opts):
    """Fork opts.workers dispatcher children and wait for them to exit.

    The parent holds the pid lockfile, forwards SIGINT/SIGTERM to the
    children, and releases the lockfile once all children have died.
    """
    global children

    # Create pidfile, taking care of differences between python-daemon
    # versions: only one of pidfile/pidlockfile was imported at the top
    # of the module, so the other name is undefined. Narrowed from a
    # bare except, which also hid genuine lockfile errors.
    try:
        pidf = pidfile.TimeoutPIDLockFile(opts.pid_file, 10)
    except (NameError, AttributeError):
        pidf = pidlockfile.TimeoutPIDLockFile(opts.pid_file, 10)

    pidf.acquire()

    log.info("Became a daemon")

    # Fork workers
    children = []

    for _ in range(opts.workers):
        newpid = os.fork()

        if newpid == 0:
            # Child: install exit handlers and run the dispatch loop.
            signal(SIGINT, _exit_handler)
            signal(SIGTERM, _exit_handler)
            child(sys.argv[1:])
            sys.exit(1)
        else:
            log.debug("%d, forked child: %d", os.getpid(), newpid)
            children.append(newpid)

    # Catch signals to ensure graceful shutdown of the children.
    signal(SIGINT, _parent_handler)
    signal(SIGTERM, _parent_handler)

    # Wait for all children processes to die, one by one; release the
    # pid lockfile no matter how the wait ends.
    try:
        for pid in children:
            try:
                os.waitpid(pid, 0)
            except Exception:
                pass
    finally:
        pidf.release()
417

  
418

  
419
def main():
420
    dictConfig(settings.DISPATCHER_LOGGING)
421

  
422
    global log
423

  
424
    (opts, args) = parse_arguments(sys.argv[1:])
425

  
426
    # Init the global variables containing the queues
427
    _init_queues()
428

  
429
    # Special case for the clean up queues action
430
    if opts.purge_queues:
431
        purge_queues()
432
        return
433

  
434
    # Special case for the clean up exch action
435
    if opts.purge_exchanges:
436
        purge_exchanges()
437
        return
438

  
439
    if opts.drain_queue:
440
        drain_queue(opts.drain_queue)
441
        return
442

  
443
    # Debug mode, process messages without spawning workers
444
    if opts.debug:
445
        debug_mode()
446
        return
447

  
448
    files_preserve = []
449
    for handler in log.handlers:
450
        stream = getattr(handler, 'stream')
451
        if stream and hasattr(stream, 'fileno'):
452
            files_preserve.append(handler.stream)
453

  
454
    daemon_context = daemon.DaemonContext(
455
        files_preserve=files_preserve,
456
        umask=022)
457

  
458
    daemon_context.open()
459

  
460
    # Catch every exception, make sure it gets logged properly
461
    try:
462
        daemon_mode(opts)
463
    except Exception:
464
        log.exception("Unknown error")
465
        raise
466

  
467

  
468
# Script entry point: propagate main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
470

  
471
# vim: set sta sts=4 shiftwidth=4 sw=4 et ai :

Also available in: Unified diff