Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 727fb2f9

History | View | Annotate | Download (11 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import (Backend, VirtualMachine, Network,
38
                               BackendNetwork, pooled_rapi_client)
39
from synnefo.logic import utils, backend
40

    
41
from synnefo.lib.utils import merge_time
42

    
43
# Module-level logger used by the decorators and callbacks in this file.
log = logging.getLogger(__name__)
44

    
45

    
46
def handle_message_delivery(func):
    """Generic decorator for handling messages.

    The wrapped callback is called with the message body decoded from JSON.
    Afterwards the wrapper settles the message with the client:

    - ``basic_ack`` when ``func`` returns normally,
    - ``basic_nack`` on ``ValueError`` (body is not valid JSON) or
      ``KeyError`` (a required attribute is missing), whether raised while
      decoding or by ``func`` itself,
    - ``basic_reject`` on any other exception.

    Extra positional and keyword arguments given to the wrapper are accepted
    but ignored; ``func`` always receives the decoded body only.

    """
    @wraps(func)
    def wrapper(client, message, *args, **kwargs):
        # Stays None until the body has been decoded, so the generic handler
        # can tell "never parsed" apart from a parsed-but-falsy payload
        # such as {} or [].
        msg = None
        try:
            msg = json.loads(message['body'])
            func(msg)
            client.basic_ack(message)
        except ValueError as e:
            log.error("Incoming message not in JSON format %s: %s", e, message)
            client.basic_nack(message)
        except KeyError as e:
            log.error("Malformed incoming JSON, missing attribute %s: %s",
                      e, message)
            client.basic_nack(message)
        except Exception as e:
            if msg is not None:
                log.exception("Unexpected error: %s, msg: %s", e, msg)
            else:
                log.exception("Unexpected error: %s", e)
            client.basic_reject(message)

    return wrapper
75

    
76

    
77
def instance_from_msg(func):
    """Decorator that hands the callback the VirtualMachine named in the msg.

    The decorated callback is invoked as ``func(vm, msg)``. The VM id is
    derived from ``msg["instance"]`` and the object is fetched through
    ``select_for_update()``. The exceptions listed below are logged and not
    re-raised, whether they come from the lookup or from ``func`` itself.
    The resulting wrapper is in turn wrapped by ``handle_message_delivery``.

    """
    def wrapper(msg):
        try:
            instance_id = utils.id_from_instance_name(msg["instance"])
            instance = VirtualMachine.objects.select_for_update().get(
                id=instance_id)
            func(instance, msg)
        except VirtualMachine.InvalidBackendIdError:
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
        except VirtualMachine.DoesNotExist:
            log.error("VM for instance %s with id %d not found in DB.",
                      msg['instance'], instance_id)
        except (Network.InvalidBackendIdError, Network.DoesNotExist):
            log.error("Invalid message, can not find network. msg: %s", msg)

    # Same stacking as "@handle_message_delivery" over "@wraps(func)".
    return handle_message_delivery(wraps(func)(wrapper))
96

    
97

    
98
def network_from_msg(func):
    """Decorator that hands the callback the BackendNetwork named in the msg.

    The Network is resolved from ``msg["network"]`` (fetched through
    ``select_for_update()``) and the Backend from ``msg['cluster']``. The
    matching BackendNetwork is fetched, or created when absent, and the
    callback is invoked as ``func(bnet, msg)``. The exceptions listed below
    are logged and not re-raised. The resulting wrapper is in turn wrapped
    by ``handle_message_delivery``.

    """
    def wrapper(msg):
        try:
            net_id = utils.id_from_network_name(msg["network"])
            net = Network.objects.select_for_update().get(id=net_id)
            cluster_backend = Backend.objects.get(clustername=msg['cluster'])
            backend_net, created = BackendNetwork.objects.get_or_create(
                network=net, backend=cluster_backend)
            if created:
                log.info("Created missing BackendNetwork %s", backend_net)
            func(backend_net, msg)
        except Network.InvalidBackendIdError:
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
        except Network.DoesNotExist:
            log.error("Network %s not found in DB.", msg['network'])
        except Backend.DoesNotExist:
            log.error("Backend %s not found in DB.", msg['cluster'])
        except BackendNetwork.DoesNotExist:
            log.error("Network %s on backend %s not found in DB.",
                      msg['network'], msg['cluster'])

    # Same stacking as "@handle_message_delivery" over "@wraps(func)".
    return handle_message_delivery(wraps(func)(wrapper))
124

    
125

    
126
def if_update_required(func):
    """
    Decorator for checking if an incoming message needs to update the db.

    The database will not be updated in the following cases:
    - The message has been redelivered and the action has already been
      completed. In this case the event_time will be equal to the one
      in the database.
    - The message describes a state in ganeti older than the one that is
      described in the db. In this case the event_time will be earlier than
      the one in the database.

    The decorated callback is invoked as ``func(target, msg, event_time)``,
    where ``event_time`` is the result of ``merge_time(msg['event_time'])``
    and ``target.backendtime`` is the timestamp it is compared against.

    Raises ``KeyError('event_time')`` when the attribute is missing or can
    not be converted by ``merge_time``.

    """
    @wraps(func)
    def wrapper(target, msg):
        # A missing attribute propagates as KeyError('event_time') untouched.
        raw_time = msg['event_time']
        try:
            event_time = merge_time(raw_time)
        except Exception:
            # "except Exception" rather than a bare except, so that
            # KeyboardInterrupt/SystemExit are not converted into KeyError.
            log.error("Received message with malformed time: %s", raw_time)
            # Name the offending attribute so whoever catches the KeyError
            # can report which one was bad.
            raise KeyError('event_time')

        db_time = target.backendtime

        if db_time and event_time <= db_time:
            format_ = "%d/%m/%y %H:%M:%S:%f"
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
                      " db_timestamp: %s",
                      msg,
                      event_time.strftime(format_),
                      db_time.strftime(format_))
            return
        # New message. Update the database!
        func(target, msg, event_time)

    return wrapper
