Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 7b438672

History | View | Annotate | Download (12.4 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 86f046a8 Giorgos Verigakis
import logging
34 23c84263 Georgios Gousios
import json
35 c4e55622 Christos Stavrakakis
from functools import wraps
36 23c84263 Georgios Gousios
37 36d450e8 Christos Stavrakakis
from django.db import transaction
38 727fb2f9 Christos Stavrakakis
from synnefo.db.models import (Backend, VirtualMachine, Network,
39 727fb2f9 Christos Stavrakakis
                               BackendNetwork, pooled_rapi_client)
40 ee995ae2 Christos Stavrakakis
from synnefo.logic import utils, backend as backend_mod, rapi
41 23c84263 Georgios Gousios
42 c4e55622 Christos Stavrakakis
from synnefo.lib.utils import merge_time
43 c4e55622 Christos Stavrakakis
44 fdfd8c6d Christos Stavrakakis
log = logging.getLogger(__name__)
45 23c84263 Georgios Gousios
46 cc92b70f Christos Stavrakakis
47 a17a8e98 Christos Stavrakakis
def handle_message_delivery(func):
48 a17a8e98 Christos Stavrakakis
    """ Generic decorator for handling messages.
49 c4e55622 Christos Stavrakakis

50 a17a8e98 Christos Stavrakakis
    This decorator is responsible for converting the message into json format,
51 a17a8e98 Christos Stavrakakis
    handling of common exceptions and acknowledment of message if needed.
52 c4e55622 Christos Stavrakakis

53 c4e55622 Christos Stavrakakis
    """
54 c4e55622 Christos Stavrakakis
    @wraps(func)
55 c4e55622 Christos Stavrakakis
    def wrapper(client, message, *args, **kwargs):
56 c4e55622 Christos Stavrakakis
        try:
57 e6f6627c Christos Stavrakakis
            msg = None
58 c4e55622 Christos Stavrakakis
            msg = json.loads(message['body'])
59 a17a8e98 Christos Stavrakakis
            func(msg)
60 a17a8e98 Christos Stavrakakis
            client.basic_ack(message)
61 22ee6892 Christos Stavrakakis
        except ValueError as e:
62 22ee6892 Christos Stavrakakis
            log.error("Incoming message not in JSON format %s: %s", e, message)
63 b9d91e62 Christos Stavrakakis
            client.basic_nack(message)
64 22ee6892 Christos Stavrakakis
        except KeyError as e:
65 22ee6892 Christos Stavrakakis
            log.error("Malformed incoming JSON, missing attribute %s: %s",
66 22ee6892 Christos Stavrakakis
                      e, message)
67 b9d91e62 Christos Stavrakakis
            client.basic_nack(message)
68 a17a8e98 Christos Stavrakakis
        except Exception as e:
69 e6f6627c Christos Stavrakakis
            if msg:
70 e6f6627c Christos Stavrakakis
                log.exception("Unexpected error: %s, msg: %s", e, msg)
71 e6f6627c Christos Stavrakakis
            else:
72 e6f6627c Christos Stavrakakis
                log.exception("Unexpected error: %s", e)
73 b9d91e62 Christos Stavrakakis
            client.basic_reject(message)
74 a17a8e98 Christos Stavrakakis
75 a17a8e98 Christos Stavrakakis
    return wrapper
76 a17a8e98 Christos Stavrakakis
77 22ee6892 Christos Stavrakakis
78 a17a8e98 Christos Stavrakakis
def instance_from_msg(func):
79 a17a8e98 Christos Stavrakakis
    """ Decorator for getting the VirtualMachine object of the msg.
80 a17a8e98 Christos Stavrakakis

81 a17a8e98 Christos Stavrakakis
    """
82 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
83 a17a8e98 Christos Stavrakakis
    @wraps(func)
84 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
85 a17a8e98 Christos Stavrakakis
        try:
86 a17a8e98 Christos Stavrakakis
            vm_id = utils.id_from_instance_name(msg["instance"])
87 7f2dbcad Christos Stavrakakis
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
88 7b438672 Christos Stavrakakis
            if vm.deleted:
89 7b438672 Christos Stavrakakis
                log.debug("Ignoring message for deleted instance '%s'", vm)
90 7b438672 Christos Stavrakakis
                return
91 a17a8e98 Christos Stavrakakis
            func(vm, msg)
92 c4e55622 Christos Stavrakakis
        except VirtualMachine.InvalidBackendIdError:
93 c4e55622 Christos Stavrakakis
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
94 c4e55622 Christos Stavrakakis
        except VirtualMachine.DoesNotExist:
95 c4e55622 Christos Stavrakakis
            log.error("VM for instance %s with id %d not found in DB.",
96 c4e55622 Christos Stavrakakis
                      msg['instance'], vm_id)
97 7b438672 Christos Stavrakakis
        except (Network.InvalidBackendIdError, Network.DoesNotExist):
98 66932293 Christos Stavrakakis
            log.error("Invalid message, can not find network. msg: %s", msg)
99 a17a8e98 Christos Stavrakakis
    return wrapper
100 a17a8e98 Christos Stavrakakis
101 22ee6892 Christos Stavrakakis
102 a17a8e98 Christos Stavrakakis
def network_from_msg(func):
103 22ee6892 Christos Stavrakakis
    """ Decorator for getting the BackendNetwork object of the msg.
104 a17a8e98 Christos Stavrakakis

105 a17a8e98 Christos Stavrakakis
    """
106 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
107 a17a8e98 Christos Stavrakakis
    @wraps(func)
108 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
109 a17a8e98 Christos Stavrakakis
        try:
110 a17a8e98 Christos Stavrakakis
            network_id = utils.id_from_network_name(msg["network"])
111 d2e73c0c Christos Stavrakakis
            network = Network.objects.select_for_update().get(id=network_id)
112 7b438672 Christos Stavrakakis
            if network.deleted:
113 7b438672 Christos Stavrakakis
                log.debug("Ignoring message for deleted network '%s'", network)
114 7b438672 Christos Stavrakakis
                return
115 22ee6892 Christos Stavrakakis
            backend = Backend.objects.get(clustername=msg['cluster'])
116 99af08a4 Christos Stavrakakis
            bnet, new = BackendNetwork.objects.get_or_create(network=network,
117 99af08a4 Christos Stavrakakis
                                                             backend=backend)
118 99af08a4 Christos Stavrakakis
            if new:
119 99af08a4 Christos Stavrakakis
                log.info("Created missing BackendNetwork %s", bnet)
120 99af08a4 Christos Stavrakakis
            func(bnet, msg)
121 a17a8e98 Christos Stavrakakis
        except Network.InvalidBackendIdError:
122 a17a8e98 Christos Stavrakakis
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
123 a17a8e98 Christos Stavrakakis
        except Network.DoesNotExist:
124 22ee6892 Christos Stavrakakis
            log.error("Network %s not found in DB.", msg['network'])
125 22ee6892 Christos Stavrakakis
        except Backend.DoesNotExist:
126 22ee6892 Christos Stavrakakis
            log.error("Backend %s not found in DB.", msg['cluster'])
127 22ee6892 Christos Stavrakakis
        except BackendNetwork.DoesNotExist:
128 22ee6892 Christos Stavrakakis
            log.error("Network %s on backend %s not found in DB.",
129 22ee6892 Christos Stavrakakis
                      msg['network'], msg['cluster'])
130 a17a8e98 Christos Stavrakakis
    return wrapper
131 a17a8e98 Christos Stavrakakis
132 22ee6892 Christos Stavrakakis
133 a17a8e98 Christos Stavrakakis
def if_update_required(func):
134 a17a8e98 Christos Stavrakakis
    """
135 a17a8e98 Christos Stavrakakis
    Decorator for checking if an incoming message needs to update the db.
136 a17a8e98 Christos Stavrakakis

137 a17a8e98 Christos Stavrakakis
    The database will not be updated in the following cases:
138 a17a8e98 Christos Stavrakakis
    - The message has been redelivered and the action has already been
139 a17a8e98 Christos Stavrakakis
      completed. In this case the event_time will be equal with the one
140 a17a8e98 Christos Stavrakakis
      in the database.
141 22ee6892 Christos Stavrakakis
    - The message describes a previous state in the ganeti, from the one that
142 22ee6892 Christos Stavrakakis
      is described in the db. In this case the event_time will be smaller from
143 22ee6892 Christos Stavrakakis
      the one in the database.
144 a17a8e98 Christos Stavrakakis

145 a17a8e98 Christos Stavrakakis
    """
146 a17a8e98 Christos Stavrakakis
    @wraps(func)
147 a17a8e98 Christos Stavrakakis
    def wrapper(target, msg):
148 bb80a8d7 Christos Stavrakakis
        try:
149 bb80a8d7 Christos Stavrakakis
            event_time = merge_time(msg['event_time'])
150 bb80a8d7 Christos Stavrakakis
        except:
151 bb80a8d7 Christos Stavrakakis
            log.error("Received message with malformed time: %s",
152 bb80a8d7 Christos Stavrakakis
                      msg['event_time'])
153 bb80a8d7 Christos Stavrakakis
            raise KeyError
154 bb80a8d7 Christos Stavrakakis
155 a17a8e98 Christos Stavrakakis
        db_time = target.backendtime
156 a17a8e98 Christos Stavrakakis
157 22ee6892 Christos Stavrakakis
        if db_time and event_time <= db_time:
158 a17a8e98 Christos Stavrakakis
            format_ = "%d/%m/%y %H:%M:%S:%f"
159 cc92b70f Christos Stavrakakis
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
160 cc92b70f Christos Stavrakakis
                      " db_timestamp: %s",
161 a17a8e98 Christos Stavrakakis
                      msg,
162 a17a8e98 Christos Stavrakakis
                      event_time.strftime(format_),
163 a17a8e98 Christos Stavrakakis
                      db_time.strftime(format_))
