Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 475d4a85

History | View | Annotate | Download (12.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from django.db import transaction
38
from synnefo.db.models import (Backend, VirtualMachine, Network,
39
                               BackendNetwork, pooled_rapi_client)
40
from synnefo.logic import utils, backend as backend_mod, rapi
41

    
42
from synnefo.lib.utils import merge_time
43

    
44
log = logging.getLogger(__name__)
45

    
46

    
47
def handle_message_delivery(func):
48
    """ Generic decorator for handling messages.
49

50
    This decorator is responsible for converting the message into json format,
51
    handling of common exceptions and acknowledment of message if needed.
52

53
    """
54
    @wraps(func)
55
    def wrapper(client, message, *args, **kwargs):
56
        try:
57
            msg = None
58
            msg = json.loads(message['body'])
59
            func(msg)
60
            client.basic_ack(message)
61
        except ValueError as e:
62
            log.error("Incoming message not in JSON format %s: %s", e, message)
63
            client.basic_nack(message)
64
        except KeyError as e:
65
            log.error("Malformed incoming JSON, missing attribute %s: %s",
66
                      e, message)
67
            client.basic_nack(message)
68
        except Exception as e:
69
            if msg:
70
                log.exception("Unexpected error: %s, msg: %s", e, msg)
71
            else:
72
                log.exception("Unexpected error: %s", e)
73
            client.basic_reject(message)
74

    
75
    return wrapper
76

    
77

    
78
def instance_from_msg(func):
79
    """ Decorator for getting the VirtualMachine object of the msg.
80

81
    """
82
    @handle_message_delivery
83
    @wraps(func)
84
    def wrapper(msg):
85
        try:
86
            vm_id = utils.id_from_instance_name(msg["instance"])
87
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
88
            func(vm, msg)
89
        except VirtualMachine.InvalidBackendIdError:
90
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
91
        except VirtualMachine.DoesNotExist:
92
            log.error("VM for instance %s with id %d not found in DB.",
93
                      msg['instance'], vm_id)
94
        except (Network.InvalidBackendIdError, Network.DoesNotExist) as e:
95
            log.error("Invalid message, can not find network. msg: %s", msg)
96
    return wrapper
97

    
98

    
99
def network_from_msg(func):
100
    """ Decorator for getting the BackendNetwork object of the msg.
101

102
    """
103
    @handle_message_delivery
104
    @wraps(func)
105
    def wrapper(msg):
106
        try:
107
            network_id = utils.id_from_network_name(msg["network"])
108
            network = Network.objects.select_for_update().get(id=network_id)
109
            backend = Backend.objects.get(clustername=msg['cluster'])
110
            bnet, new = BackendNetwork.objects.get_or_create(network=network,
111
                                                             backend=backend)
112
            if new:
113
                log.info("Created missing BackendNetwork %s", bnet)
114
            func(bnet, msg)
115
        except Network.InvalidBackendIdError:
116
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
117
        except Network.DoesNotExist:
118
            log.error("Network %s not found in DB.", msg['network'])
119
        except Backend.DoesNotExist:
120
            log.error("Backend %s not found in DB.", msg['cluster'])
121
        except BackendNetwork.DoesNotExist:
122
            log.error("Network %s on backend %s not found in DB.",
123
                      msg['network'], msg['cluster'])
124
    return wrapper
125

    
126

    
127
def if_update_required(func):
128
    """
129
    Decorator for checking if an incoming message needs to update the db.
130

131
    The database will not be updated in the following cases:
132
    - The message has been redelivered and the action has already been
133
      completed. In this case the event_time will be equal with the one
134
      in the database.
135
    - The message describes a previous state in the ganeti, from the one that
136
      is described in the db. In this case the event_time will be smaller from
137
      the one in the database.
138

139
    """
140
    @wraps(func)
141
    def wrapper(target, msg):
142
        try:
143
            event_time = merge_time(msg['event_time'])
144
        except:
145
            log.error("Received message with malformed time: %s",
146
                      msg['event_time'])
147
            raise KeyError
148

    
149
        db_time = target.backendtime
150

    
151
        if db_time and event_time <= db_time:
152
            format_ = "%d/%m/%y %H:%M:%S:%f"
153
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
154
                      " db_timestamp: %s",
155
                      msg,
156
                      event_time.strftime(format_),
157
                      db_time.strftime(format_))
158
            return
159
        # New message. Update the database!
160
        func(target, msg, event_time)
161

    
162
    return wrapper
163

    
164

    
165
@instance_from_msg
166
@if_update_required
167
def update_db(vm, msg, event_time):
168
    """Process a notification of type 'ganeti-op-status'"""
169
    log.debug("Processing ganeti-op-status msg: %s", msg)
170

    
171
    if msg['type'] != "ganeti-op-status":
172
        log.error("Message is of unknown type %s.", msg['type'])
173
        return
174

    
175
    operation = msg["operation"]
176
    status = msg["status"]
177
    jobID = msg["jobId"]
178
    logmsg = msg["logmsg"]
179
    nics = msg.get("instance_nics", None)
180
    job_fields = msg.get("job_fields", {})
181
    result = msg.get("result", [])
182

    
183
    # Special case: OP_INSTANCE_CREATE with opportunistic locking may fail
184
    # if all Ganeti nodes are already locked. Retry the job without
185
    # opportunistic locking..
186
    if (operation == "OP_INSTANCE_CREATE" and status == "error" and
187
       job_fields.get("opportunistic_locking", False)):
