Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ bb80a8d7

History | View | Annotate | Download (8.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36

    
37
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
38
from synnefo.logic import utils, backend
39

    
40
from synnefo.lib.utils import merge_time
41

    
42

    
43
log = logging.getLogger()
44

    
45

    
46
def handle_message_delivery(func):
47
    """ Generic decorator for handling messages.
48

49
    This decorator is responsible for converting the message into json format,
50
    handling of common exceptions and acknowledment of message if needed.
51

52
    """
53
    @wraps(func)
54
    def wrapper(client, message, *args, **kwargs):
55
        try:
56
            msg = json.loads(message['body'])
57
            func(msg)
58
            client.basic_ack(message)
59
        except ValueError as e:
60
            log.error("Incoming message not in JSON format %s: %s", e, message)
61
            client.basic_ack(message)
62
        except KeyError as e:
63
            log.error("Malformed incoming JSON, missing attribute %s: %s",
64
                      e, message)
65
            client.basic_ack(message)
66
        except Exception as e:
67
            log.exception("Unexpected error: %s, msg: %s", e, msg)
68

    
69
    return wrapper
70

    
71

    
72
def instance_from_msg(func):
73
    """ Decorator for getting the VirtualMachine object of the msg.
74

75
    """
76
    @handle_message_delivery
77
    @wraps(func)
78
    def wrapper(msg):
79
        try:
80
            vm_id = utils.id_from_instance_name(msg["instance"])
81
            vm = VirtualMachine.objects.get(id=vm_id)
82
            func(vm, msg)
83
        except VirtualMachine.InvalidBackendIdError:
84
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
85
        except VirtualMachine.DoesNotExist:
86
            log.error("VM for instance %s with id %d not found in DB.",
87
                      msg['instance'], vm_id)
88
        except Network.InvalidBackendIdError, Network.DoesNotExist:
89
            log.error("Invalid message", msg)
90
    return wrapper
91

    
92

    
93
def network_from_msg(func):
94
    """ Decorator for getting the BackendNetwork object of the msg.
95

96
    """
97
    @handle_message_delivery
98
    @wraps(func)
99
    def wrapper(msg):
100
        try:
101
            network_id = utils.id_from_network_name(msg["network"])
102
            network = Network.objects.select_for_update().get(id=network_id)
103
            backend = Backend.objects.get(clustername=msg['cluster'])
104
            backend_network = BackendNetwork.objects.get(network=network,
105
                                                         backend=backend)
106
            func(backend_network, msg)
107
        except Network.InvalidBackendIdError:
108
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
109
        except Network.DoesNotExist:
110
            log.error("Network %s not found in DB.", msg['network'])
111
        except Backend.DoesNotExist:
112
            log.error("Backend %s not found in DB.", msg['cluster'])
113
        except BackendNetwork.DoesNotExist:
114
            log.error("Network %s on backend %s not found in DB.",
115
                      msg['network'], msg['cluster'])
116
    return wrapper
117

    
118

    
119
def if_update_required(func):
120
    """
121
    Decorator for checking if an incoming message needs to update the db.
122

123
    The database will not be updated in the following cases:
124
    - The message has been redelivered and the action has already been
125
      completed. In this case the event_time will be equal with the one
126
      in the database.
127
    - The message describes a previous state in the ganeti, from the one that
128
      is described in the db. In this case the event_time will be smaller from
129
      the one in the database.
130

131
    """
132
    @wraps(func)
133
    def wrapper(target, msg):
134
        try:
135
            event_time = merge_time(msg['event_time'])
136
        except:
137
            log.error("Received message with malformed time: %s",
138
                      msg['event_time'])
139
            raise KeyError
140

    
141
        db_time = target.backendtime
142

    
143
        if db_time and event_time <= db_time:
144
            format_ = "%d/%m/%y %H:%M:%S:%f"
145
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
146
                      msg,
147
                      event_time.strftime(format_),
148
                      db_time.strftime(format_))
149
            return
150
        # New message. Update the database!
151
        func(target, msg, event_time)
152

    
153
    return wrapper
154

    
155

    
156
@instance_from_msg
157
@if_update_required
158
def update_db(vm, msg, event_time):
159
    """Process a notification of type 'ganeti-op-status'"""
160
    log.debug("Processing ganeti-op-status msg: %s", msg)
161

    
162
    if msg['type'] != "ganeti-op-status":
163
        log.error("Message is of unknown type %s.", msg['type'])
164
        return
165

    
166
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
167
                              msg['status'], msg['logmsg'])
168

    
169
    log.debug("Done processing ganeti-op-status msg for vm %s.",
170
              msg['instance'])
171

    
172

    
173
@instance_from_msg
174
@if_update_required
175
def update_net(vm, msg, event_time):
176
    """Process a notification of type 'ganeti-net-status'"""
177
    log.debug("Processing ganeti-net-status msg: %s", msg)
178

    
179
    if msg['type'] != "ganeti-net-status":
180
        log.error("Message is of unknown type %s", msg['type'])
181
        return
182

    
183
    backend.process_net_status(vm, event_time, msg['nics'])
184

    
185
    log.debug("Done processing ganeti-net-status msg for vm %s.",
186
              msg["instance"])
187

    
188

    
189
@network_from_msg
190
@if_update_required
191
def update_network(network, msg, event_time):
192
    """Process a notification of type 'ganeti-network-status'"""
193
    log.debug("Processing ganeti-network-status msg: %s", msg)
194

    
195
    if msg['type'] != "ganeti-network-status":
196
        log.error("Message is of unknown type %s.", msg['type'])
197
        return
198

    
199
    backend.process_network_status(network, event_time,
200
                                   msg['jobId'], msg['operation'],
201
                                   msg['status'], msg['logmsg'])
202

    
203
    log.debug("Done processing ganeti-network-status msg for network %s.",
204
              msg['network'])
205

    
206

    
207
@instance_from_msg
208
@if_update_required
209
def update_build_progress(vm, msg, event_time):
210
    """Process a create progress message"""
211
    log.debug("Processing ganeti-create-progress msg: %s", msg)
212

    
213
    if msg['type'] != "ganeti-create-progress":
214
        log.error("Message is of unknown type %s", msg['type'])
215
        return
216

    
217
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
218

    
219
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
220
              msg['instance'])
221

    
222

    
223
def dummy_proc(client, message, *args, **kwargs):
224
    try:
225
        log.debug("Msg: %s", message['body'])
226
        client.basic_ack(message)
227
    except Exception as e:
228
        log.exception("Could not receive message %s" % e)