Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 6138f0ef

History | View | Annotate | Download (9.4 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 86f046a8 Giorgos Verigakis
import logging
34 23c84263 Georgios Gousios
import json
35 c4e55622 Christos Stavrakakis
from functools import wraps
36 23c84263 Georgios Gousios
37 22ee6892 Christos Stavrakakis
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38 1ed37c1d Giorgos Verigakis
from synnefo.logic import utils, backend
39 23c84263 Georgios Gousios
40 c4e55622 Christos Stavrakakis
from synnefo.lib.utils import merge_time
41 c4e55622 Christos Stavrakakis
42 86f046a8 Giorgos Verigakis
log = logging.getLogger()
43 23c84263 Georgios Gousios
44 a17a8e98 Christos Stavrakakis
def handle_message_delivery(func):
45 a17a8e98 Christos Stavrakakis
    """ Generic decorator for handling messages.
46 c4e55622 Christos Stavrakakis

47 a17a8e98 Christos Stavrakakis
    This decorator is responsible for converting the message into json format,
48 a17a8e98 Christos Stavrakakis
    handling of common exceptions and acknowledment of message if needed.
49 c4e55622 Christos Stavrakakis

50 c4e55622 Christos Stavrakakis
    """
51 c4e55622 Christos Stavrakakis
    @wraps(func)
52 c4e55622 Christos Stavrakakis
    def wrapper(client, message, *args, **kwargs):
53 c4e55622 Christos Stavrakakis
        try:
54 c4e55622 Christos Stavrakakis
            msg = json.loads(message['body'])
55 a17a8e98 Christos Stavrakakis
            func(msg)
56 a17a8e98 Christos Stavrakakis
            client.basic_ack(message)
57 22ee6892 Christos Stavrakakis
        except ValueError as e:
58 22ee6892 Christos Stavrakakis
            log.error("Incoming message not in JSON format %s: %s", e, message)
59 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
60 22ee6892 Christos Stavrakakis
        except KeyError as e:
61 22ee6892 Christos Stavrakakis
            log.error("Malformed incoming JSON, missing attribute %s: %s",
62 22ee6892 Christos Stavrakakis
                      e, message)
63 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
64 a17a8e98 Christos Stavrakakis
        except Exception as e:
65 a17a8e98 Christos Stavrakakis
            log.exception("Unexpected error: %s, msg: %s", e, msg)
66 a17a8e98 Christos Stavrakakis
67 a17a8e98 Christos Stavrakakis
    return wrapper
68 a17a8e98 Christos Stavrakakis
69 22ee6892 Christos Stavrakakis
70 a17a8e98 Christos Stavrakakis
def instance_from_msg(func):
71 a17a8e98 Christos Stavrakakis
    """ Decorator for getting the VirtualMachine object of the msg.
72 a17a8e98 Christos Stavrakakis

73 a17a8e98 Christos Stavrakakis
    """
74 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
75 a17a8e98 Christos Stavrakakis
    @wraps(func)
76 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
77 a17a8e98 Christos Stavrakakis
        try:
78 a17a8e98 Christos Stavrakakis
            vm_id = utils.id_from_instance_name(msg["instance"])
79 a17a8e98 Christos Stavrakakis
            vm = VirtualMachine.objects.get(id=vm_id)
80 a17a8e98 Christos Stavrakakis
            func(vm, msg)
81 c4e55622 Christos Stavrakakis
        except VirtualMachine.InvalidBackendIdError:
82 c4e55622 Christos Stavrakakis
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
83 c4e55622 Christos Stavrakakis
        except VirtualMachine.DoesNotExist:
84 c4e55622 Christos Stavrakakis
            log.error("VM for instance %s with id %d not found in DB.",
85 c4e55622 Christos Stavrakakis
                      msg['instance'], vm_id)
86 15cb13b5 Christos Stavrakakis
        except Network.InvalidBackendIdError, Network.DoesNotExist:
87 15cb13b5 Christos Stavrakakis
            log.error("Invalid message", msg)
88 a17a8e98 Christos Stavrakakis
    return wrapper
89 a17a8e98 Christos Stavrakakis
90 22ee6892 Christos Stavrakakis
91 a17a8e98 Christos Stavrakakis
def network_from_msg(func):
92 22ee6892 Christos Stavrakakis
    """ Decorator for getting the BackendNetwork object of the msg.
93 a17a8e98 Christos Stavrakakis

94 a17a8e98 Christos Stavrakakis
    """
95 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
96 a17a8e98 Christos Stavrakakis
    @wraps(func)
97 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
98 a17a8e98 Christos Stavrakakis
        try:
99 a17a8e98 Christos Stavrakakis
            network_id = utils.id_from_network_name(msg["network"])
100 a17a8e98 Christos Stavrakakis
            network = Network.objects.get(id=network_id)
101 22ee6892 Christos Stavrakakis
            backend = Backend.objects.get(clustername=msg['cluster'])
102 22ee6892 Christos Stavrakakis
            backend_network = BackendNetwork.objects.get(network=network,
103 22ee6892 Christos Stavrakakis
                                                         backend=backend)
104 22ee6892 Christos Stavrakakis
            func(backend_network, msg)
105 a17a8e98 Christos Stavrakakis
        except Network.InvalidBackendIdError:
106 a17a8e98 Christos Stavrakakis
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
107 a17a8e98 Christos Stavrakakis
        except Network.DoesNotExist:
108 22ee6892 Christos Stavrakakis
            log.error("Network %s not found in DB.", msg['network'])
109 22ee6892 Christos Stavrakakis
        except Backend.DoesNotExist:
110 22ee6892 Christos Stavrakakis
            log.error("Backend %s not found in DB.", msg['cluster'])
111 22ee6892 Christos Stavrakakis
        except BackendNetwork.DoesNotExist:
112 22ee6892 Christos Stavrakakis
            log.error("Network %s on backend %s not found in DB.",
113 22ee6892 Christos Stavrakakis
                      msg['network'], msg['cluster'])
114 a17a8e98 Christos Stavrakakis
    return wrapper
115 a17a8e98 Christos Stavrakakis
116 22ee6892 Christos Stavrakakis
117 a17a8e98 Christos Stavrakakis
def if_update_required(func):
118 a17a8e98 Christos Stavrakakis
    """
119 a17a8e98 Christos Stavrakakis
    Decorator for checking if an incoming message needs to update the db.
120 a17a8e98 Christos Stavrakakis

121 a17a8e98 Christos Stavrakakis
    The database will not be updated in the following cases:
122 a17a8e98 Christos Stavrakakis
    - The message has been redelivered and the action has already been
123 a17a8e98 Christos Stavrakakis
      completed. In this case the event_time will be equal with the one
124 a17a8e98 Christos Stavrakakis
      in the database.
125 22ee6892 Christos Stavrakakis
    - The message describes a previous state in the ganeti, from the one that
126 22ee6892 Christos Stavrakakis
      is described in the db. In this case the event_time will be smaller from
127 22ee6892 Christos Stavrakakis
      the one in the database.
128 a17a8e98 Christos Stavrakakis

129 a17a8e98 Christos Stavrakakis
    """
130 a17a8e98 Christos Stavrakakis
    @wraps(func)
131 a17a8e98 Christos Stavrakakis
    def wrapper(target, msg):
132 a17a8e98 Christos Stavrakakis
        event_time = merge_time(msg['event_time'])
133 a17a8e98 Christos Stavrakakis
        db_time = target.backendtime
134 a17a8e98 Christos Stavrakakis
135 22ee6892 Christos Stavrakakis
        if db_time and event_time <= db_time:
136 a17a8e98 Christos Stavrakakis
            format_ = "%d/%m/%y %H:%M:%S:%f"
137 a17a8e98 Christos Stavrakakis
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
138 a17a8e98 Christos Stavrakakis
                      msg,
139 a17a8e98 Christos Stavrakakis
                      event_time.strftime(format_),
140 a17a8e98 Christos Stavrakakis
                      db_time.strftime(format_))
