Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / management / commands / queue-retry.py @ 9e20fcee

History | View | Annotate | Download (4 kB)

1 67619fd7 Christos Stavrakakis
# Copyright 2011-2012 GRNET S.A. All rights reserved.
2 67619fd7 Christos Stavrakakis
#
3 67619fd7 Christos Stavrakakis
# Redistribution and use in source and binary forms, with or without
4 67619fd7 Christos Stavrakakis
# modification, are permitted provided that the following conditions
5 67619fd7 Christos Stavrakakis
# are met:
6 67619fd7 Christos Stavrakakis
#
7 67619fd7 Christos Stavrakakis
#   1. Redistributions of source code must retain the above copyright
8 67619fd7 Christos Stavrakakis
#      notice, this list of conditions and the following disclaimer.
9 67619fd7 Christos Stavrakakis
#
10 67619fd7 Christos Stavrakakis
#  2. Redistributions in binary form must reproduce the above copyright
11 67619fd7 Christos Stavrakakis
#     notice, this list of conditions and the following disclaimer in the
12 67619fd7 Christos Stavrakakis
#     documentation and/or other materials provided with the distribution.
13 67619fd7 Christos Stavrakakis
#
14 67619fd7 Christos Stavrakakis
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 67619fd7 Christos Stavrakakis
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 67619fd7 Christos Stavrakakis
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 67619fd7 Christos Stavrakakis
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 67619fd7 Christos Stavrakakis
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 67619fd7 Christos Stavrakakis
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 67619fd7 Christos Stavrakakis
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 67619fd7 Christos Stavrakakis
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 67619fd7 Christos Stavrakakis
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 67619fd7 Christos Stavrakakis
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 67619fd7 Christos Stavrakakis
# SUCH DAMAGE.
25 67619fd7 Christos Stavrakakis
#
26 67619fd7 Christos Stavrakakis
# The views and conclusions contained in the software and documentation are
27 67619fd7 Christos Stavrakakis
# those of the authors and should not be interpreted as representing official
28 67619fd7 Christos Stavrakakis
# policies, either expressed or implied, of GRNET S.A.
29 67619fd7 Christos Stavrakakis
#
30 67619fd7 Christos Stavrakakis
from django.core.management.base import BaseCommand
31 67619fd7 Christos Stavrakakis
from optparse import make_option
32 67619fd7 Christos Stavrakakis
33 67619fd7 Christos Stavrakakis
from synnefo.lib.amqp import AMQPClient
34 67619fd7 Christos Stavrakakis
35 67619fd7 Christos Stavrakakis
from synnefo.logic import queues
36 67619fd7 Christos Stavrakakis
37 67619fd7 Christos Stavrakakis
import json
38 67619fd7 Christos Stavrakakis
import logging
39 67619fd7 Christos Stavrakakis
log = logging.getLogger("")
40 67619fd7 Christos Stavrakakis
41 67619fd7 Christos Stavrakakis
42 67619fd7 Christos Stavrakakis
class Command(BaseCommand):
43 67619fd7 Christos Stavrakakis
    help = "Resend messages from dead letter queues to original exchange"""
44 67619fd7 Christos Stavrakakis
45 67619fd7 Christos Stavrakakis
    option_list = BaseCommand.option_list + (
46 cc92b70f Christos Stavrakakis
        make_option(
47 cc92b70f Christos Stavrakakis
            '--keep-zombies',
48 cc92b70f Christos Stavrakakis
            action='store_true',
49 cc92b70f Christos Stavrakakis
            dest='keep_zombies',
50 cc92b70f Christos Stavrakakis
            default=False,
51 cc92b70f Christos Stavrakakis
            help="Do not remove messages that died more than one times"),
52 cc92b70f Christos Stavrakakis
    )
53 67619fd7 Christos Stavrakakis
54 67619fd7 Christos Stavrakakis
    def handle(self, *args, **options):
55 67619fd7 Christos Stavrakakis
        verbose = (options["verbosity"] == "2")
56 67619fd7 Christos Stavrakakis
        self.keep_zombies = options["keep_zombies"]
