Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ e6f6627c

History | View | Annotate | Download (10.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger(__name__)
43

    
44
def handle_message_delivery(func):
45
    """ Generic decorator for handling messages.
46

47
    This decorator is responsible for converting the message into json format,
48
    handling of common exceptions and acknowledment of message if needed.
49

50
    """
51
    @wraps(func)
52
    def wrapper(client, message, *args, **kwargs):
53
        try:
54
            msg = None
55
            msg = json.loads(message['body'])
56
            func(msg)
57
            client.basic_ack(message)
58
        except ValueError as e:
59
            log.error("Incoming message not in JSON format %s: %s", e, message)
60
            client.basic_nack(message)
61
        except KeyError as e:
62
            log.error("Malformed incoming JSON, missing attribute %s: %s",
63
                      e, message)
64
            client.basic_nack(message)
65
        except Exception as e:
66
            if msg:
67
                log.exception("Unexpected error: %s, msg: %s", e, msg)
68
            else:
69
                log.exception("Unexpected error: %s", e)
70
            client.basic_reject(message)
71

    
72
    return wrapper
73

    
74

    
75
def instance_from_msg(func):
76
    """ Decorator for getting the VirtualMachine object of the msg.
77

78
    """
79
    @handle_message_delivery
80
    @wraps(func)
81
    def wrapper(msg):
82
        try:
83
            vm_id = utils.id_from_instance_name(msg["instance"])
84
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
85
            func(vm, msg)
86
        except VirtualMachine.InvalidBackendIdError:
87
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
88
        except VirtualMachine.DoesNotExist:
89
            log.error("VM for instance %s with id %d not found in DB.",
90
                      msg['instance'], vm_id)
91
        except Network.InvalidBackendIdError, Network.DoesNotExist:
92
            log.error("Invalid message", msg)
93
    return wrapper
94

    
95

    
96
def network_from_msg(func):
97
    """ Decorator for getting the BackendNetwork object of the msg.
98

99
    """
100
    @handle_message_delivery
101
    @wraps(func)
102
    def wrapper(msg):
103
        try:
104
            network_id = utils.id_from_network_name(msg["network"])
105
            network = Network.objects.select_for_update().get(id=network_id)
106
            backend = Backend.objects.get(clustername=msg['cluster'])
107
            backend_network = BackendNetwork.objects.get(network=network,
108
                                                         backend=backend)
109
            func(backend_network, msg)
110
        except Network.InvalidBackendIdError:
111
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
112
        except Network.DoesNotExist:
113
            log.error("Network %s not found in DB.", msg['network'])
114
        except Backend.DoesNotExist:
115
            log.error("Backend %s not found in DB.", msg['cluster'])
116
        except BackendNetwork.DoesNotExist:
117
            log.error("Network %s on backend %s not found in DB.",
118
                      msg['network'], msg['cluster'])
119
    return wrapper
120

    
121

    
122
def if_update_required(func):
123
    """
124
    Decorator for checking if an incoming message needs to update the db.
125

126
    The database will not be updated in the following cases:
127
    - The message has been redelivered and the action has already been
128
      completed. In this case the event_time will be equal with the one
129
      in the database.
130
    - The message describes a previous state in the ganeti, from the one that
131
      is described in the db. In this case the event_time will be smaller from
132
      the one in the database.
133

134
    """
135
    @wraps(func)
136
    def wrapper(target, msg):
137
        try:
138
            event_time = merge_time(msg['event_time'])
139
        except:
140
            log.error("Received message with malformed time: %s",
141
                      msg['event_time'])
142
            raise KeyError
143

    
144
        db_time = target.backendtime
145

    
146
        if db_time and event_time <= db_time:
147
            format_ = "%d/%m/%y %H:%M:%S:%f"
148
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
149
                      msg,
150
                      event_time.strftime(format_),
151
                      db_time.strftime(format_))
152
            return
153
        # New message. Update the database!
154
        func(target, msg, event_time)
155

    
156
    return wrapper
157

    
158

    
159
@instance_from_msg
160
@if_update_required
161
def update_db(vm, msg, event_time):
162
    """Process a notification of type 'ganeti-op-status'"""
163
    log.debug("Processing ganeti-op-status msg: %s", msg)