141 a17a8e98 Christos Stavrakakis
            return
142 a17a8e98 Christos Stavrakakis
        # New message. Update the database!
143 a17a8e98 Christos Stavrakakis
        func(target, msg, event_time)
144 c4e55622 Christos Stavrakakis
145 c4e55622 Christos Stavrakakis
    return wrapper
146 c4e55622 Christos Stavrakakis
147 c4e55622 Christos Stavrakakis
148 a17a8e98 Christos Stavrakakis
@instance_from_msg
149 a17a8e98 Christos Stavrakakis
@if_update_required
150 a17a8e98 Christos Stavrakakis
def update_db(vm, msg, event_time):
151 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-op-status'"""
152 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-op-status msg: %s", msg)
153 23c84263 Georgios Gousios
154 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-op-status":
155 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
156 c4e55622 Christos Stavrakakis
        return
157 7ca9e930 Vangelis Koukis
158 c4e55622 Christos Stavrakakis
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
159 c4e55622 Christos Stavrakakis
                              msg['status'], msg['logmsg'])
160 c4e55622 Christos Stavrakakis
161 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-op-status msg for vm %s.",
162 c4e55622 Christos Stavrakakis
              msg['instance'])
163 c4e55622 Christos Stavrakakis
164 c4e55622 Christos Stavrakakis
165 a17a8e98 Christos Stavrakakis
@instance_from_msg
166 a17a8e98 Christos Stavrakakis
@if_update_required
167 a17a8e98 Christos Stavrakakis
def update_net(vm, msg, event_time):
168 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-net-status'"""
169 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-net-status msg: %s", msg)
170 7ca9e930 Vangelis Koukis
171 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-net-status":
172 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
173 c4e55622 Christos Stavrakakis
        return
