Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ aee560b0

History | View | Annotate | Download (11.6 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, pooled_rapi_client)
39
from synnefo.logic import utils, backend, rapi
40

    
41
from synnefo.lib.utils import merge_time
42

    
43
log = logging.getLogger(__name__)
44

    
45

    
46
def handle_message_delivery(func):
    """ Generic decorator for handling messages.

    This decorator is responsible for converting the message into json format,
    handling of common exceptions and acknowledgment of message if needed.

    The decorated callback is invoked as ``func(msg)``, where ``msg`` is the
    decoded JSON body of the message. The returned wrapper is called as
    ``wrapper(client, message, *args, **kwargs)``; the extra arguments are
    accepted but not used.

    Outcome, depending on what happens while decoding and running ``func``:
      - no exception: ``client.basic_ack(message)``
      - ValueError or KeyError: error is logged, ``client.basic_nack(message)``
      - any other Exception: traceback is logged,
        ``client.basic_reject(message)``

    """
    @wraps(func)
    def wrapper(client, message, *args, **kwargs):
        # Stays None until the body has been decoded, so the generic handler
        # below can tell whether there is a decoded payload to log.
        msg = None
        try:
            msg = json.loads(message['body'])
            func(msg)
            client.basic_ack(message)
        except ValueError as e:
            log.error("Incoming message not in JSON format %s: %s", e, message)
            client.basic_nack(message)
        except KeyError as e:
            log.error("Malformed incoming JSON, missing attribute %s: %s",
                      e, message)
            client.basic_nack(message)
        except Exception as e:
            # Compare against None explicitly: a decoded but falsy payload
            # (e.g. an empty JSON object) must still be included in the log.
            if msg is not None:
                log.exception("Unexpected error: %s, msg: %s", e, msg)
            else:
                log.exception("Unexpected error: %s", e)
            client.basic_reject(message)

    return wrapper
75

    
76

    
77
def instance_from_msg(func):
    """ Decorator that resolves the VirtualMachine a message refers to.

    The instance name found in ``msg["instance"]`` is converted to a database
    id, the matching VirtualMachine is fetched with ``select_for_update()``
    and the decorated callback is invoked as ``func(vm, msg)``.

    The VirtualMachine and Network lookup exceptions handled below are only
    logged, never re-raised, whether they come from the lookup itself or
    from ``func``.

    """
    @handle_message_delivery
    @wraps(func)
    def wrapper(msg):
        try:
            instance_id = utils.id_from_instance_name(msg["instance"])
            locked_vms = VirtualMachine.objects.select_for_update()
            machine = locked_vms.get(id=instance_id)
            func(machine, msg)
        except VirtualMachine.InvalidBackendIdError:
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
        except VirtualMachine.DoesNotExist:
            log.error("VM for instance %s with id %d not found in DB.",
                      msg['instance'], instance_id)
        except (Network.InvalidBackendIdError, Network.DoesNotExist):
            log.error("Invalid message, can not find network. msg: %s", msg)
    return wrapper
96

    
97

    
98
def network_from_msg(func):
    """ Decorator that resolves the BackendNetwork a message refers to.

    The Network is looked up from ``msg["network"]`` (fetched with
    ``select_for_update()``) and the Backend from ``msg['cluster']``. The
    BackendNetwork joining the two is fetched, or created when it does not
    exist yet, and the decorated callback is invoked as ``func(bnet, msg)``.

    The lookup exceptions handled below are only logged, never re-raised.

    """
    @handle_message_delivery
    @wraps(func)
    def wrapper(msg):
        try:
            net_id = utils.id_from_network_name(msg["network"])
            net = Network.objects.select_for_update().get(id=net_id)
            cluster_backend = Backend.objects.get(clustername=msg['cluster'])
            backend_net, created = BackendNetwork.objects.get_or_create(
                network=net, backend=cluster_backend)
            if created:
                log.info("Created missing BackendNetwork %s", backend_net)
            func(backend_net, msg)
        except Network.InvalidBackendIdError:
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
        except Network.DoesNotExist:
            log.error("Network %s not found in DB.", msg['network'])
        except Backend.DoesNotExist:
            log.error("Backend %s not found in DB.", msg['cluster'])
        except BackendNetwork.DoesNotExist:
            log.error("Network %s on backend %s not found in DB.",
                      msg['network'], msg['cluster'])
    return wrapper
