Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 95059648

History | View | Annotate | Download (9.7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger(__name__)
43

    
44

    
45
def handle_message_delivery(func):
46
    """ Generic decorator for handling messages.
47

48
    This decorator is responsible for converting the message into json format,
49
    handling of common exceptions and acknowledment of message if needed.
50

51
    """
52
    @wraps(func)
53
    def wrapper(client, message, *args, **kwargs):
54
        try:
55
            msg = None
56
            msg = json.loads(message['body'])
57
            func(msg)
58
            client.basic_ack(message)
59
        except ValueError as e:
60
            log.error("Incoming message not in JSON format %s: %s", e, message)
61
            client.basic_nack(message)
62
        except KeyError as e:
63
            log.error("Malformed incoming JSON, missing attribute %s: %s",
64
                      e, message)
65
            client.basic_nack(message)
66
        except Exception as e:
67
            if msg:
68
                log.exception("Unexpected error: %s, msg: %s", e, msg)
69
            else:
70
                log.exception("Unexpected error: %s", e)
71
            client.basic_reject(message)
72

    
73
    return wrapper
74

    
75

    
76
def instance_from_msg(func):
77
    """ Decorator for getting the VirtualMachine object of the msg.
78

79
    """
80
    @handle_message_delivery
81
    @wraps(func)
82
    def wrapper(msg):
83
        try:
84
            vm_id = utils.id_from_instance_name(msg["instance"])
85
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
86
            func(vm, msg)
87
        except VirtualMachine.InvalidBackendIdError:
88
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
89
        except VirtualMachine.DoesNotExist:
90
            log.error("VM for instance %s with id %d not found in DB.",
91
                      msg['instance'], vm_id)
92
        except Network.InvalidBackendIdError, Network.DoesNotExist:
93
            log.error("Invalid message", msg)
94
    return wrapper
95

    
96

    
97
def network_from_msg(func):
98
    """ Decorator for getting the BackendNetwork object of the msg.
99

100
    """
101
    @handle_message_delivery
102
    @wraps(func)
103
    def wrapper(msg):
104
        try:
105
            network_id = utils.id_from_network_name(msg["network"])
106
            network = Network.objects.select_for_update().get(id=network_id)
107
            backend = Backend.objects.get(clustername=msg['cluster'])
108
            bnet, new = BackendNetwork.objects.get_or_create(network=network,
109
                                                             backend=backend)
110
            if new:
111
                log.info("Created missing BackendNetwork %s", bnet)
112
            func(bnet, msg)
113
        except Network.InvalidBackendIdError:
114
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
115
        except Network.DoesNotExist:
116
            log.error("Network %s not found in DB.", msg['network'])
117
        except Backend.DoesNotExist:
118
            log.error("Backend %s not found in DB.", msg['cluster'])
119
        except BackendNetwork.DoesNotExist:
120
            log.error("Network %s on backend %s not found in DB.",
121
                      msg['network'], msg['cluster'])
122
    return wrapper
123

    
124

    
125
def if_update_required(func):
126
    """
127
    Decorator for checking if an incoming message needs to update the db.
128

129
    The database will not be updated in the following cases:
130
    - The message has been redelivered and the action has already been
131
      completed. In this case the event_time will be equal with the one
132
      in the database.
133
    - The message describes a previous state in the ganeti, from the one that
134
      is described in the db. In this case the event_time will be smaller from
135
      the one in the database.
136

137
    """
138
    @wraps(func)
139
    def wrapper(target, msg):
140
        try:
141
            event_time = merge_time(msg['event_time'])
142
        except:
143
            log.error("Received message with malformed time: %s",
144
                      msg['event_time'])
145
            raise KeyError
146

    
147
        db_time = target.backendtime
148

    
149
        if db_time and event_time <= db_time:
150
            format_ = "%d/%m/%y %H:%M:%S:%f"
151
            log.debug("Ignoring message %s.\nevent_timestamp: %s"
152
                      " db_timestamp: %s",
153
                      msg,
154
                      event_time.strftime(format_),
155
                      db_time.strftime(format_))
156
            return
157
        # New message. Update the database!
158
        func(target, msg, event_time)
159

    
160
    return wrapper
161

    
162

    
163
@instance_from_msg
164
@if_update_required
165
def update_db(vm, msg, event_time):
166
    """Process a notification of type 'ganeti-op-status'"""
167
    log.debug("Processing ganeti-op-status msg: %s", msg)
168

    
169
    if msg['type'] != "ganeti-op-status":
170
        log.error("Message is of unknown type %s.", msg['type'])
171
        return
172

    
173
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
174
                              msg['status'], msg['logmsg'])
