Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ dccd42eb

History | View | Annotate | Download (10.1 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 86f046a8 Giorgos Verigakis
import logging
34 23c84263 Georgios Gousios
import json
35 c4e55622 Christos Stavrakakis
from functools import wraps
36 23c84263 Georgios Gousios
37 22ee6892 Christos Stavrakakis
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38 1ed37c1d Giorgos Verigakis
from synnefo.logic import utils, backend
39 23c84263 Georgios Gousios
40 c4e55622 Christos Stavrakakis
from synnefo.lib.utils import merge_time
41 c4e55622 Christos Stavrakakis
42 fdfd8c6d Christos Stavrakakis
log = logging.getLogger(__name__)
43 23c84263 Georgios Gousios
44 cc92b70f Christos Stavrakakis
45 a17a8e98 Christos Stavrakakis
def handle_message_delivery(func):
46 a17a8e98 Christos Stavrakakis
    """ Generic decorator for handling messages.
47 c4e55622 Christos Stavrakakis

48 a17a8e98 Christos Stavrakakis
    This decorator is responsible for converting the message into json format,
49 a17a8e98 Christos Stavrakakis
    handling of common exceptions and acknowledment of message if needed.
50 c4e55622 Christos Stavrakakis

51 c4e55622 Christos Stavrakakis
    """
52 c4e55622 Christos Stavrakakis
    @wraps(func)
53 c4e55622 Christos Stavrakakis
    def wrapper(client, message, *args, **kwargs):
54 c4e55622 Christos Stavrakakis
        try:
55 e6f6627c Christos Stavrakakis
            msg = None
56 c4e55622 Christos Stavrakakis
            msg = json.loads(message['body'])
57 a17a8e98 Christos Stavrakakis
            func(msg)
58 a17a8e98 Christos Stavrakakis
            client.basic_ack(message)
59 22ee6892 Christos Stavrakakis
        except ValueError as e:
60 22ee6892 Christos Stavrakakis
            log.error("Incoming message not in JSON format %s: %s", e, message)
61 b9d91e62 Christos Stavrakakis
            client.basic_nack(message)
62 22ee6892 Christos Stavrakakis
        except KeyError as e:
63 22ee6892 Christos Stavrakakis
            log.error("Malformed incoming JSON, missing attribute %s: %s",
64 22ee6892 Christos Stavrakakis
                      e, message)
65 b9d91e62 Christos Stavrakakis
            client.basic_nack(message)
66 a17a8e98 Christos Stavrakakis
        except Exception as e:
67 e6f6627c Christos Stavrakakis
            if msg:
68 e6f6627c Christos Stavrakakis
                log.exception("Unexpected error: %s, msg: %s", e, msg)
69 e6f6627c Christos Stavrakakis
            else:
70 e6f6627c Christos Stavrakakis
                log.exception("Unexpected error: %s", e)
71 b9d91e62 Christos Stavrakakis
            client.basic_reject(message)
72 a17a8e98 Christos Stavrakakis
73 a17a8e98 Christos Stavrakakis
    return wrapper
74 a17a8e98 Christos Stavrakakis
75 22ee6892 Christos Stavrakakis
76 a17a8e98 Christos Stavrakakis
def instance_from_msg(func):
77 a17a8e98 Christos Stavrakakis
    """ Decorator for getting the VirtualMachine object of the msg.
78 a17a8e98 Christos Stavrakakis

79 a17a8e98 Christos Stavrakakis
    """
80 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
81 a17a8e98 Christos Stavrakakis
    @wraps(func)
82 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
83 a17a8e98 Christos Stavrakakis
        try:
84 a17a8e98 Christos Stavrakakis
            vm_id = utils.id_from_instance_name(msg["instance"])
85 7f2dbcad Christos Stavrakakis
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
86 a17a8e98 Christos Stavrakakis
            func(vm, msg)
87 c4e55622 Christos Stavrakakis
        except VirtualMachine.InvalidBackendIdError:
88 c4e55622 Christos Stavrakakis
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
89 c4e55622 Christos Stavrakakis
        except VirtualMachine.DoesNotExist:
90 c4e55622 Christos Stavrakakis
            log.error("VM for instance %s with id %d not found in DB.",
91 c4e55622 Christos Stavrakakis
                      msg['instance'], vm_id)
92 15cb13b5 Christos Stavrakakis
        except Network.InvalidBackendIdError, Network.DoesNotExist:
93 15cb13b5 Christos Stavrakakis
            log.error("Invalid message", msg)
94 a17a8e98 Christos Stavrakakis
    return wrapper
95 a17a8e98 Christos Stavrakakis
96 22ee6892 Christos Stavrakakis
97 a17a8e98 Christos Stavrakakis
def network_from_msg(func):
98 22ee6892 Christos Stavrakakis
    """ Decorator for getting the BackendNetwork object of the msg.
99 a17a8e98 Christos Stavrakakis

100 a17a8e98 Christos Stavrakakis
    """
101 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
102 a17a8e98 Christos Stavrakakis
    @wraps(func)
103 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
104 a17a8e98 Christos Stavrakakis
        try:
105 a17a8e98 Christos Stavrakakis
            network_id = utils.id_from_network_name(msg["network"])
106 d2e73c0c Christos Stavrakakis
            network = Network.objects.select_for_update().get(id=network_id)
107 22ee6892 Christos Stavrakakis
            backend = Backend.objects.get(clustername=msg['cluster'])
108 22ee6892 Christos Stavrakakis
            backend_network = BackendNetwork.objects.get(network=network,
109 22ee6892 Christos Stavrakakis
                                                         backend=backend)
110 22ee6892 Christos Stavrakakis
            func(backend_network, msg)
111 a17a8e98 Christos Stavrakakis
        except Network.InvalidBackendIdError:
112 a17a8e98 Christos Stavrakakis
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
113 a17a8e98 Christos Stavrakakis
        except Network.DoesNotExist:
114 22ee6892 Christos Stavrakakis
            log.error("Network %s not found in DB.", msg['network'])
115 22ee6892 Christos Stavrakakis
        except Backend.DoesNotExist:
116 22ee6892 Christos Stavrakakis
            log.error("Backend %s not found in DB.", msg['cluster'])
117 22ee6892 Christos Stavrakakis
        except BackendNetwork.DoesNotExist:
118 22ee6892 Christos Stavrakakis
            log.error("Network %s on backend %s not found in DB.",
119 22ee6892 Christos Stavrakakis
                      msg['network'], msg['cluster'])
120 a17a8e98 Christos Stavrakakis
    return wrapper
121 a17a8e98 Christos Stavrakakis
122 22ee6892 Christos Stavrakakis
123 a17a8e98 Christos Stavrakakis
def if_update_required(func):
124 a17a8e98 Christos Stavrakakis
    """
125 a17a8e98 Christos Stavrakakis
    Decorator for checking if an incoming message needs to update the db.
126 a17a8e98 Christos Stavrakakis

127 a17a8e98 Christos Stavrakakis
    The database will not be updated in the following cases:
128 a17a8e98 Christos Stavrakakis
    - The message has been redelivered and the action has already been
129 a17a8e98 Christos Stavrakakis
      completed. In this case the event_time will be equal with the one
130 a17a8e98 Christos Stavrakakis
      in the database.
131 22ee6892 Christos Stavrakakis
    - The message describes a previous state in the ganeti, from the one that
132 22ee6892 Christos Stavrakakis
      is described in the db. In this case the event_time will be smaller from
133 22ee6892 Christos Stavrakakis
      the one in the database.
134 a17a8e98 Christos Stavrakakis

135 a17a8e98 Christos Stavrakakis
    """
136 a17a8e98 Christos Stavrakakis
    @wraps(func)
137 a17a8e98 Christos Stavrakakis
    def wrapper(target, msg):
138 bb80a8d7 Christos Stavrakakis
        try:
139 bb80a8d7 Christos Stavrakakis
            event_time = merge_time(msg['event_time'])
140 bb80a8d7 Christos Stavrakakis
        except:
141 bb80a8d7 Christos Stavrakakis
            log.error("Received message with malformed time: %s",
142 bb80a8d7 Christos Stavrakakis
                      msg['event_time'])
143 bb80a8d7 Christos Stavrakakis
            raise KeyError
144 bb80a8d7 Christos Stavrakakis
145 a17a8e98 Christos Stavrakakis
        db_time = target.backendtime
146 a17a8e98 Christos Stavrakakis
147 22ee6892 Christos Stavrakakis
        if db_time and event_time <= db_time:
148 a17a8e98 Christos Stavrakakis
            format_ = "%d/%m/%y %H:%M:%S:%f"
149 cc92b70f Christos Stavrakakis
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
150 cc92b70f Christos Stavrakakis
                      " db_timestamp: %s",
151 a17a8e98 Christos Stavrakakis
                      msg,
152 a17a8e98 Christos Stavrakakis
                      event_time.strftime(format_),
153 a17a8e98 Christos Stavrakakis
                      db_time.strftime(format_))
