Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 6138f0ef

History | View | Annotate | Download (9.4 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42
log = logging.getLogger()
43

    
44
def handle_message_delivery(func):
45
    """ Generic decorator for handling messages.
46

47
    This decorator is responsible for converting the message into json format,
48
    handling of common exceptions and acknowledment of message if needed.
49

50
    """
51
    @wraps(func)
52
    def wrapper(client, message, *args, **kwargs):
53
        try:
54
            msg = json.loads(message['body'])
55
            func(msg)
56
            client.basic_ack(message)
57
        except ValueError as e:
58
            log.error("Incoming message not in JSON format %s: %s", e, message)
59
            client.basic_ack(message)
60
        except KeyError as e:
61
            log.error("Malformed incoming JSON, missing attribute %s: %s",
62
                      e, message)
63
            client.basic_ack(message)
64
        except Exception as e:
65
            log.exception("Unexpected error: %s, msg: %s", e, msg)
66

    
67
    return wrapper
68

    
69

    
70
def instance_from_msg(func):
71
    """ Decorator for getting the VirtualMachine object of the msg.
72

73
    """
74
    @handle_message_delivery
75
    @wraps(func)
76
    def wrapper(msg):
77
        try:
78
            vm_id = utils.id_from_instance_name(msg["instance"])
79
            vm = VirtualMachine.objects.get(id=vm_id)
80
            func(vm, msg)
81
        except VirtualMachine.InvalidBackendIdError:
82
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
83
        except VirtualMachine.DoesNotExist:
84
            log.error("VM for instance %s with id %d not found in DB.",
85
                      msg['instance'], vm_id)
86
        except Network.InvalidBackendIdError, Network.DoesNotExist:
87
            log.error("Invalid message", msg)
88
    return wrapper
89

    
90

    
91
def network_from_msg(func):
92
    """ Decorator for getting the BackendNetwork object of the msg.
93

94
    """
95
    @handle_message_delivery
96
    @wraps(func)
97
    def wrapper(msg):
98
        try:
99
            network_id = utils.id_from_network_name(msg["network"])
100
            network = Network.objects.get(id=network_id)
101
            backend = Backend.objects.get(clustername=msg['cluster'])
102
            backend_network = BackendNetwork.objects.get(network=network,
103
                                                         backend=backend)
104
            func(backend_network, msg)
105
        except Network.InvalidBackendIdError:
106
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
107
        except Network.DoesNotExist:
108
            log.error("Network %s not found in DB.", msg['network'])
109
        except Backend.DoesNotExist:
110
            log.error("Backend %s not found in DB.", msg['cluster'])
111
        except BackendNetwork.DoesNotExist:
112
            log.error("Network %s on backend %s not found in DB.",
113
                      msg['network'], msg['cluster'])
114
    return wrapper
115

    
116

    
117
def if_update_required(func):
118
    """
119
    Decorator for checking if an incoming message needs to update the db.
120

121
    The database will not be updated in the following cases:
122
    - The message has been redelivered and the action has already been
123
      completed. In this case the event_time will be equal with the one
124
      in the database.
125
    - The message describes a previous state in the ganeti, from the one that
126
      is described in the db. In this case the event_time will be smaller from
127
      the one in the database.
128

129
    """
130
    @wraps(func)
131
    def wrapper(target, msg):
132
        event_time = merge_time(msg['event_time'])
133
        db_time = target.backendtime
134

    
135
        if db_time and event_time <= db_time:
136
            format_ = "%d/%m/%y %H:%M:%S:%f"
137
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
138
                      msg,
139
                      event_time.strftime(format_),
140
                      db_time.strftime(format_))
141
            return
142
        # New message. Update the database!
143
        func(target, msg, event_time)
144

    
145
    return wrapper
146

    
147

    
148
@instance_from_msg
149
@if_update_required
150
def update_db(vm, msg, event_time):
151
    """Process a notification of type 'ganeti-op-status'"""
152
    log.debug("Processing ganeti-op-status msg: %s", msg)
