Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ d7841399

History | View | Annotate | Download (10 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger(__name__)
43

    
44
def handle_message_delivery(func):
45
    """ Generic decorator for handling messages.
46

47
    This decorator is responsible for converting the message into json format,
48
    handling of common exceptions and acknowledment of message if needed.
49

50
    """
51
    @wraps(func)
52
    def wrapper(client, message, *args, **kwargs):
53
        try:
54
            msg = json.loads(message['body'])
55
            func(msg)
56
            client.basic_ack(message)
57
        except ValueError as e:
58
            log.error("Incoming message not in JSON format %s: %s", e, message)
59
            client.basic_nack(message)
60
        except KeyError as e:
61
            log.error("Malformed incoming JSON, missing attribute %s: %s",
62
                      e, message)
63
            client.basic_nack(message)
64
        except Exception as e:
65
            log.exception("Unexpected error: %s, msg: %s", e, msg)
66
            client.basic_reject(message)
67

    
68
    return wrapper
69

    
70

    
71
def instance_from_msg(func):
72
    """ Decorator for getting the VirtualMachine object of the msg.
73

74
    """
75
    @handle_message_delivery
76
    @wraps(func)
77
    def wrapper(msg):
78
        try:
79
            vm_id = utils.id_from_instance_name(msg["instance"])
80
            vm = VirtualMachine.objects.select_for_update().get(id=vm_id)
81
            func(vm, msg)
82
        except VirtualMachine.InvalidBackendIdError:
83
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
84
        except VirtualMachine.DoesNotExist:
85
            log.error("VM for instance %s with id %d not found in DB.",
86
                      msg['instance'], vm_id)
87
        except Network.InvalidBackendIdError, Network.DoesNotExist:
88
            log.error("Invalid message", msg)
89
    return wrapper
90

    
91

    
92
def network_from_msg(func):
93
    """ Decorator for getting the BackendNetwork object of the msg.
94

95
    """
96
    @handle_message_delivery
97
    @wraps(func)
98
    def wrapper(msg):
99
        try:
100
            network_id = utils.id_from_network_name(msg["network"])
101
            network = Network.objects.select_for_update().get(id=network_id)
102
            backend = Backend.objects.get(clustername=msg['cluster'])
103
            backend_network = BackendNetwork.objects.get(network=network,
104
                                                         backend=backend)
105
            func(backend_network, msg)
106
        except Network.InvalidBackendIdError:
107
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
108
        except Network.DoesNotExist:
109
            log.error("Network %s not found in DB.", msg['network'])
110
        except Backend.DoesNotExist:
111
            log.error("Backend %s not found in DB.", msg['cluster'])
112
        except BackendNetwork.DoesNotExist:
113
            log.error("Network %s on backend %s not found in DB.",
114
                      msg['network'], msg['cluster'])
115
    return wrapper
116

    
117

    
118
def if_update_required(func):
119
    """
120
    Decorator for checking if an incoming message needs to update the db.
121

122
    The database will not be updated in the following cases:
123
    - The message has been redelivered and the action has already been
124
      completed. In this case the event_time will be equal with the one
125
      in the database.
126
    - The message describes a previous state in the ganeti, from the one that
127
      is described in the db. In this case the event_time will be smaller from
128
      the one in the database.
129

130
    """
131
    @wraps(func)
132
    def wrapper(target, msg):
133
        try:
134
            event_time = merge_time(msg['event_time'])
135
        except:
136
            log.error("Received message with malformed time: %s",
137
                      msg['event_time'])
138
            raise KeyError
139

    
140
        db_time = target.backendtime
141

    
142
        if db_time and event_time <= db_time:
143
            format_ = "%d/%m/%y %H:%M:%S:%f"
144
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
145
                      msg,
146
                      event_time.strftime(format_),
147
                      db_time.strftime(format_))
148
            return
149
        # New message. Update the database!
150
        func(target, msg, event_time)
151

    
152
    return wrapper
153

    
154

    
155
@instance_from_msg
156
@if_update_required
157
def update_db(vm, msg, event_time):
158
    """Process a notification of type 'ganeti-op-status'"""
159
    log.debug("Processing ganeti-op-status msg: %s", msg)