164

    
165
    if msg['type'] != "ganeti-op-status":
166
        log.error("Message is of unknown type %s.", msg['type'])
167
        return
168

    
169
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
170
                              msg['status'], msg['logmsg'])
171

    
172
    log.debug("Done processing ganeti-op-status msg for vm %s.",
173
              msg['instance'])
174

    
175

    
176
@instance_from_msg
177
@if_update_required
178
def update_net(vm, msg, event_time):
179
    """Process a notification of type 'ganeti-net-status'"""
180
    log.debug("Processing ganeti-net-status msg: %s", msg)
181

    
182
    if msg['type'] != "ganeti-net-status":
183
        log.error("Message is of unknown type %s", msg['type'])
184
        return
185

    
186
    backend.process_net_status(vm, event_time, msg['nics'])
187

    
188
    log.debug("Done processing ganeti-net-status msg for vm %s.",
189
              msg["instance"])
190

    
191

    
192
@network_from_msg
193
@if_update_required
194
def update_network(network, msg, event_time):
195
    """Process a notification of type 'ganeti-network-status'"""
196
    log.debug("Processing ganeti-network-status msg: %s", msg)
197

    
198
    if msg['type'] != "ganeti-network-status":
199
        log.error("Message is of unknown type %s.", msg['type'])
200
        return
201

    
202
    opcode = msg['operation']
203
    status = msg['status']
204
    jobid = msg['jobId']
205

    
206
    if opcode == "OP_NETWORK_SET_PARAMS":
207
        backend.process_network_modify(network, event_time, jobid, opcode,
208
                                       status, msg['add_reserved_ips'],
209
                                       msg['remove_reserved_ips'])
210
    else:
211
        backend.process_network_status(network, event_time, jobid, opcode,
212
                                       status, msg['logmsg'])
213

    
214
    log.debug("Done processing ganeti-network-status msg for network %s.",
215
              msg['network'])
216

    
217

    
218
@instance_from_msg
219
@if_update_required
220
def update_build_progress(vm, msg, event_time):
221
    """
222
    Process a create progress message. Update build progress, or create
223
    appropriate diagnostic entries for the virtual machine instance.
224
    """
225
    log.debug("Processing ganeti-create-progress msg: %s", msg)
226

    
227
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
228
                           'image-warning', 'image-helper'):
229
        log.error("Message is of unknown type %s", msg['type'])
230
        return
231

    
232
    if msg['type'] == 'image-copy-progress':
233
        backend.process_create_progress(vm, event_time, msg['progress'])
234
        # we do not add diagnostic messages for copy-progress messages
235
        return
236

    
237
    # default diagnostic fields
238
    source = msg['type']
239
    level = 'DEBUG'
240
    message = msg.get('messages', None)
241
    if isinstance(message, list):
242
        message = " ".join(message)
243

    
244
    details = msg.get('stderr', None)
245

    
246
    if msg['type'] == 'image-helper':
247
        # for helper task events join subtype to diagnostic source and
248
        # set task name as diagnostic message
249
        if msg.get('subtype', None) and msg.get('subtype') in ['task-start',
250
              'task-end']:
251
            message = msg.get('task', message)
252
            source = "%s-%s" % (source, msg.get('subtype'))
253

    
254
        if msg.get('subtype', None) == 'warning':
255
            level = 'WARNING'
256

    
257
        if msg.get('subtype', None) == 'error':
258
            level = 'ERROR'
259

    
260
        if msg.get('subtype', None) == 'info':
261
            level = 'INFO'
262

    
263
    if msg['type'] == 'image-error':
264
        level = 'ERROR'
265

    
266
    if msg['type'] == 'image-warning':
267
        level = 'WARNING'
268

    
269
    if not message.strip():
270
        message = " ".join(source.split("-")).capitalize()
271

    
272
    # create the diagnostic entry
273
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
274
        details=details)
275

    
276
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
277
              msg['instance'])
278

    
279

    
280
def dummy_proc(client, message, *args, **kwargs):
281
    try:
282
        log.debug("Msg: %s", message['body'])
283
        client.basic_ack(message)
284
    except Exception as e:
285
        log.exception("Could not receive message %s" % e)