124

    
125

    
126
def if_update_required(func):
    """
    Decorator for checking if an incoming message needs to update the db.

    The database will not be updated in the following cases:
    - The message has been redelivered and the action has already been
      completed. In this case the event_time will be equal to the one
      in the database.
    - The message describes a state of Ganeti that is older than the one
      described in the db. In this case the event_time will be earlier than
      the one in the database.

    The wrapper is called as ``wrapper(target, msg)``. It compares the time
    built from ``msg['event_time']`` against ``target.backendtime`` and, if
    the message is newer (or no time is stored yet), invokes
    ``func(target, msg, event_time)``.

    Raises KeyError when ``msg['event_time']`` is missing or cannot be
    converted by ``merge_time``.

    """
    @wraps(func)
    def wrapper(target, msg):
        try:
            event_time = merge_time(msg['event_time'])
        except Exception:
            # Use .get() here: indexing again would raise before the error
            # is logged whenever the key itself is what is missing.
            log.error("Received message with malformed time: %s",
                      msg.get('event_time'))
            # Callers treat KeyError as "malformed message"; name the
            # offending attribute so their log line is meaningful.
            raise KeyError('event_time')

        db_time = target.backendtime

        if db_time and event_time <= db_time:
            format_ = "%d/%m/%y %H:%M:%S:%f"
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
                      " db_timestamp: %s",
                      msg,
                      event_time.strftime(format_),
                      db_time.strftime(format_))
            return
        # New message. Update the database!
        func(target, msg, event_time)

    return wrapper
162

    
163

    
164
def _retry_instance_creation(vm, job_fields):
    """Resubmit a failed instance creation without opportunistic locking.

    ``job_fields`` holds the fields of the failed job. It is modified in
    place and then passed as keyword arguments to ``CreateInstance`` of the
    RAPI client obtained from ``pooled_rapi_client(vm)``.

    Returns a ``(name, job_id)`` tuple describing the resubmitted job.
    Raises KeyError if the fields contain neither 'name' nor 'instance_name'.

    """
    # Remove extra fields (tolerate their absence)
    for field in ("OP_ID", "reason"):
        job_fields.pop(field, None)
    # Remove 'pnode' and 'snode' if they were set by Ganeti iallocator.
    # Ganeti will fail if both allocator and nodes are specified.
    if job_fields.pop("iallocator", None) is not None:
        for field in ("pnode", "snode"):
            job_fields.pop(field, None)
    # Pop both spellings so that neither is forwarded through **job_fields;
    # 'name' wins when both are present.
    instance_name = job_fields.pop("instance_name", None)
    name = job_fields.pop("name", instance_name)
    if name is None:
        raise KeyError("instance_name")
    # Turn off opportunistic locking before retrying the job
    job_fields["opportunistic_locking"] = False
    with pooled_rapi_client(vm) as c:
        job_id = c.CreateInstance(name=name, **job_fields)
    return name, job_id


@instance_from_msg
@if_update_required
def update_db(vm, msg, event_time):
    """Process a notification of type 'ganeti-op-status'

    Messages of any other type are logged and ignored. A failed
    OP_INSTANCE_CREATE that used opportunistic locking and reported
    ``rapi.ECODE_TEMP_NORES`` is resubmitted once without opportunistic
    locking; every other message is handed to ``backend.process_op_status``.

    Raises KeyError if a mandatory attribute is missing from the message.

    """
    log.debug("Processing ganeti-op-status msg: %s", msg)

    if msg['type'] != "ganeti-op-status":
        log.error("Message is of unknown type %s.", msg['type'])
        return

    operation = msg["operation"]
    status = msg["status"]
    jobID = msg["jobId"]
    logmsg = msg["logmsg"]
    nics = msg.get("nics", None)
    job_fields = msg.get("job_fields", {})
    result = msg.get("result", [])

    # Special case: OP_INSTANCE_CREATE with opportunistic locking may fail
    # if all Ganeti nodes are already locked. Retry the job without
    # opportunistic locking..
    if (operation == "OP_INSTANCE_CREATE" and status == "error" and
            job_fields.get("opportunistic_locking", False)):
        try:
            error_code = result[1][1]
        except (IndexError, KeyError, TypeError):
            # Result is absent or not shaped as expected: no error code.
            error_code = None
        if error_code == rapi.ECODE_TEMP_NORES:
            if vm.backendjobid != jobID:  # The job has already been retried!
                return
            name, jobID = _retry_instance_creation(vm, job_fields)
            # Update the VM fields
            vm.backendjobid = jobID
            vm.backendjobstatus = None
            vm.save()
            log.info("Retrying failed creation of instance '%s' without"
                     " opportunistic locking. New job ID: '%s'", name, jobID)
            return

    backend.process_op_status(vm, event_time, jobID, operation,
                              status, logmsg, nics)

    log.debug("Done processing ganeti-op-status msg for vm %s.",
              msg['instance'])