154 a17a8e98 Christos Stavrakakis
            return
155 a17a8e98 Christos Stavrakakis
        # New message. Update the database!
156 a17a8e98 Christos Stavrakakis
        func(target, msg, event_time)
157 c4e55622 Christos Stavrakakis
158 c4e55622 Christos Stavrakakis
    return wrapper
159 c4e55622 Christos Stavrakakis
160 c4e55622 Christos Stavrakakis
161 a17a8e98 Christos Stavrakakis
@instance_from_msg
162 a17a8e98 Christos Stavrakakis
@if_update_required
163 a17a8e98 Christos Stavrakakis
def update_db(vm, msg, event_time):
164 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-op-status'"""
165 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-op-status msg: %s", msg)
166 23c84263 Georgios Gousios
167 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-op-status":
168 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
169 c4e55622 Christos Stavrakakis
        return
170 7ca9e930 Vangelis Koukis
171 c4e55622 Christos Stavrakakis
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
172 c4e55622 Christos Stavrakakis
                              msg['status'], msg['logmsg'])
173 c4e55622 Christos Stavrakakis
174 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-op-status msg for vm %s.",
175 c4e55622 Christos Stavrakakis
              msg['instance'])
176 c4e55622 Christos Stavrakakis
177 c4e55622 Christos Stavrakakis
178 a17a8e98 Christos Stavrakakis
@instance_from_msg
179 a17a8e98 Christos Stavrakakis
@if_update_required
180 a17a8e98 Christos Stavrakakis
def update_net(vm, msg, event_time):
181 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-net-status'"""
182 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-net-status msg: %s", msg)
183 7ca9e930 Vangelis Koukis
184 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-net-status":
185 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
186 c4e55622 Christos Stavrakakis
        return
