Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 25891abc

History | View | Annotate | Download (9.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger(__name__)
43

    
44

    
45
def handle_message_delivery(func):
46
    """ Generic decorator for handling messages.
47

48
    This decorator is responsible for converting the message into json format,
49
    handling of common exceptions and acknowledment of message if needed.
50

51
    """
52
    @wraps(func)
53
    def wrapper(client, message, *args, **kwargs):
54
        try:
55
            msg = None
56
            msg = json.loads(message['body'])
57
            func(msg)
58
            client.basic_ack(message)
59
        except ValueError as e:
60
            log.error("Incoming message not in JSON format %s: %s", e, message)
61
            client.basic_nack(message)
62
        except KeyError as e:
63
            log.error("Malformed incoming JSON, missing attribute %s: %s",
64
                      e, message)
65
            client.basic_nack(message)
66
        except Exception as e:
67
            if msg:
68
                log.exception("Unexpected error: %s, msg: %s", e, msg)
69
            else:
70
                log.exception("Unexpected error: %s", e)
71
            client.basic_reject(message)
72

    
73
    return wrapper
74

    
75

    
76
def instance_from_msg(func):
77
    """ Decorator for getting the VirtualMachine object of the msg.
78

79
    """
80
    @handle_message_delivery
81
    @wraps(func)
82
    def wrapper(msg):
83
        try:
84
            vm_id = utils.id_from_instance_name(msg["instance"])
85
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
86
            func(vm, msg)
87
        except VirtualMachine.InvalidBackendIdError:
88
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
89
        except VirtualMachine.DoesNotExist:
90
            log.error("VM for instance %s with id %d not found in DB.",
91
                      msg['instance'], vm_id)
92
        except Network.InvalidBackendIdError, Network.DoesNotExist:
93
            log.error("Invalid message", msg)
94
    return wrapper
95

    
96

    
97
def network_from_msg(func):
98
    """ Decorator for getting the BackendNetwork object of the msg.
99

100
    """
101
    @handle_message_delivery
102
    @wraps(func)
103
    def wrapper(msg):
104
        try:
105
            network_id = utils.id_from_network_name(msg["network"])
106
            network = Network.objects.select_for_update().get(id=network_id)
107
            backend = Backend.objects.get(clustername=msg['cluster'])
108
            bnet, new = BackendNetwork.objects.get_or_create(network=network,
109
                                                             backend=backend)
110
            if new:
111
                log.info("Created missing BackendNetwork %s", bnet)
112
            func(bnet, msg)
113
        except Network.InvalidBackendIdError:
114
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
115
        except Network.DoesNotExist:
116
            log.error("Network %s not found in DB.", msg['network'])
117
        except Backend.DoesNotExist:
118
            log.error("Backend %s not found in DB.", msg['cluster'])
119
        except BackendNetwork.DoesNotExist:
120
            log.error("Network %s on backend %s not found in DB.",
121
                      msg['network'], msg['cluster'])
122
    return wrapper
123

    
124

    
125
def if_update_required(func):
126
    """
127
    Decorator for checking if an incoming message needs to update the db.
128

129
    The database will not be updated in the following cases:
130
    - The message has been redelivered and the action has already been
131
      completed. In this case the event_time will be equal with the one
132
      in the database.
133
    - The message describes a previous state in the ganeti, from the one that
134
      is described in the db. In this case the event_time will be smaller from
135
      the one in the database.
136

137
    """
138
    @wraps(func)
139
    def wrapper(target, msg):
140
        try:
141
            event_time = merge_time(msg['event_time'])
142
        except:
143
            log.error("Received message with malformed time: %s",
144
                      msg['event_time'])
145
            raise KeyError
146

    
147
        db_time = target.backendtime
148

    
149
        if db_time and event_time <= db_time:
150
            format_ = "%d/%m/%y %H:%M:%S:%f"
151
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
152
                      " db_timestamp: %s",
153
                      msg,
154
                      event_time.strftime(format_),
155
                      db_time.strftime(format_))
156
            return