219

    
220

    
221
@network_from_msg
@if_update_required
def update_network(network, msg, event_time):
    """Process a notification of type 'ganeti-network-status'

    Messages of any other type are logged and ignored.
    OP_NETWORK_SET_PARAMS is handed to ``backend.process_network_modify``
    together with the reserved IPs to add and remove; every other opcode is
    handed to ``backend.process_network_status`` together with the log
    message.

    """
    log.debug("Processing ganeti-network-status msg: %s", msg)

    msg_type = msg['type']
    if msg_type != "ganeti-network-status":
        log.error("Message is of unknown type %s.", msg_type)
        return

    opcode, status, jobid = msg['operation'], msg['status'], msg['jobId']

    if opcode != "OP_NETWORK_SET_PARAMS":
        backend.process_network_status(network, event_time, jobid, opcode,
                                       status, msg['logmsg'])
    else:
        ips_to_add = msg['add_reserved_ips']
        ips_to_remove = msg['remove_reserved_ips']
        backend.process_network_modify(network, event_time, jobid, opcode,
                                       status, ips_to_add, ips_to_remove)

    log.debug("Done processing ganeti-network-status msg for network %s.",
              msg['network'])
245

    
246

    
247
@instance_from_msg
@if_update_required
def update_build_progress(vm, msg, event_time):
    """
    Process a create progress message.

    'image-copy-progress' messages are handed to
    ``backend.process_create_progress``. Every other known message type
    produces a diagnostic entry for the virtual machine through
    ``backend.create_instance_diagnostic``. Unknown types are logged and
    ignored.
    """
    log.debug("Processing ganeti-create-progress msg: %s", msg)

    msg_type = msg['type']
    known_types = ('image-copy-progress', 'image-error', 'image-info',
                   'image-warning', 'image-helper')
    if msg_type not in known_types:
        log.error("Message is of unknown type %s", msg_type)
        return

    if msg_type == 'image-copy-progress':
        # copy-progress messages never produce a diagnostic entry
        backend.process_create_progress(vm, event_time, msg['progress'])
        return

    # Defaults for the diagnostic entry
    source = msg_type
    level = 'DEBUG'
    details = msg.get('stderr', None)
    message = msg.get('messages', '')
    if isinstance(message, list):
        message = " ".join(message)

    if msg_type == 'image-helper':
        subtype = msg.get('subtype', None)

        # For helper task events the subtype is appended to the source and
        # the task name (when given) becomes the diagnostic message.
        if subtype in ('task-start', 'task-end'):
            message = msg.get('task', message)
            source = "%s-%s" % (source, subtype)

        if subtype == 'warning':
            level = 'WARNING'
        elif subtype == 'error':
            level = 'ERROR'
        elif subtype == 'info':
            level = 'INFO'
    elif msg_type == 'image-error':
        level = 'ERROR'
    elif msg_type == 'image-warning':
        level = 'WARNING'

    # Fall back to a message derived from the source when none was given
    if not message.strip():
        message = " ".join(source.split("-")).capitalize()

    # create the diagnostic entry
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
                                       details=details)

    log.debug("Done processing ganeti-create-progress msg for vm %s.",
              msg['instance'])
307

    
308

    
309
def dummy_proc(client, message, *args, **kwargs):
    """Log the body of an incoming message and acknowledge it.

    Extra positional and keyword arguments are accepted but not used. Any
    exception raised while logging or acknowledging is logged with its
    traceback and not re-raised.

    """
    try:
        log.debug("Msg: %s", message['body'])
        client.basic_ack(message)
    except Exception as e:
        # Pass the exception as a logging argument instead of formatting
        # the string eagerly, like the other log calls in this module.
        log.exception("Could not receive message %s", e)