Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ dccd42eb

History | View | Annotate | Download (10.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger(__name__)
43

    
44

    
45
def handle_message_delivery(func):
46
    """ Generic decorator for handling messages.
47

48
    This decorator is responsible for converting the message into json format,
49
    handling of common exceptions and acknowledment of message if needed.
50

51
    """
52
    @wraps(func)
53
    def wrapper(client, message, *args, **kwargs):
54
        try:
55
            msg = None
56
            msg = json.loads(message['body'])
57
            func(msg)
58
            client.basic_ack(message)
59
        except ValueError as e:
60
            log.error("Incoming message not in JSON format %s: %s", e, message)
61
            client.basic_nack(message)
62
        except KeyError as e:
63
            log.error("Malformed incoming JSON, missing attribute %s: %s",
64
                      e, message)
65
            client.basic_nack(message)
66
        except Exception as e:
67
            if msg:
68
                log.exception("Unexpected error: %s, msg: %s", e, msg)
69
            else:
70
                log.exception("Unexpected error: %s", e)
71
            client.basic_reject(message)
72

    
73
    return wrapper
74

    
75

    
76
def instance_from_msg(func):
77
    """ Decorator for getting the VirtualMachine object of the msg.
78

79
    """
80
    @handle_message_delivery
81
    @wraps(func)
82
    def wrapper(msg):
83
        try:
84
            vm_id = utils.id_from_instance_name(msg["instance"])
85
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
86
            func(vm, msg)
87
        except VirtualMachine.InvalidBackendIdError:
88
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
89
        except VirtualMachine.DoesNotExist:
90
            log.error("VM for instance %s with id %d not found in DB.",
91
                      msg['instance'], vm_id)
92
        except Network.InvalidBackendIdError, Network.DoesNotExist:
93
            log.error("Invalid message", msg)
94
    return wrapper
95

    
96

    
97
def network_from_msg(func):
98
    """ Decorator for getting the BackendNetwork object of the msg.
99

100
    """
101
    @handle_message_delivery
102
    @wraps(func)
103
    def wrapper(msg):
104
        try:
105
            network_id = utils.id_from_network_name(msg["network"])
106
            network = Network.objects.select_for_update().get(id=network_id)
107
            backend = Backend.objects.get(clustername=msg['cluster'])
108
            backend_network = BackendNetwork.objects.get(network=network,
109
                                                         backend=backend)
110
            func(backend_network, msg)
111
        except Network.InvalidBackendIdError:
112
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
113
        except Network.DoesNotExist:
114
            log.error("Network %s not found in DB.", msg['network'])
115
        except Backend.DoesNotExist:
116
            log.error("Backend %s not found in DB.", msg['cluster'])
117
        except BackendNetwork.DoesNotExist:
118
            log.error("Network %s on backend %s not found in DB.",
119
                      msg['network'], msg['cluster'])
120
    return wrapper
121

    
122

    
123
def if_update_required(func):
124
    """
125
    Decorator for checking if an incoming message needs to update the db.
126

127
    The database will not be updated in the following cases:
128
    - The message has been redelivered and the action has already been
129
      completed. In this case the event_time will be equal with the one
130
      in the database.
131
    - The message describes a previous state in the ganeti, from the one that
132
      is described in the db. In this case the event_time will be smaller from
133
      the one in the database.
134

135
    """
136
    @wraps(func)
137
    def wrapper(target, msg):
138
        try:
139
            event_time = merge_time(msg['event_time'])
140
        except:
141
            log.error("Received message with malformed time: %s",
142
                      msg['event_time'])
143
            raise KeyError
144

    
145
        db_time = target.backendtime
146

    
147
        if db_time and event_time <= db_time:
148
            format_ = "%d/%m/%y %H:%M:%S:%f"
149
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
150
                      " db_timestamp: %s",
151
                      msg,
152
                      event_time.strftime(format_),
153
                      db_time.strftime(format_))
154
            return
155
        # New message. Update the database!
156
        func(target, msg, event_time)
157

    
158
    return wrapper
159

    
160

    
161
@instance_from_msg
162
@if_update_required
163
def update_db(vm, msg, event_time):
164
    """Process a notification of type 'ganeti-op-status'"""