188
        try:
189
            error_code = result[1][1]
190
        except IndexError:
191
            error_code = None
192
        if error_code == rapi.ECODE_TEMP_NORES:
193
            if vm.backendjobid != jobID:  # The job has already been retried!
194
                return
195
            # Remove extra fields
196
            [job_fields.pop(f) for f in ("OP_ID", "reason")]
197
            # Remove 'pnode' and 'snode' if they were set by Ganeti iallocator.
198
            # Ganeti will fail if both allocator and nodes are specified.
199
            allocator = job_fields.pop("iallocator")
200
            if allocator is not None:
201
                [job_fields.pop(f) for f in ("pnode", "snode")]
202
            name = job_fields.pop("name", job_fields.pop("instance_name"))
203
            # Turn off opportunistic locking before retrying the job
204
            job_fields["opportunistic_locking"] = False
205
            with pooled_rapi_client(vm) as c:
206
                jobID = c.CreateInstance(name=name, **job_fields)
207
            # Update the VM fields
208
            vm.backendjobid = jobID
209
            # Update the task_job_id for commissions
210
            vm.task_job_id = jobID
211
            vm.backendjobstatus = None
212
            vm.save()
213
            log.info("Retrying failed creation of instance '%s' without"
214
                     " opportunistic locking. New job ID: '%s'", name, jobID)
215
            return
216

    
217
    backend_mod.process_op_status(vm, event_time, jobID,
218
                                  operation, status,
219
                                  logmsg, nics=nics,
220
                                  job_fields=job_fields)
221

    
222
    log.debug("Done processing ganeti-op-status msg for vm %s.",
223
              msg['instance'])
224

    
225

    
226
@network_from_msg
227
@if_update_required
228
def update_network(network, msg, event_time):
229
    """Process a notification of type 'ganeti-network-status'"""
230
    log.debug("Processing ganeti-network-status msg: %s", msg)
231

    
232
    if msg['type'] != "ganeti-network-status":
233
        log.error("Message is of unknown type %s.", msg['type'])
234
        return
235

    
236
    opcode = msg['operation']
237
    status = msg['status']
238
    jobid = msg['jobId']
239
    job_fields = msg.get('job_fields', {})
240

    
241
    if opcode == "OP_NETWORK_SET_PARAMS":
242
        backend_mod.process_network_modify(network, event_time, jobid, opcode,
243
                                           status, job_fields)
244
    else:
245
        backend_mod.process_network_status(network, event_time, jobid, opcode,
246
                                           status, msg['logmsg'])
247

    
248
    log.debug("Done processing ganeti-network-status msg for network %s.",
249
              msg['network'])
250

    
251

    
252
@instance_from_msg
253
@if_update_required
254
def update_build_progress(vm, msg, event_time):
255
    """
256
    Process a create progress message. Update build progress, or create
257
    appropriate diagnostic entries for the virtual machine instance.
258
    """
259
    log.debug("Processing ganeti-create-progress msg: %s", msg)
260

    
261
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
262
                           'image-warning', 'image-helper'):
263
        log.error("Message is of unknown type %s", msg['type'])
264
        return
265

    
266
    if msg['type'] == 'image-copy-progress':
267
        backend_mod.process_create_progress(vm, event_time, msg['progress'])
268
        # we do not add diagnostic messages for copy-progress messages
269
        return
270

    
271
    # default diagnostic fields
272
    source = msg['type']
273
    level = 'DEBUG'
274
    message = msg.get('messages', '')
275
    if isinstance(message, list):
276
        message = " ".join(message)
277

    
278
    details = msg.get('stderr', None)
279

    
280
    if msg['type'] == 'image-helper':
281
        # for helper task events join subtype to diagnostic source and
282
        # set task name as diagnostic message
283
        if msg.get('subtype', None):
284
            if msg.get('subtype') in ['task-start', 'task-end']:
285
                message = msg.get('task', message)
286
                source = "%s-%s" % (source, msg.get('subtype'))
287

    
288
        if msg.get('subtype', None) == 'warning':
289
            level = 'WARNING'
290

    
291
        if msg.get('subtype', None) == 'error':
292
            level = 'ERROR'
293

    
294
        if msg.get('subtype', None) == 'info':
295
            level = 'INFO'
296

    
297
    if msg['type'] == 'image-error':
298
        level = 'ERROR'
299

    
300
    if msg['type'] == 'image-warning':
301
        level = 'WARNING'
302

    
303
    if not message.strip():
304
        message = " ".join(source.split("-")).capitalize()
305

    
306
    # create the diagnostic entry
307
    backend_mod.create_instance_diagnostic(vm, message, source, level,
308
                                           event_time, details=details)
309

    
310
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
311
              msg['instance'])
312

    
313

    
314
@handle_message_delivery
315
@transaction.commit_on_success()
316
def update_cluster(msg):
317
    clustername = msg.get("cluster")
318
    if clustername is None:
319
        return
320
    backend = Backend.objects.select_for_update().get(clustername=clustername)
321
    backend_mod.update_backend_disk_templates(backend)
322
    backend_mod.update_backend_resources(backend)
323

    
324

    
325
def dummy_proc(client, message, *args, **kwargs):
326
    try:
327
        log.debug("Msg: %s", message['body'])
328
        client.basic_ack(message)
329
    except Exception as e:
330
        log.exception("Could not receive message %s" % e)