174 9068cd85 Georgios Gousios
175 c4e55622 Christos Stavrakakis
    backend.process_net_status(vm, event_time, msg['nics'])
176 604b2bf8 Georgios Gousios
177 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-net-status msg for vm %s.",
178 c4e55622 Christos Stavrakakis
              msg["instance"])
179 604b2bf8 Georgios Gousios
180 604b2bf8 Georgios Gousios
181 a17a8e98 Christos Stavrakakis
@network_from_msg
182 a17a8e98 Christos Stavrakakis
@if_update_required
183 a17a8e98 Christos Stavrakakis
def update_network(network, msg, event_time):
184 a17a8e98 Christos Stavrakakis
    """Process a notification of type 'ganeti-network-status'"""
185 a17a8e98 Christos Stavrakakis
    log.debug("Processing ganeti-network-status msg: %s", msg)
186 a17a8e98 Christos Stavrakakis
187 a17a8e98 Christos Stavrakakis
    if msg['type'] != "ganeti-network-status":
188 a17a8e98 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
189 a17a8e98 Christos Stavrakakis
        return
190 a17a8e98 Christos Stavrakakis
191 22ee6892 Christos Stavrakakis
    backend.process_network_status(network, event_time,
192 22ee6892 Christos Stavrakakis
                                   msg['jobId'], msg['operation'],
193 22ee6892 Christos Stavrakakis
                                   msg['status'], msg['logmsg'])
194 a17a8e98 Christos Stavrakakis
195 22ee6892 Christos Stavrakakis
    log.debug("Done processing ganeti-network-status msg for network %s.",
196 22ee6892 Christos Stavrakakis
              msg['network'])
197 a17a8e98 Christos Stavrakakis
198 a17a8e98 Christos Stavrakakis
199 a17a8e98 Christos Stavrakakis
@instance_from_msg
200 a17a8e98 Christos Stavrakakis
@if_update_required
201 a17a8e98 Christos Stavrakakis
def update_build_progress(vm, msg, event_time):
202 6138f0ef Kostas Papadimitriou
    """
203 6138f0ef Kostas Papadimitriou
    Process a create progress message. Update build progress, or create
204 6138f0ef Kostas Papadimitriou
    appropriate diagnostic entries for the virtual machine instance.
205 6138f0ef Kostas Papadimitriou
    """