162

    
163

    
164
@instance_from_msg
@if_update_required
def update_db(vm, msg, event_time):
    """Process a notification of type 'ganeti-op-status'.

    Messages of any other type are logged and dropped. A failed
    OP_INSTANCE_CREATE that was submitted with opportunistic locking is
    resubmitted once without it; every other message is forwarded to
    ``backend.process_op_status``.

    Raises ``KeyError`` when a mandatory attribute ('type', 'operation',
    'status', 'jobId', 'logmsg', or the instance name of a job being
    retried) is missing from the message.

    """
    log.debug("Processing ganeti-op-status msg: %s", msg)

    if msg['type'] != "ganeti-op-status":
        log.error("Message is of unknown type %s.", msg['type'])
        return

    operation = msg["operation"]
    status = msg["status"]
    jobID = msg["jobId"]
    logmsg = msg["logmsg"]
    nics = msg.get("nics", None)
    # Treat a null "job_fields" the same as a missing one.
    job_fields = msg.get("job_fields") or {}

    # Special case: OP_INSTANCE_CREATE with opportunistic locking may fail
    # if all Ganeti nodes are already locked. Retry the job without
    # opportunistic locking.
    if (operation == "OP_INSTANCE_CREATE" and status == "error" and
            job_fields.get("opportunistic_locking", False)):
        if vm.backendjobid != jobID:  # The job has already been retried!
            return
        # Remove extra fields, if present, before resubmitting the job.
        for field in ("OP_ID", "reason"):
            job_fields.pop(field, None)
        # The instance name may be under either key. Pop both, so neither
        # is passed twice to CreateInstance, and prefer "name". Popping
        # "instance_name" with a default avoids a KeyError when only
        # "name" is present.
        instance_name = job_fields.pop("instance_name", None)
        name = job_fields.pop("name", instance_name)
        if name is None:
            raise KeyError("instance_name")
        # Turn off opportunistic locking before retrying the job
        job_fields["opportunistic_locking"] = False
        with pooled_rapi_client(vm) as c:
            jobID = c.CreateInstance(name=name, **job_fields)
        # Update the VM fields
        vm.backendjobid = jobID
        vm.backendjobstatus = None
        vm.save()
        log.info("Retrying failed creation of instance '%s' without"
                 " opportunistic locking. New job ID: '%s'", name, jobID)
        return

    backend.process_op_status(vm, event_time, jobID, operation,
                              status, logmsg, nics)

    log.debug("Done processing ganeti-op-status msg for vm %s.",
              msg['instance'])