175

    
176
    log.debug("Done processing ganeti-op-status msg for vm %s.",
177
              msg['instance'])
178

    
179

    
180
@network_from_msg
181
@if_update_required
182
def update_network(network, msg, event_time):
183
    """Process a notification of type 'ganeti-network-status'"""
184
    log.debug("Processing ganeti-network-status msg: %s", msg)
185

    
186
    if msg['type'] != "ganeti-network-status":
187
        log.error("Message is of unknown type %s.", msg['type'])
188
        return
189

    
190
    opcode = msg['operation']
191
    status = msg['status']
192
    jobid = msg['jobId']
193

    
194
    if opcode == "OP_NETWORK_SET_PARAMS":
195
        backend.process_network_modify(network, event_time, jobid, opcode,
196
                                       status, msg['add_reserved_ips'],
197
                                       msg['remove_reserved_ips'])
198
    else:
199
        backend.process_network_status(network, event_time, jobid, opcode,
200
                                       status, msg['logmsg'])
201

    
202
    log.debug("Done processing ganeti-network-status msg for network %s.",
203
              msg['network'])
204

    
205

    
206
@instance_from_msg
207
@if_update_required
208
def update_build_progress(vm, msg, event_time):
209
    """
210
    Process a create progress message. Update build progress, or create
211
    appropriate diagnostic entries for the virtual machine instance.
212
    """
213
    log.debug("Processing ganeti-create-progress msg: %s", msg)
214

    
215
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
216
                           'image-warning', 'image-helper'):
217
        log.error("Message is of unknown type %s", msg['type'])
218
        return
219

    
220
    if msg['type'] == 'image-copy-progress':
221
        backend.process_create_progress(vm, event_time, msg['progress'])
222
        # we do not add diagnostic messages for copy-progress messages
223
        return
224

    
225
    # default diagnostic fields
226
    source = msg['type']
227
    level = 'DEBUG'
228
    message = msg.get('messages', '')
229
    if isinstance(message, list):
230
        message = " ".join(message)
231

    
232
    details = msg.get('stderr', None)
233

    
234
    if msg['type'] == 'image-helper':
235
        # for helper task events join subtype to diagnostic source and
236
        # set task name as diagnostic message
237
        if msg.get('subtype', None):
238
            if msg.get('subtype') in ['task-start', 'task-end']:
239
                message = msg.get('task', message)
240
                source = "%s-%s" % (source, msg.get('subtype'))
241

    
242
        if msg.get('subtype', None) == 'warning':
243
            level = 'WARNING'
244

    
245
        if msg.get('subtype', None) == 'error':
246
            level = 'ERROR'
247

    
248
        if msg.get('subtype', None) == 'info':
249
            level = 'INFO'
250

    
251
    if msg['type'] == 'image-error':
252
        level = 'ERROR'
253

    
254
    if msg['type'] == 'image-warning':
255
        level = 'WARNING'
256

    
257
    if not message.strip():
258
        message = " ".join(source.split("-")).capitalize()
259

    
260
    # create the diagnostic entry
261
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
262
                                       details=details)
263

    
264
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
265
              msg['instance'])
266

    
267

    
268
def dummy_proc(client, message, *args, **kwargs):
269
    try:
270
        log.debug("Msg: %s", message['body'])
271
        client.basic_ack(message)
272
    except Exception as e:
273
        log.exception("Could not receive message %s" % e)