164 a17a8e98 Christos Stavrakakis
            return
165 a17a8e98 Christos Stavrakakis
        # New message. Update the database!
166 a17a8e98 Christos Stavrakakis
        func(target, msg, event_time)
167 c4e55622 Christos Stavrakakis
168 c4e55622 Christos Stavrakakis
    return wrapper
169 c4e55622 Christos Stavrakakis
170 c4e55622 Christos Stavrakakis
171 a17a8e98 Christos Stavrakakis
@instance_from_msg
172 a17a8e98 Christos Stavrakakis
@if_update_required
173 a17a8e98 Christos Stavrakakis
def update_db(vm, msg, event_time):
174 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-op-status'"""
175 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-op-status msg: %s", msg)
176 23c84263 Georgios Gousios
177 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-op-status":
178 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
179 c4e55622 Christos Stavrakakis
        return
180 7ca9e930 Vangelis Koukis
181 727fb2f9 Christos Stavrakakis
    operation = msg["operation"]
182 727fb2f9 Christos Stavrakakis
    status = msg["status"]
183 727fb2f9 Christos Stavrakakis
    jobID = msg["jobId"]
184 727fb2f9 Christos Stavrakakis
    logmsg = msg["logmsg"]
185 e6fbada1 Christos Stavrakakis
    nics = msg.get("instance_nics", None)