165
    log.debug("Processing ganeti-op-status msg: %s", msg)
166

    
167
    if msg['type'] != "ganeti-op-status":
168
        log.error("Message is of unknown type %s.", msg['type'])
169
        return
170

    
171
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
172
                              msg['status'], msg['logmsg'])
173

    
174
    log.debug("Done processing ganeti-op-status msg for vm %s.",
175
              msg['instance'])
176

    
177

    
178
@instance_from_msg
179
@if_update_required
180
def update_net(vm, msg, event_time):
181
    """Process a notification of type 'ganeti-net-status'"""
182
    log.debug("Processing ganeti-net-status msg: %s", msg)
183

    
184
    if msg['type'] != "ganeti-net-status":
185
        log.error("Message is of unknown type %s", msg['type'])
186
        return
187

    
188
    backend.process_net_status(vm, event_time, msg['nics'])
189

    
190
    log.debug("Done processing ganeti-net-status msg for vm %s.",
191
              msg["instance"])
192

    
193

    
194
@network_from_msg
195
@if_update_required
196
def update_network(network, msg, event_time):
197
    """Process a notification of type 'ganeti-network-status'"""
198
    log.debug("Processing ganeti-network-status msg: %s", msg)
199

    
200
    if msg['type'] != "ganeti-network-status":
201
        log.error("Message is of unknown type %s.", msg['type'])
202
        return
203

    
204
    opcode = msg['operation']
205
    status = msg['status']
206
    jobid = msg['jobId']
207

    
208
    if opcode == "OP_NETWORK_SET_PARAMS":
209
        backend.process_network_modify(network, event_time, jobid, opcode,
210
                                       status, msg['add_reserved_ips'],
211
                                       msg['remove_reserved_ips'])
212
    else:
213
        backend.process_network_status(network, event_time, jobid, opcode,
214
                                       status, msg['logmsg'])
215

    
216
    log.debug("Done processing ganeti-network-status msg for network %s.",
217
              msg['network'])
218

    
219

    
220
@instance_from_msg
221
@if_update_required
222
def update_build_progress(vm, msg, event_time):
223
    """
224
    Process a create progress message. Update build progress, or create
225
    appropriate diagnostic entries for the virtual machine instance.
226
    """
227
    log.debug("Processing ganeti-create-progress msg: %s", msg)
228

    
229
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
230
                           'image-warning', 'image-helper'):
231
        log.error("Message is of unknown type %s", msg['type'])
232
        return
233

    
234
    if msg['type'] == 'image-copy-progress':
235
        backend.process_create_progress(vm, event_time, msg['progress'])
236
        # we do not add diagnostic messages for copy-progress messages
237
        return
238

    
239
    # default diagnostic fields
240
    source = msg['type']
241
    level = 'DEBUG'
242
    message = msg.get('messages', '')
243
    if isinstance(message, list):
244
        message = " ".join(message)
245

    
246
    details = msg.get('stderr', None)
247

    
248
    if msg['type'] == 'image-helper':
249
        # for helper task events join subtype to diagnostic source and
250
        # set task name as diagnostic message
251
        if msg.get('subtype', None):
252
            if msg.get('subtype') in ['task-start', 'task-end']:
253
                message = msg.get('task', message)
254
                source = "%s-%s" % (source, msg.get('subtype'))
255

    
256
        if msg.get('subtype', None) == 'warning':
257
            level = 'WARNING'
258

    
259
        if msg.get('subtype', None) == 'error':
260
            level = 'ERROR'
261

    
262
        if msg.get('subtype', None) == 'info':
263
            level = 'INFO'
264

    
265
    if msg['type'] == 'image-error':
266
        level = 'ERROR'
267

    
268
    if msg['type'] == 'image-warning':
269
        level = 'WARNING'
270

    
271
    if not message.strip():
272
        message = " ".join(source.split("-")).capitalize()
273

    
274
    # create the diagnostic entry
275
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
276
                                       details=details)
277

    
278
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
279
              msg['instance'])
280

    
281

    
282
def dummy_proc(client, message, *args, **kwargs):
283
    try:
284
        log.debug("Msg: %s", message['body'])
285
        client.basic_ack(message)
286
    except Exception as e:
287
        log.exception("Could not receive message %s" % e)