208

    
209

    
210
@network_from_msg
@if_update_required
def update_network(network, msg, event_time):
    """Process a notification of type 'ganeti-network-status'.

    Messages of any other type are logged and dropped. OP_NETWORK_SET_PARAMS
    is forwarded to ``backend.process_network_modify`` together with the
    added/removed reserved IPs from the message; every other opcode goes to
    ``backend.process_network_status`` together with the log message.

    """
    log.debug("Processing ganeti-network-status msg: %s", msg)

    msg_type = msg['type']
    if msg_type != "ganeti-network-status":
        log.error("Message is of unknown type %s.", msg_type)
        return

    opcode, status, jobid = msg['operation'], msg['status'], msg['jobId']

    if opcode != "OP_NETWORK_SET_PARAMS":
        backend.process_network_status(network, event_time, jobid, opcode,
                                       status, msg['logmsg'])
    else:
        added_ips = msg['add_reserved_ips']
        removed_ips = msg['remove_reserved_ips']
        backend.process_network_modify(network, event_time, jobid, opcode,
                                       status, added_ips, removed_ips)

    log.debug("Done processing ganeti-network-status msg for network %s.",
              msg['network'])
234

    
235

    
236
@instance_from_msg
@if_update_required
def update_build_progress(vm, msg, event_time):
    """
    Handle a message emitted while an instance is being created.

    'image-copy-progress' messages only update the build progress of the
    vm. The remaining known types ('image-error', 'image-info',
    'image-warning', 'image-helper') are stored as a diagnostic entry for
    the vm. Messages of any other type are logged and dropped.
    """
    log.debug("Processing ganeti-create-progress msg: %s", msg)

    msg_type = msg['type']
    known_types = ('image-copy-progress', 'image-error', 'image-info',
                   'image-warning', 'image-helper')
    if msg_type not in known_types:
        log.error("Message is of unknown type %s", msg_type)
        return

    if msg_type == 'image-copy-progress':
        # progress updates never produce a diagnostic entry
        backend.process_create_progress(vm, event_time, msg['progress'])
        return

    # diagnostic defaults, refined below according to type/subtype
    source = msg_type
    level = 'DEBUG'
    message = msg.get('messages', '')
    if isinstance(message, list):
        message = " ".join(message)

    details = msg.get('stderr', None)

    if msg_type == 'image-helper':
        subtype = msg.get('subtype', None)
        if subtype in ('task-start', 'task-end'):
            # helper task events: the task name becomes the message and the
            # subtype is appended to the diagnostic source
            message = msg.get('task', message)
            source = "%s-%s" % (source, subtype)
        elif subtype == 'warning':
            level = 'WARNING'
        elif subtype == 'error':
            level = 'ERROR'
        elif subtype == 'info':
            level = 'INFO'
    elif msg_type == 'image-error':
        level = 'ERROR'
    elif msg_type == 'image-warning':
        level = 'WARNING'

    if not message.strip():
        # no usable text: fall back to a readable form of the source
        message = source.replace("-", " ").capitalize()

    # create the diagnostic entry
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
                                       details=details)

    log.debug("Done processing ganeti-create-progress msg for vm %s.",
              msg['instance'])
296

    
297

    
298
def dummy_proc(client, message, *args, **kwargs):
    """Log the body of an incoming message and acknowledge it.

    Extra positional and keyword arguments are accepted and ignored. Any
    exception raised while logging or acknowledging is logged and not
    re-raised; in that case the message is left unacknowledged.

    """
    try:
        log.debug("Msg: %s", message['body'])
        client.basic_ack(message)
    except Exception as e:
        # Pass the exception as a lazy logging argument, like every other
        # log call in this file, instead of pre-formatting the string.
        log.exception("Could not receive message %s", e)