186 e6fbada1 Christos Stavrakakis
    job_fields = msg.get("job_fields", {})
187 aee560b0 Christos Stavrakakis
    result = msg.get("result", [])
188 727fb2f9 Christos Stavrakakis
189 727fb2f9 Christos Stavrakakis
    # Special case: OP_INSTANCE_CREATE with opportunistic locking may fail
190 727fb2f9 Christos Stavrakakis
    # if all Ganeti nodes are already locked. Retry the job without
191 727fb2f9 Christos Stavrakakis
    # opportunistic locking..
192 727fb2f9 Christos Stavrakakis
    if (operation == "OP_INSTANCE_CREATE" and status == "error" and
193 727fb2f9 Christos Stavrakakis
       job_fields.get("opportunistic_locking", False)):
194 aee560b0 Christos Stavrakakis
        try:
195 aee560b0 Christos Stavrakakis
            error_code = result[1][1]
196 aee560b0 Christos Stavrakakis
        except IndexError:
197 aee560b0 Christos Stavrakakis
            error_code = None
198 aee560b0 Christos Stavrakakis
        if error_code == rapi.ECODE_TEMP_NORES:
199 aee560b0 Christos Stavrakakis
            if vm.backendjobid != jobID:  # The job has already been retried!
200 aee560b0 Christos Stavrakakis
                return
201 aee560b0 Christos Stavrakakis
            # Remove extra fields
202 aee560b0 Christos Stavrakakis
            [job_fields.pop(f) for f in ("OP_ID", "reason")]
203 aee560b0 Christos Stavrakakis
            # Remove 'pnode' and 'snode' if they were set by Ganeti iallocator.
204 aee560b0 Christos Stavrakakis
            # Ganeti will fail if both allocator and nodes are specified.
205 aee560b0 Christos Stavrakakis
            allocator = job_fields.pop("iallocator")
206 aee560b0 Christos Stavrakakis
            if allocator is not None:
207 aee560b0 Christos Stavrakakis
                [job_fields.pop(f) for f in ("pnode", "snode")]
208 aee560b0 Christos Stavrakakis
            name = job_fields.pop("name", job_fields.pop("instance_name"))
209 aee560b0 Christos Stavrakakis
            # Turn off opportunistic locking before retrying the job
210 aee560b0 Christos Stavrakakis
            job_fields["opportunistic_locking"] = False
