Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 28662998

History | View | Annotate | Download (9.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger(__name__)
43

    
44

    
45
def handle_message_delivery(func):
46
    """ Generic decorator for handling messages.
47

48
    This decorator is responsible for converting the message into json format,
49
    handling of common exceptions and acknowledment of message if needed.
50

51
    """
52
    @wraps(func)
53
    def wrapper(client, message, *args, **kwargs):
54
        try:
55
            msg = None
56
            msg = json.loads(message['body'])
57
            func(msg)
58
            client.basic_ack(message)
59
        except ValueError as e:
60
            log.error("Incoming message not in JSON format %s: %s", e, message)
61
            client.basic_nack(message)
62
        except KeyError as e:
63
            log.error("Malformed incoming JSON, missing attribute %s: %s",
64
                      e, message)
65
            client.basic_nack(message)
66
        except Exception as e:
67
            if msg:
68
                log.exception("Unexpected error: %s, msg: %s", e, msg)
69
            else:
70
                log.exception("Unexpected error: %s", e)
71
            client.basic_reject(message)
72

    
73
    return wrapper
74

    
75

    
76
def instance_from_msg(func):
77
    """ Decorator for getting the VirtualMachine object of the msg.
78

79
    """
80
    @handle_message_delivery
81
    @wraps(func)
82
    def wrapper(msg):
83
        try:
84
            vm_id = utils.id_from_instance_name(msg["instance"])
85
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
86
            func(vm, msg)
87
        except VirtualMachine.InvalidBackendIdError:
88
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
89
        except VirtualMachine.DoesNotExist:
90
            log.error("VM for instance %s with id %d not found in DB.",
91
                      msg['instance'], vm_id)
92
        except Network.InvalidBackendIdError, Network.DoesNotExist:
93
            log.error("Invalid message", msg)
94
    return wrapper
95

    
96

    
97
def network_from_msg(func):
98
    """ Decorator for getting the BackendNetwork object of the msg.
99

100
    """
101
    @handle_message_delivery
102
    @wraps(func)
103
    def wrapper(msg):
104
        try:
105
            network_id = utils.id_from_network_name(msg["network"])
106
            network = Network.objects.select_for_update().get(id=network_id)
107
            backend = Backend.objects.get(clustername=msg['cluster'])
108
            bnet, new = BackendNetwork.objects.get_or_create(network=network,
109
                                                             backend=backend)
110
            if new:
111
                log.info("Created missing BackendNetwork %s", bnet)
112
            func(bnet, msg)
113
        except Network.InvalidBackendIdError:
114
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
115
        except Network.DoesNotExist:
116
            log.error("Network %s not found in DB.", msg['network'])
117
        except Backend.DoesNotExist:
118
            log.error("Backend %s not found in DB.", msg['cluster'])
119
        except BackendNetwork.DoesNotExist:
120
            log.error("Network %s on backend %s not found in DB.",
121
                      msg['network'], msg['cluster'])
122
    return wrapper
123

    
124

    
125
def if_update_required(func):
126
    """
127
    Decorator for checking if an incoming message needs to update the db.
128

129
    The database will not be updated in the following cases:
130
    - The message has been redelivered and the action has already been
131
      completed. In this case the event_time will be equal with the one
132
      in the database.
133
    - The message describes a previous state in the ganeti, from the one that
134
      is described in the db. In this case the event_time will be smaller from
135
      the one in the database.
136

137
    """
138
    @wraps(func)
139
    def wrapper(target, msg):
140
        try:
141
            event_time = merge_time(msg['event_time'])
142
        except:
143
            log.error("Received message with malformed time: %s",
144
                      msg['event_time'])
145
            raise KeyError
146

    
147
        db_time = target.backendtime
148

    
149
        if db_time and event_time <= db_time:
150
            format_ = "%d/%m/%y %H:%M:%S:%f"
151
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
152
                      " db_timestamp: %s",
153
                      msg,
154
                      event_time.strftime(format_),
155
                      db_time.strftime(format_))
156
            return
157
        # New message. Update the database!