157
        # New message. Update the database!
158
        func(target, msg, event_time)
159

    
160
    return wrapper
161

    
162

    
163
@instance_from_msg
164
@if_update_required
165
def update_db(vm, msg, event_time):
166
    """Process a notification of type 'ganeti-op-status'"""
167
    log.debug("Processing ganeti-op-status msg: %s", msg)
168

    
169
    if msg['type'] != "ganeti-op-status":
170
        log.error("Message is of unknown type %s.", msg['type'])
171
        return
172

    
173
    nics = msg.get("nics", None)
174
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
175
                              msg['status'], msg['logmsg'], nics)
176

    
177
    log.debug("Done processing ganeti-op-status msg for vm %s.",
178
              msg['instance'])
179

    
180

    
181
@network_from_msg
182
@if_update_required
183
def update_network(network, msg, event_time):
184
    """Process a notification of type 'ganeti-network-status'"""
185
    log.debug("Processing ganeti-network-status msg: %s", msg)
186

    
187
    if msg['type'] != "ganeti-network-status":
188
        log.error("Message is of unknown type %s.", msg['type'])
189
        return
190

    
191
    opcode = msg['operation']
192
    status = msg['status']
193
    jobid = msg['jobId']
194

    
195
    if opcode == "OP_NETWORK_SET_PARAMS":
196
        backend.process_network_modify(network, event_time, jobid, opcode,
197
                                       status, msg['add_reserved_ips'],
198
                                       msg['remove_reserved_ips'])
199
    else:
200
        backend.process_network_status(network, event_time, jobid, opcode,
201
                                       status, msg['logmsg'])
202

    
203
    log.debug("Done processing ganeti-network-status msg for network %s.",
204
              msg['network'])
205

    
206

    
207
@instance_from_msg
208
@if_update_required
209
def update_build_progress(vm, msg, event_time):
210
    """
211
    Process a create progress message. Update build progress, or create
212
    appropriate diagnostic entries for the virtual machine instance.
213
    """
214
    log.debug("Processing ganeti-create-progress msg: %s", msg)
215

    
216
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
217
                           'image-warning', 'image-helper'):
218
        log.error("Message is of unknown type %s", msg['type'])
219
        return
220

    
221
    if msg['type'] == 'image-copy-progress':
222
        backend.process_create_progress(vm, event_time, msg['progress'])
223
        # we do not add diagnostic messages for copy-progress messages
224
        return
225

    
226
    # default diagnostic fields
227
    source = msg['type']
228
    level = 'DEBUG'
229
    message = msg.get('messages', '')
230
    if isinstance(message, list):
231
        message = " ".join(message)
232

    
233
    details = msg.get('stderr', None)
234

    
235
    if msg['type'] == 'image-helper':
236
        # for helper task events join subtype to diagnostic source and
237
        # set task name as diagnostic message
238
        if msg.get('subtype', None):
239
            if msg.get('subtype') in ['task-start', 'task-end']:
240
                message = msg.get('task', message)
241
                source = "%s-%s" % (source, msg.get('subtype'))
242

    
243
        if msg.get('subtype', None) == 'warning':
244
            level = 'WARNING'
245

    
246
        if msg.get('subtype', None) == 'error':
247
            level = 'ERROR'
248

    
249
        if msg.get('subtype', None) == 'info':
250
            level = 'INFO'
251

    
252
    if msg['type'] == 'image-error':
253
        level = 'ERROR'
254

    
255
    if msg['type'] == 'image-warning':
256
        level = 'WARNING'
257

    
258
    if not message.strip():
259
        message = " ".join(source.split("-")).capitalize()
260

    
261
    # create the diagnostic entry
262
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
263
                                       details=details)
264

    
265
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
266
              msg['instance'])
267

    
268

    
269
def dummy_proc(client, message, *args, **kwargs):
270
    try:
271
        log.debug("Msg: %s", message['body'])
272
        client.basic_ack(message)
273
    except Exception as e:
274
        log.exception("Could not receive message %s" % e)