Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 7bfff8a6

History | View | Annotate | Download (12.5 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from django.db import transaction
38
from synnefo.db.models import (Backend, VirtualMachine, Network,
39
                               BackendNetwork, pooled_rapi_client)
40
from synnefo.logic import utils, backend as backend_mod, rapi
41

    
42
from synnefo.lib.utils import merge_time
43

    
44
log = logging.getLogger(__name__)
45

    
46

    
47
def handle_message_delivery(func):
48
    """ Generic decorator for handling messages.
49

50
    This decorator is responsible for converting the message into json format,
51
    handling of common exceptions and acknowledment of message if needed.
52

53
    """
54
    @wraps(func)
55
    def wrapper(client, message, *args, **kwargs):
56
        try:
57
            msg = None
58
            msg = json.loads(message['body'])
59
            func(msg)
60
            client.basic_ack(message)
61
        except ValueError as e:
62
            log.error("Incoming message not in JSON format %s: %s", e, message)
63
            client.basic_nack(message)
64
        except KeyError as e:
65
            log.error("Malformed incoming JSON, missing attribute %s: %s",
66
                      e, message)
67
            client.basic_nack(message)
68
        except Exception as e:
69
            if msg:
70
                log.exception("Unexpected error: %s, msg: %s", e, msg)
71
            else:
72
                log.exception("Unexpected error: %s", e)
73
            client.basic_reject(message)
74

    
75
    return wrapper
76

    
77

    
78
def instance_from_msg(func):
79
    """ Decorator for getting the VirtualMachine object of the msg.
80

81
    """
82
    @handle_message_delivery
83
    @wraps(func)
84
    def wrapper(msg):
85
        try:
86
            vm_id = utils.id_from_instance_name(msg["instance"])
87
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
88
            if vm.deleted:
89
                log.debug("Ignoring message for deleted instance '%s'", vm)
90
                return
91
            func(vm, msg)
92
        except VirtualMachine.InvalidBackendIdError:
93
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
94
        except VirtualMachine.DoesNotExist:
95
            log.error("VM for instance %s with id %d not found in DB.",
96
                      msg['instance'], vm_id)
97
        except (Network.InvalidBackendIdError, Network.DoesNotExist):
98
            log.error("Invalid message, can not find network. msg: %s", msg)
99
    return wrapper
100

    
101

    
102
def network_from_msg(func):
103
    """ Decorator for getting the BackendNetwork object of the msg.
104

105
    """
106
    @handle_message_delivery
107
    @wraps(func)
108
    def wrapper(msg):
109
        try:
110
            network_id = utils.id_from_network_name(msg["network"])
111
            network = Network.objects.select_for_update().get(id=network_id)
112
            if network.deleted:
113
                log.debug("Ignoring message for deleted network '%s'", network)
114
                return
115
            backend = Backend.objects.get(clustername=msg['cluster'])
116
            bnet, new = BackendNetwork.objects.get_or_create(network=network,
117
                                                             backend=backend)
118
            if new:
119
                log.info("Created missing BackendNetwork %s", bnet)
120
            func(bnet, msg)
121
        except Network.InvalidBackendIdError:
122
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
123
        except Network.DoesNotExist:
124
            log.error("Network %s not found in DB.", msg['network'])
125
        except Backend.DoesNotExist:
126
            log.error("Backend %s not found in DB.", msg['cluster'])
127
        except BackendNetwork.DoesNotExist:
128
            log.error("Network %s on backend %s not found in DB.",
129
                      msg['network'], msg['cluster'])
130
    return wrapper
131

    
132

    
133
def if_update_required(func):
134
    """
135
    Decorator for checking if an incoming message needs to update the db.
136

137
    The database will not be updated in the following cases:
138
    - The message has been redelivered and the action has already been
139
      completed. In this case the event_time will be equal with the one
140
      in the database.
141
    - The message describes a previous state in the ganeti, from the one that
142
      is described in the db. In this case the event_time will be smaller from
143
      the one in the database.
144

145
    """
146
    @wraps(func)
147
    def wrapper(target, msg):
148
        try:
149
            event_time = merge_time(msg['event_time'])
150
        except:
151
            log.error("Received message with malformed time: %s",
152
                      msg['event_time'])
153
            raise KeyError
154

    
155
        db_time = target.backendtime
156

    
157
        if db_time and event_time <= db_time:
158
            format_ = "%d/%m/%y %H:%M:%S:%f"
159
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
160
                      " db_timestamp: %s",
161
                      msg,
162
                      event_time.strftime(format_),
163
                      db_time.strftime(format_))
164
            return
165
        # New message. Update the database!
166
        func(target, msg, event_time)
167

    
168
    return wrapper
169

    
170

    
171
@instance_from_msg
172
@if_update_required
173
def update_db(vm, msg, event_time):
174
    """Process a notification of type 'ganeti-op-status'"""
175
    log.debug("Processing ganeti-op-status msg: %s", msg)
176

    
177
    if msg['type'] != "ganeti-op-status":
178
        log.error("Message is of unknown type %s.", msg['type'])
179
        return
180

    
181
    operation = msg["operation"]
182
    status = msg["status"]
183
    jobID = msg["jobId"]
184
    logmsg = msg["logmsg"]
185
    nics = msg.get("instance_nics", None)
186
    job_fields = msg.get("job_fields", {})
187
    result = msg.get("result", [])
188

    
189
    # Special case: OP_INSTANCE_CREATE with opportunistic locking may fail
190
    # if all Ganeti nodes are already locked. Retry the job without
191
    # opportunistic locking..