158
        func(target, msg, event_time)
159

    
160
    return wrapper
161

    
162

    
163
@instance_from_msg
164
@if_update_required
165
def update_db(vm, msg, event_time):
166
    """Process a notification of type 'ganeti-op-status'"""
167
    log.debug("Processing ganeti-op-status msg: %s", msg)
168

    
169
    if msg['type'] != "ganeti-op-status":
170
        log.error("Message is of unknown type %s.", msg['type'])
171
        return
172

    
173
    nics = msg.get("nics", None)
174
    beparams = msg.get("beparams", None)
175
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
176
                              msg['status'], msg['logmsg'], nics=nics,
177
                              beparams=beparams)
178

    
179
    log.debug("Done processing ganeti-op-status msg for vm %s.",
180
              msg['instance'])
181

    
182

    
183
@network_from_msg
184
@if_update_required
185
def update_network(network, msg, event_time):
186
    """Process a notification of type 'ganeti-network-status'"""
187
    log.debug("Processing ganeti-network-status msg: %s", msg)
188

    
189
    if msg['type'] != "ganeti-network-status":
190
        log.error("Message is of unknown type %s.", msg['type'])
191
        return
192

    
193
    opcode = msg['operation']
194
    status = msg['status']
195
    jobid = msg['jobId']
196

    
197
    if opcode == "OP_NETWORK_SET_PARAMS":
198
        backend.process_network_modify(network, event_time, jobid, opcode,
199
                                       status, msg['add_reserved_ips'],
200
                                       msg['remove_reserved_ips'])
201
    else:
202
        backend.process_network_status(network, event_time, jobid, opcode,
203
                                       status, msg['logmsg'])
204

    
205
    log.debug("Done processing ganeti-network-status msg for network %s.",
206
              msg['network'])
207

    
208

    
209
@instance_from_msg
210
@if_update_required
211
def update_build_progress(vm, msg, event_time):
212
    """
213
    Process a create progress message. Update build progress, or create
214
    appropriate diagnostic entries for the virtual machine instance.
215
    """
216
    log.debug("Processing ganeti-create-progress msg: %s", msg)
217

    
218
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
219
                           'image-warning', 'image-helper'):
220
        log.error("Message is of unknown type %s", msg['type'])
221
        return
222

    
223
    if msg['type'] == 'image-copy-progress':
224
        backend.process_create_progress(vm, event_time, msg['progress'])
225
        # we do not add diagnostic messages for copy-progress messages
226
        return
227

    
228
    # default diagnostic fields
229
    source = msg['type']
230
    level = 'DEBUG'
231
    message = msg.get('messages', '')
232
    if isinstance(message, list):
233
        message = " ".join(message)
234

    
235
    details = msg.get('stderr', None)
236

    
237
    if msg['type'] == 'image-helper':
238
        # for helper task events join subtype to diagnostic source and
239
        # set task name as diagnostic message
240
        if msg.get('subtype', None):
241
            if msg.get('subtype') in ['task-start', 'task-end']:
242
                message = msg.get('task', message)
243
                source = "%s-%s" % (source, msg.get('subtype'))
244

    
245
        if msg.get('subtype', None) == 'warning':
246
            level = 'WARNING'
247

    
248
        if msg.get('subtype', None) == 'error':
249
            level = 'ERROR'
250

    
251
        if msg.get('subtype', None) == 'info':
252
            level = 'INFO'
253

    
254
    if msg['type'] == 'image-error':
255
        level = 'ERROR'
256

    
257
    if msg['type'] == 'image-warning':
258
        level = 'WARNING'
259

    
260
    if not message.strip():
261
        message = " ".join(source.split("-")).capitalize()
262

    
263
    # create the diagnostic entry
264
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
265
                                       details=details)
266

    
267
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
268
              msg['instance'])
269

    
270

    
271
def dummy_proc(client, message, *args, **kwargs):
272
    try:
273
        log.debug("Msg: %s", message['body'])
274
        client.basic_ack(message)
275
    except Exception as e:
276
        log.exception("Could not receive message %s" % e)