187 9068cd85 Georgios Gousios
188 c4e55622 Christos Stavrakakis
    backend.process_net_status(vm, event_time, msg['nics'])
189 604b2bf8 Georgios Gousios
190 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-net-status msg for vm %s.",
191 c4e55622 Christos Stavrakakis
              msg["instance"])
192 604b2bf8 Georgios Gousios
193 604b2bf8 Georgios Gousios
194 a17a8e98 Christos Stavrakakis
@network_from_msg
195 a17a8e98 Christos Stavrakakis
@if_update_required
196 a17a8e98 Christos Stavrakakis
def update_network(network, msg, event_time):
197 a17a8e98 Christos Stavrakakis
    """Process a notification of type 'ganeti-network-status'"""
198 a17a8e98 Christos Stavrakakis
    log.debug("Processing ganeti-network-status msg: %s", msg)
199 a17a8e98 Christos Stavrakakis
200 a17a8e98 Christos Stavrakakis
    if msg['type'] != "ganeti-network-status":
201 a17a8e98 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
202 a17a8e98 Christos Stavrakakis
        return
203 a17a8e98 Christos Stavrakakis
204 fd2bdbb2 Christos Stavrakakis
    opcode = msg['operation']
205 fd2bdbb2 Christos Stavrakakis
    status = msg['status']
206 fd2bdbb2 Christos Stavrakakis
    jobid = msg['jobId']
207 fd2bdbb2 Christos Stavrakakis
208 fd2bdbb2 Christos Stavrakakis
    if opcode == "OP_NETWORK_SET_PARAMS":
209 fd2bdbb2 Christos Stavrakakis
        backend.process_network_modify(network, event_time, jobid, opcode,
210 fd2bdbb2 Christos Stavrakakis
                                       status, msg['add_reserved_ips'],
211 fd2bdbb2 Christos Stavrakakis
                                       msg['remove_reserved_ips'])
212 fd2bdbb2 Christos Stavrakakis
    else:
213 fd2bdbb2 Christos Stavrakakis
        backend.process_network_status(network, event_time, jobid, opcode,
214 fd2bdbb2 Christos Stavrakakis
                                       status, msg['logmsg'])
215 a17a8e98 Christos Stavrakakis
216 22ee6892 Christos Stavrakakis
    log.debug("Done processing ganeti-network-status msg for network %s.",
217 22ee6892 Christos Stavrakakis
              msg['network'])