153

    
154
    if msg['type'] != "ganeti-op-status":
155
        log.error("Message is of unknown type %s.", msg['type'])
156
        return
157

    
158
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
159
                              msg['status'], msg['logmsg'])
160

    
161
    log.debug("Done processing ganeti-op-status msg for vm %s.",
162
              msg['instance'])
163

    
164

    
165
@instance_from_msg
166
@if_update_required
167
def update_net(vm, msg, event_time):
168
    """Process a notification of type 'ganeti-net-status'"""
169
    log.debug("Processing ganeti-net-status msg: %s", msg)
170

    
171
    if msg['type'] != "ganeti-net-status":
172
        log.error("Message is of unknown type %s", msg['type'])
173
        return
174

    
175
    backend.process_net_status(vm, event_time, msg['nics'])
176

    
177
    log.debug("Done processing ganeti-net-status msg for vm %s.",
178
              msg["instance"])
179

    
180

    
181
@network_from_msg
182
@if_update_required
183
def update_network(network, msg, event_time):
184
    """Process a notification of type 'ganeti-network-status'"""
185
    log.debug("Processing ganeti-network-status msg: %s", msg)
186

    
187
    if msg['type'] != "ganeti-network-status":
188
        log.error("Message is of unknown type %s.", msg['type'])
189
        return
190

    
191
    backend.process_network_status(network, event_time,
192
                                   msg['jobId'], msg['operation'],
193
                                   msg['status'], msg['logmsg'])
194

    
195
    log.debug("Done processing ganeti-network-status msg for network %s.",
196
              msg['network'])
197

    
198

    
199
@instance_from_msg
200
@if_update_required
201
def update_build_progress(vm, msg, event_time):
202
    """
203
    Process a create progress message. Update build progress, or create
204
    appropriate diagnostic entries for the virtual machine instance.
205
    """
206
    log.debug("Processing ganeti-create-progress msg: %s", msg)
207

    
208
    if msg['type'] not in ('image-copy-progress', 'image-error', 'image-info',
209
                           'image-warning', 'image-helper'):
210
        log.error("Message is of unknown type %s", msg['type'])
211
        return
212

    
213
    if msg['type'] == 'image-copy-progress':
214
        backend.process_create_progress(vm, event_time, msg['progress'])
215
        # we do not add diagnostic messages for copy-progress messages
216
        return
217

    
218
    # default diagnostic fields
219
    source = msg['type']
220
    level = 'DEBUG'
221
    message = msg.get('messages', None)
222
    if isinstance(message, list):
223
        message = " ".join(message)
224

    
225
    details = msg.get('stderr', None)
226

    
227
    if msg['type'] == 'image-helper':
228
        # for helper task events join subtype to diagnostic source and
229
        # set task name as diagnostic message
230
        if msg.get('subtype', None) and msg.get('subtype') in ['task-start',
231
              'task-end']:
232
            message = msg.get('task', message)
233
            source = "%s-%s" % (source, msg.get('subtype'))
234

    
235
        if msg.get('subtype', None) == 'warning':
236
            level = 'WARNING'
237

    
238
        if msg.get('subtype', None) == 'error':
239
            level = 'ERROR'
240

    
241
        if msg.get('subtype', None) == 'info':
242
            level = 'INFO'
243

    
244
    if msg['type'] == 'image-error':
245
        level = 'ERROR'
246

    
247
    if msg['type'] == 'image-warning':
248
        level = 'WARNING'
249

    
250
    if not message.strip():
251
        message = " ".join(source.split("-")).capitalize()
252

    
253
    # create the diagnostic entry
254
    backend.create_instance_diagnostic(vm, message, source, level, event_time,
255
        details=details)
256

    
257
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
258
              msg['instance'])
259

    
260

    
261
def dummy_proc(client, message, *args, **kwargs):
262
    try:
263
        log.debug("Msg: %s", message['body'])
264
        client.basic_ack(message)
265
    except Exception as e:
266
        log.exception("Could not receive message %s" % e)