192
    if (operation == "OP_INSTANCE_CREATE" and status == "error" and
193
       job_fields.get("opportunistic_locking", False)):
194
        try:
195
            error_code = result[1][1]
196
        except IndexError:
197
            error_code = None
198
        if error_code == rapi.ECODE_TEMP_NORES:
199
            if vm.backendjobid != jobID:  # The job has already been retried!
200
                return
201
            # Remove extra fields
202
            [job_fields.pop(f) for f in ("OP_ID", "reason")]
203
            # Remove 'pnode' and 'snode' if they were set by Ganeti iallocator.
204
            # Ganeti will fail if both allocator and nodes are specified.
205
            allocator = job_fields.pop("iallocator")
206
            if allocator is not None:
207
                [job_fields.pop(f) for f in ("pnode", "snode")]
208
            name = job_fields.pop("name", job_fields.pop("instance_name"))
209
            # Turn off opportunistic locking before retrying the job
210
            job_fields["opportunistic_locking"] = False
211
            with pooled_rapi_client(vm) as c:
212
                jobID = c.CreateInstance(name=name, **job_fields)
213
            # Update the VM fields
214
            vm.backendjobid = jobID
215
            # Update the task_job_id for commissions
216
            vm.task_job_id = jobID
217
            vm.backendjobstatus = None
218
            vm.save()
219
            log.info("Retrying failed creation of instance '%s' without"
220
                     " opportunistic locking. New job ID: '%s'", name, jobID)
221
            return
222

    
223
    backend_mod.process_op_status(vm, event_time, jobID,
224
                                  operation, status,
225
                                  logmsg, nics=nics,
226
                                  job_fields=job_fields)
227

    
228
    log.debug("Done processing ganeti-op-status msg for vm %s.",
229
              msg['instance'])
230

    
231

    
232
@network_from_msg
233
@if_update_required
234
def update_network(network, msg, event_time):
235
    """Process a notification of type 'ganeti-network-status'"""
236
    log.debug("Processing ganeti-network-status msg: %s", msg)
237

    
238
    if msg['type'] != "ganeti-network-status":
239
        log.error("Message is of unknown type %s.", msg['type'])
240
        return
241

    
242
    opcode = msg['operation']
243
    status = msg['status']
244
    jobid = msg['jobId']
245
    job_fields = msg.get('job_fields', {})
246

    
247
    if opcode == "OP_NETWORK_SET_PARAMS":
248
        backend_mod.process_network_modify(network, event_time, jobid, opcode,
249
                                           status, job_fields)
250
    else:
251
        backend_mod.process_network_status(network, event_time, jobid, opcode,
252
                                           status, msg['logmsg'])
253

    
254
    log.debug("Done processing ganeti-network-status msg for network %s.",
255
              msg['network'])
256

    
257

    
258
@instance_from_msg
259
@if_update_required
260
def update_build_progress(vm, msg, event_time):
261
    """
262
    Process a create progress message. Update build progress, or create
263
    appropriate diagnostic entries for the virtual machine instance.
264
    """
265
    log.debug("Processing ganeti-create-progress msg: %s", msg)
266

    
267
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
268
                           'image-warning', 'image-helper'):
269
        log.error("Message is of unknown type %s", msg['type'])
270
        return
271

    
272
    if msg['type'] == 'image-copy-progress':
273
        backend_mod.process_create_progress(vm, event_time, msg['progress'])
274
        # we do not add diagnostic messages for copy-progress messages
275
        return
276

    
277
    # default diagnostic fields
278
    source = msg['type']
279
    level = 'DEBUG'
280
    message = msg.get('messages', '')
281
    if isinstance(message, list):
282
        message = " ".join(message)
283

    
284
    details = msg.get('stderr', None)
285

    
286
    if msg['type'] == 'image-helper':
287
        # for helper task events join subtype to diagnostic source and
288
        # set task name as diagnostic message
289
        if msg.get('subtype', None):
290
            if msg.get('subtype') in ['task-start', 'task-end']:
291
                message = msg.get('task', message)
292
                source = "%s-%s" % (source, msg.get('subtype'))
293

    
294
        if msg.get('subtype', None) == 'warning':
295
            level = 'WARNING'
296

    
297
        if msg.get('subtype', None) == 'error':
298
            level = 'ERROR'
299

    
300
        if msg.get('subtype', None) == 'info':
301
            level = 'INFO'
302

    
303
    if msg['type'] == 'image-error':
304
        level = 'ERROR'
305

    
306
    if msg['type'] == 'image-warning':
307
        level = 'WARNING'
308

    
309
    if not message.strip():
310
        message = " ".join(source.split("-")).capitalize()
311

    
312
    # create the diagnostic entry
313
    backend_mod.create_instance_diagnostic(vm, message, source, level,
314
                                           event_time, details=details)
315

    
316
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
317
              msg['instance'])
318

    
319

    
320
@handle_message_delivery
321
@transaction.commit_on_success()
322
def update_cluster(msg):
323
    operation = msg.get("operation")
324
    clustername = msg.get("cluster")
325
    if clustername is None:
326
        return
327
    if operation != "OP_CLUSTER_SET_PARAMS":
328
        return
329
    backend = Backend.objects.select_for_update().get(clustername=clustername)
330
    backend_mod.update_backend_disk_templates(backend)
331
    backend_mod.update_backend_resources(backend)
332

    
333

    
334
def dummy_proc(client, message, *args, **kwargs):
335
    try:
336
        log.debug("Msg: %s", message['body'])
337
        client.basic_ack(message)
338
    except Exception as e:
339
        log.exception("Could not receive message %s" % e)