218 a17a8e98 Christos Stavrakakis
219 a17a8e98 Christos Stavrakakis
220 a17a8e98 Christos Stavrakakis
@instance_from_msg
221 a17a8e98 Christos Stavrakakis
@if_update_required
222 a17a8e98 Christos Stavrakakis
def update_build_progress(vm, msg, event_time):
223 6138f0ef Kostas Papadimitriou
    """
224 6138f0ef Kostas Papadimitriou
    Process a create progress message. Update build progress, or create
225 6138f0ef Kostas Papadimitriou
    appropriate diagnostic entries for the virtual machine instance.
226 6138f0ef Kostas Papadimitriou
    """
227 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-create-progress msg: %s", msg)
228 604b2bf8 Georgios Gousios
229 a3b1aee2 Nikos Skalkotos
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
230 a3b1aee2 Nikos Skalkotos
                           'image-warning', 'image-helper'):
231 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
232 c4e55622 Christos Stavrakakis
        return
233 c25cc9ec Vangelis Koukis
234 0827883e Nikos Skalkotos
    if msg['type'] == 'image-copy-progress':
235 0827883e Nikos Skalkotos
        backend.process_create_progress(vm, event_time, msg['progress'])
236 6138f0ef Kostas Papadimitriou
        # we do not add diagnostic messages for copy-progress messages
237 6138f0ef Kostas Papadimitriou
        return
238 6138f0ef Kostas Papadimitriou
239 6138f0ef Kostas Papadimitriou
    # default diagnostic fields
240 6138f0ef Kostas Papadimitriou
    source = msg['type']
241 6138f0ef Kostas Papadimitriou
    level = 'DEBUG'
242 f659cd15 Kostas Papadimitriou
    message = msg.get('messages', '')
243 6138f0ef Kostas Papadimitriou
    if isinstance(message, list):
244 6138f0ef Kostas Papadimitriou
        message = " ".join(message)
245 6138f0ef Kostas Papadimitriou
246 6138f0ef Kostas Papadimitriou
    details = msg.get('stderr', None)
247 6138f0ef Kostas Papadimitriou
248 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-helper':
249 6138f0ef Kostas Papadimitriou
        # for helper task events join subtype to diagnostic source and
250 6138f0ef Kostas Papadimitriou
        # set task name as diagnostic message
251 cc92b70f Christos Stavrakakis
        if msg.get('subtype', None):
252 cc92b70f Christos Stavrakakis
            if msg.get('subtype') in ['task-start', 'task-end']:
253 cc92b70f Christos Stavrakakis
                message = msg.get('task', message)
254 cc92b70f Christos Stavrakakis
                source = "%s-%s" % (source, msg.get('subtype'))
255 6138f0ef Kostas Papadimitriou
256 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'warning':
257 6138f0ef Kostas Papadimitriou
            level = 'WARNING'
258 6138f0ef Kostas Papadimitriou
259 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'error':
260 6138f0ef Kostas Papadimitriou
            level = 'ERROR'
261 6138f0ef Kostas Papadimitriou
262 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'info':
263 6138f0ef Kostas Papadimitriou
            level = 'INFO'
264 6138f0ef Kostas Papadimitriou
265 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-error':
266 6138f0ef Kostas Papadimitriou
        level = 'ERROR'
267 6138f0ef Kostas Papadimitriou
268 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-warning':
269 6138f0ef Kostas Papadimitriou
        level = 'WARNING'
270 6138f0ef Kostas Papadimitriou
271 6138f0ef Kostas Papadimitriou
    if not message.strip():
272 6138f0ef Kostas Papadimitriou
        message = " ".join(source.split("-")).capitalize()
273 6138f0ef Kostas Papadimitriou
274 6138f0ef Kostas Papadimitriou
    # create the diagnostic entry
275 6138f0ef Kostas Papadimitriou
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
276 cc92b70f Christos Stavrakakis
                                       details=details)
277 604b2bf8 Georgios Gousios
278 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
279 c4e55622 Christos Stavrakakis
              msg['instance'])
280 604b2bf8 Georgios Gousios
281 604b2bf8 Georgios Gousios
282 a17a8e98 Christos Stavrakakis
def dummy_proc(client, message, *args, **kwargs):
283 23c84263 Georgios Gousios
    try:
284 c4e55622 Christos Stavrakakis
        log.debug("Msg: %s", message['body'])
285 33b93f81 Christos Stavrakakis
        client.basic_ack(message)
286 23c84263 Georgios Gousios
    except Exception as e:
287 22ee6892 Christos Stavrakakis
        log.exception("Could not receive message %s" % e)