57 67619fd7 Christos Stavrakakis
        log_level = logging.DEBUG if verbose else logging.WARNING
58 67619fd7 Christos Stavrakakis
        log.setLevel(log_level)
59 67619fd7 Christos Stavrakakis
60 67619fd7 Christos Stavrakakis
        client = AMQPClient(confirms=False)
61 67619fd7 Christos Stavrakakis
        client.connect()
62 67619fd7 Christos Stavrakakis
63 67619fd7 Christos Stavrakakis
        self.client = client
64 67619fd7 Christos Stavrakakis
65 67619fd7 Christos Stavrakakis
        for queue in queues.QUEUES:
66 67619fd7 Christos Stavrakakis
            dead_queue = queues.convert_queue_to_dead(queue)
67 67619fd7 Christos Stavrakakis
            while 1:
68 67619fd7 Christos Stavrakakis
                message = client.basic_get(dead_queue)
69 67619fd7 Christos Stavrakakis
                if not message:
70 67619fd7 Christos Stavrakakis
                    break
71 67619fd7 Christos Stavrakakis
                log.debug("Received message %s", message)
72 67619fd7 Christos Stavrakakis
                self.handle_message(message)
73 67619fd7 Christos Stavrakakis
        client.close()
74 67619fd7 Christos Stavrakakis
        return 0
75 67619fd7 Christos Stavrakakis
76 67619fd7 Christos Stavrakakis
    def handle_message(self, message):
77 67619fd7 Christos Stavrakakis
        try:
78 67619fd7 Christos Stavrakakis
            body = message['body']
79 67619fd7 Christos Stavrakakis
        except KeyError:
80 67619fd7 Christos Stavrakakis
            log.warning("Received message without body: %s", message)
81 67619fd7 Christos Stavrakakis
            return
82 67619fd7 Christos Stavrakakis
83 67619fd7 Christos Stavrakakis
        try:
84 67619fd7 Christos Stavrakakis
            body = json.loads(body)
85 67619fd7 Christos Stavrakakis
        except ValueError:
86 67619fd7 Christos Stavrakakis
            log.error("Removed malformed message with body %s", body)
87 67619fd7 Christos Stavrakakis
            self.client.basic_nack(message)
88 67619fd7 Christos Stavrakakis
            return
89 67619fd7 Christos Stavrakakis
90 67619fd7 Christos Stavrakakis
        if "from-dead-letter" in message['headers']:
91 67619fd7 Christos Stavrakakis
            if not self.keep_zombies:
92 67619fd7 Christos Stavrakakis
                log.info("Removed message that died twice %s", body)
93 67619fd7 Christos Stavrakakis
                self.client.basic_nack(message)
94 67619fd7 Christos Stavrakakis
                return
95 67619fd7 Christos Stavrakakis
96 67619fd7 Christos Stavrakakis
        try:
97 67619fd7 Christos Stavrakakis
            headers = message['headers']
98 67619fd7 Christos Stavrakakis
            death = headers['x-death'][0]
99 67619fd7 Christos Stavrakakis
        except KeyError:
100 cc92b70f Christos Stavrakakis
            log.warning("Received message without death section %s."
101 cc92b70f Christos Stavrakakis
                        "Removing..",
102 67619fd7 Christos Stavrakakis
                        message)
103 67619fd7 Christos Stavrakakis
            self.client.basic_nack(message)
104 67619fd7 Christos Stavrakakis
105 67619fd7 Christos Stavrakakis
        # Get Routing Info
106 67619fd7 Christos Stavrakakis
        exchange = death['exchange']
107 67619fd7 Christos Stavrakakis
        routing_key = death['routing-keys'][0]
108 67619fd7 Christos Stavrakakis
109 67619fd7 Christos Stavrakakis
        # Add after Death
110 67619fd7 Christos Stavrakakis
        body = json.dumps(body)
111 67619fd7 Christos Stavrakakis
112 67619fd7 Christos Stavrakakis
        self.client.basic_publish(exchange, routing_key, body,
113 67619fd7 Christos Stavrakakis
                                  headers={"from-dead-letter": True})
114 67619fd7 Christos Stavrakakis
        self.client.basic_ack(message)