211 aee560b0 Christos Stavrakakis
            with pooled_rapi_client(vm) as c:
212 aee560b0 Christos Stavrakakis
                jobID = c.CreateInstance(name=name, **job_fields)
213 aee560b0 Christos Stavrakakis
            # Update the VM fields
214 aee560b0 Christos Stavrakakis
            vm.backendjobid = jobID
215 ee995ae2 Christos Stavrakakis
            # Update the task_job_id for commissions
216 ee995ae2 Christos Stavrakakis
            vm.task_job_id = jobID
217 aee560b0 Christos Stavrakakis
            vm.backendjobstatus = None
218 aee560b0 Christos Stavrakakis
            vm.save()
219 aee560b0 Christos Stavrakakis
            log.info("Retrying failed creation of instance '%s' without"
220 aee560b0 Christos Stavrakakis
                     " opportunistic locking. New job ID: '%s'", name, jobID)
221 727fb2f9 Christos Stavrakakis
            return
222 727fb2f9 Christos Stavrakakis
223 3a09d155 Christos Stavrakakis
    backend_mod.process_op_status(vm, event_time, jobID,
224 3a09d155 Christos Stavrakakis
                                  operation, status,
225 3a09d155 Christos Stavrakakis
                                  logmsg, nics=nics,
226 e6fbada1 Christos Stavrakakis
                                  job_fields=job_fields)
227 c4e55622 Christos Stavrakakis
228 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-op-status msg for vm %s.",
229 c4e55622 Christos Stavrakakis
              msg['instance'])
230 c4e55622 Christos Stavrakakis
231 c4e55622 Christos Stavrakakis
232 a17a8e98 Christos Stavrakakis
@network_from_msg
233 a17a8e98 Christos Stavrakakis
@if_update_required
234 a17a8e98 Christos Stavrakakis
def update_network(network, msg, event_time):
235 a17a8e98 Christos Stavrakakis
    """Process a notification of type 'ganeti-network-status'"""
236 a17a8e98 Christos Stavrakakis
    log.debug("Processing ganeti-network-status msg: %s", msg)
237 a17a8e98 Christos Stavrakakis
238 a17a8e98 Christos Stavrakakis
    if msg['type'] != "ganeti-network-status":
239 a17a8e98 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
240 a17a8e98 Christos Stavrakakis
        return
241 a17a8e98 Christos Stavrakakis
242 fd2bdbb2 Christos Stavrakakis
    opcode = msg['operation']
243 fd2bdbb2 Christos Stavrakakis
    status = msg['status']
244 fd2bdbb2 Christos Stavrakakis
    jobid = msg['jobId']
245 e6fbada1 Christos Stavrakakis
    job_fields = msg.get('job_fields', {})
246 fd2bdbb2 Christos Stavrakakis
247 fd2bdbb2 Christos Stavrakakis
    if opcode == "OP_NETWORK_SET_PARAMS":
248 36d450e8 Christos Stavrakakis
        backend_mod.process_network_modify(network, event_time, jobid, opcode,
249 e6fbada1 Christos Stavrakakis
                                           status, job_fields)
250 fd2bdbb2 Christos Stavrakakis
    else:
251 36d450e8 Christos Stavrakakis
        backend_mod.process_network_status(network, event_time, jobid, opcode,
252 36d450e8 Christos Stavrakakis
                                           status, msg['logmsg'])
253 a17a8e98 Christos Stavrakakis
254 22ee6892 Christos Stavrakakis
    log.debug("Done processing ganeti-network-status msg for network %s.",
255 22ee6892 Christos Stavrakakis
              msg['network'])
256 a17a8e98 Christos Stavrakakis
257 a17a8e98 Christos Stavrakakis
258 a17a8e98 Christos Stavrakakis
@instance_from_msg
259 a17a8e98 Christos Stavrakakis
@if_update_required
260 a17a8e98 Christos Stavrakakis
def update_build_progress(vm, msg, event_time):
261 6138f0ef Kostas Papadimitriou
    """
262 6138f0ef Kostas Papadimitriou
    Process a create progress message. Update build progress, or create
263 6138f0ef Kostas Papadimitriou
    appropriate diagnostic entries for the virtual machine instance.
264 6138f0ef Kostas Papadimitriou
    """
265 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-create-progress msg: %s", msg)
266 604b2bf8 Georgios Gousios
267 a3b1aee2 Nikos Skalkotos
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
268 a3b1aee2 Nikos Skalkotos
                           'image-warning', 'image-helper'):
269 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
270 c4e55622 Christos Stavrakakis
        return