160

    
161
    if msg['type'] != "ganeti-op-status":
162
        log.error("Message is of unknown type %s.", msg['type'])
163
        return
164

    
165
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
166
                              msg['status'], msg['logmsg'])
167

    
168
    log.debug("Done processing ganeti-op-status msg for vm %s.",
169
              msg['instance'])
170

    
171

    
172
@instance_from_msg
173
@if_update_required
174
def update_net(vm, msg, event_time):
175
    """Process a notification of type 'ganeti-net-status'"""
176
    log.debug("Processing ganeti-net-status msg: %s", msg)
177

    
178
    if msg['type'] != "ganeti-net-status":
179
        log.error("Message is of unknown type %s", msg['type'])
180
        return
181

    
182
    backend.process_net_status(vm, event_time, msg['nics'])
183

    
184
    log.debug("Done processing ganeti-net-status msg for vm %s.",
185
              msg["instance"])
186

    
187

    
188
@network_from_msg
189
@if_update_required
190
def update_network(network, msg, event_time):
191
    """Process a notification of type 'ganeti-network-status'"""
192
    log.debug("Processing ganeti-network-status msg: %s", msg)
193

    
194
    if msg['type'] != "ganeti-network-status":
195
        log.error("Message is of unknown type %s.", msg['type'])
196
        return
197

    
198
    opcode = msg['operation']
199
    status = msg['status']
200
    jobid = msg['jobId']
201

    
202
    if opcode == "OP_NETWORK_SET_PARAMS":
203
        backend.process_network_modify(network, event_time, jobid, opcode,
204
                                       status, msg['add_reserved_ips'],
205
                                       msg['remove_reserved_ips'])
206
    else:
207
        backend.process_network_status(network, event_time, jobid, opcode,
208
                                       status, msg['logmsg'])
209

    
210
    log.debug("Done processing ganeti-network-status msg for network %s.",
211
              msg['network'])
212

    
213

    
214
@instance_from_msg
215
@if_update_required
216
def update_build_progress(vm, msg, event_time):
217
    """
218
    Process a create progress message. Update build progress, or create
219
    appropriate diagnostic entries for the virtual machine instance.
220
    """
221
    log.debug("Processing ganeti-create-progress msg: %s", msg)
222

    
223
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
224
                           'image-warning', 'image-helper'):
225
        log.error("Message is of unknown type %s", msg['type'])
226
        return
227

    
228
    if msg['type'] == 'image-copy-progress':
229
        backend.process_create_progress(vm, event_time, msg['progress'])
230
        # we do not add diagnostic messages for copy-progress messages
231
        return
232

    
233
    # default diagnostic fields
234
    source = msg['type']
235
    level = 'DEBUG'
236
    message = msg.get('messages', None)
237
    if isinstance(message, list):
238
        message = " ".join(message)
239

    
240
    details = msg.get('stderr', None)
241

    
242
    if msg['type'] == 'image-helper':
243
        # for helper task events join subtype to diagnostic source and
244
        # set task name as diagnostic message
245
        if msg.get('subtype', None) and msg.get('subtype') in ['task-start',
246
              'task-end']:
247
            message = msg.get('task', message)
248
            source = "%s-%s" % (source, msg.get('subtype'))
249

    
250
        if msg.get('subtype', None) == 'warning':
251
            level = 'WARNING'
252

    
253
        if msg.get('subtype', None) == 'error':
254
            level = 'ERROR'
255

    
256
        if msg.get('subtype', None) == 'info':
257
            level = 'INFO'
258

    
259
    if msg['type'] == 'image-error':
260
        level = 'ERROR'
261

    
262
    if msg['type'] == 'image-warning':
263
        level = 'WARNING'
264

    
265
    if not message.strip():
266
        message = " ".join(source.split("-")).capitalize()
267

    
268
    # create the diagnostic entry
269
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
270
        details=details)
271

    
272
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
273
              msg['instance'])
274

    
275

    
276
def dummy_proc(client, message, *args, **kwargs):
277
    try:
278
        log.debug("Msg: %s", message['body'])
279
        client.basic_ack(message)
280
    except Exception as e:
281
        log.exception("Could not receive message %s" % e)