206 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-create-progress msg: %s", msg)
207 604b2bf8 Georgios Gousios
208 a3b1aee2 Nikos Skalkotos
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
209 a3b1aee2 Nikos Skalkotos
                           'image-warning', 'image-helper'):
210 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
211 c4e55622 Christos Stavrakakis
        return
212 c25cc9ec Vangelis Koukis
213 0827883e Nikos Skalkotos
    if msg['type'] == 'image-copy-progress':
214 0827883e Nikos Skalkotos
        backend.process_create_progress(vm, event_time, msg['progress'])
215 6138f0ef Kostas Papadimitriou
        # we do not add diagnostic messages for copy-progress messages
216 6138f0ef Kostas Papadimitriou
        return
217 6138f0ef Kostas Papadimitriou
218 6138f0ef Kostas Papadimitriou
    # default diagnostic fields
219 6138f0ef Kostas Papadimitriou
    source = msg['type']
220 6138f0ef Kostas Papadimitriou
    level = 'DEBUG'
221 6138f0ef Kostas Papadimitriou
    message = msg.get('messages', None)
222 6138f0ef Kostas Papadimitriou
    if isinstance(message, list):
223 6138f0ef Kostas Papadimitriou
        message = " ".join(message)
224 6138f0ef Kostas Papadimitriou
225 6138f0ef Kostas Papadimitriou
    details = msg.get('stderr', None)
226 6138f0ef Kostas Papadimitriou
227 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-helper':
228 6138f0ef Kostas Papadimitriou
        # for helper task events join subtype to diagnostic source and
229 6138f0ef Kostas Papadimitriou
        # set task name as diagnostic message
230 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) and msg.get('subtype') in ['task-start',
231 6138f0ef Kostas Papadimitriou
              'task-end']:
232 6138f0ef Kostas Papadimitriou
            message = msg.get('task', message)
233 6138f0ef Kostas Papadimitriou
            source = "%s-%s" % (source, msg.get('subtype'))
234 6138f0ef Kostas Papadimitriou
235 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'warning':
236 6138f0ef Kostas Papadimitriou
            level = 'WARNING'
237 6138f0ef Kostas Papadimitriou
238 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'error':
239 6138f0ef Kostas Papadimitriou
            level = 'ERROR'
240 6138f0ef Kostas Papadimitriou
241 6138f0ef Kostas Papadimitriou
        if msg.get('subtype', None) == 'info':
242 6138f0ef Kostas Papadimitriou
            level = 'INFO'
243 6138f0ef Kostas Papadimitriou
244 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-error':
245 6138f0ef Kostas Papadimitriou
        level = 'ERROR'
246 6138f0ef Kostas Papadimitriou
247 6138f0ef Kostas Papadimitriou
    if msg['type'] == 'image-warning':
248 6138f0ef Kostas Papadimitriou
        level = 'WARNING'
249 6138f0ef Kostas Papadimitriou
250 6138f0ef Kostas Papadimitriou
    if not message.strip():
251 6138f0ef Kostas Papadimitriou
        message = " ".join(source.split("-")).capitalize()
252 6138f0ef Kostas Papadimitriou
253 6138f0ef Kostas Papadimitriou
    # create the diagnostic entry
254 6138f0ef Kostas Papadimitriou
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
255 6138f0ef Kostas Papadimitriou
        details=details)
256 604b2bf8 Georgios Gousios
257 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
258 c4e55622 Christos Stavrakakis
              msg['instance'])
259 604b2bf8 Georgios Gousios
260 604b2bf8 Georgios Gousios
261 a17a8e98 Christos Stavrakakis
def dummy_proc(client, message, *args, **kwargs):
262 23c84263 Georgios Gousios
    try:
263 c4e55622 Christos Stavrakakis
        log.debug("Msg: %s", message['body'])
264 33b93f81 Christos Stavrakakis
        client.basic_ack(message)
265 23c84263 Georgios Gousios
    except Exception as e:
266 22ee6892 Christos Stavrakakis
        log.exception("Could not receive message %s" % e)