271 c25cc9ec Vangelis Koukis
272 0827883e Nikos Skalkotos
    if msg['type'] == 'image-copy-progress':
273 36d450e8 Christos Stavrakakis
        backend_mod.process_create_progress(vm, event_time, msg['progress'])
274 6138f0ef Kostas Papadimitriou
        # we do not add diagnostic messages for copy-progress messages
275 6138f0ef Kostas Papadimitriou
        return
276 6138f0ef Kostas Papadimitriou
277 6138f0ef Kostas Papadimitriou
    # default diagnostic fields
278 6138f0ef Kostas Papadimitriou
    source = msg['type']
279 6138f0ef Kostas Papadimitriou
    level = 'DEBUG'
280 f659cd15 Kostas Papadimitriou
    message = msg.get('messages', '')
281 6138f0ef Kostas Papadimitriou
    if isinstance(message, list):
282 6138f0ef Kostas Papadimitriou
        message = " ".join(message)
283 6138f0ef Kostas Papadimitriou
284 6138f0ef Kostas Papadimitriou
    details = msg.get('stderr', None)
285 6138f0ef Kostas Papadimitriou
286 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-helper':
287 6138f0ef Kostas Papadimitriou
        # for helper task events join subtype to diagnostic source and
288 6138f0ef Kostas Papadimitriou
        # set task name as diagnostic message
289 cc92b70f Christos Stavrakakis
        if msg.get('subtype', None):
290 cc92b70f Christos Stavrakakis
            if msg.get('subtype') in ['task-start', 'task-end']:
291 cc92b70f Christos Stavrakakis
                message = msg.get('task', message)
292 cc92b70f Christos Stavrakakis
                source = "%s-%s" % (source, msg.get('subtype'))
293 6138f0ef Kostas Papadimitriou
294 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'warning':
295 6138f0ef Kostas Papadimitriou
            level = 'WARNING'
296 6138f0ef Kostas Papadimitriou
297 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'error':
298 6138f0ef Kostas Papadimitriou
            level = 'ERROR'
299 6138f0ef Kostas Papadimitriou
300 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'info':
301 6138f0ef Kostas Papadimitriou
            level = 'INFO'
302 6138f0ef Kostas Papadimitriou
303 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-error':
304 6138f0ef Kostas Papadimitriou
        level = 'ERROR'
305 6138f0ef Kostas Papadimitriou
306 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-warning':
307 6138f0ef Kostas Papadimitriou
        level = 'WARNING'
308 6138f0ef Kostas Papadimitriou
309 6138f0ef Kostas Papadimitriou
    if not message.strip():
310 6138f0ef Kostas Papadimitriou
        message = " ".join(source.split("-")).capitalize()
311 6138f0ef Kostas Papadimitriou
312 6138f0ef Kostas Papadimitriou
    # create the diagnostic entry
313 36d450e8 Christos Stavrakakis
    backend_mod.create_instance_diagnostic(vm, message, source, level,
314 36d450e8 Christos Stavrakakis
                                           event_time, details=details)
315 604b2bf8 Georgios Gousios
316 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
317 c4e55622 Christos Stavrakakis
              msg['instance'])
318 604b2bf8 Georgios Gousios
319 604b2bf8 Georgios Gousios
320 73756651 Christos Stavrakakis
@handle_message_delivery
321 36d450e8 Christos Stavrakakis
@transaction.commit_on_success()
322 36d450e8 Christos Stavrakakis
def update_cluster(msg):
323 36d450e8 Christos Stavrakakis
    clustername = msg.get("cluster")
324 36d450e8 Christos Stavrakakis
    if clustername is None:
325 36d450e8 Christos Stavrakakis
        return
326 36d450e8 Christos Stavrakakis
    backend = Backend.objects.select_for_update().get(clustername=clustername)
327 36d450e8 Christos Stavrakakis
    backend_mod.update_backend_disk_templates(backend)
328 36d450e8 Christos Stavrakakis
    backend_mod.update_backend_resources(backend)
329 36d450e8 Christos Stavrakakis
330 36d450e8 Christos Stavrakakis
331 a17a8e98 Christos Stavrakakis
def dummy_proc(client, message, *args, **kwargs):
332 23c84263 Georgios Gousios
    try:
333 c4e55622 Christos Stavrakakis
        log.debug("Msg: %s", message['body'])
334 33b93f81 Christos Stavrakakis
        client.basic_ack(message)
335 23c84263 Georgios Gousios
    except Exception as e:
336 22ee6892 Christos Stavrakakis
        log.exception("Could not receive message %s" % e)