Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ a17a8e98

History | View | Annotate | Download (7.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36
from datetime import datetime
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend
40

    
41
from synnefo.lib.utils import merge_time
42

    
43

    
44
log = logging.getLogger()
45

    
46

    
47
def handle_message_delivery(func):
48
    """ Generic decorator for handling messages.
49

50
    This decorator is responsible for converting the message into json format,
51
    handling of common exceptions and acknowledment of message if needed.
52

53
    """
54
    @wraps(func)
55
    def wrapper(client, message, *args, **kwargs):
56
        try:
57
            msg = json.loads(message['body'])
58
            func(msg)
59
            client.basic_ack(message)
60
        except ValueError:
61
            log.error("Incoming message not in JSON format: %s", message)
62
            client.basic_ack(message)
63
        except KeyError:
64
            log.error("Malformed incoming JSON, missing attributes: %s",
65
                      message)
66
            client.basic_ack(message)
67
        except Exception as e:
68
            log.exception("Unexpected error: %s, msg: %s", e, msg)
69

    
70
    return wrapper
71

    
72
def instance_from_msg(func):
73
    """ Decorator for getting the VirtualMachine object of the msg.
74

75
    """
76
    @handle_message_delivery
77
    @wraps(func)
78
    def wrapper(msg):
79
        try:
80
            vm_id = utils.id_from_instance_name(msg["instance"])
81
            vm = VirtualMachine.objects.get(id=vm_id)
82
            func(vm, msg)
83
        except VirtualMachine.InvalidBackendIdError:
84
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
85
        except VirtualMachine.DoesNotExist:
86
            log.error("VM for instance %s with id %d not found in DB.",
87
                      msg['instance'], vm_id)
88
    return wrapper
89

    
90
def network_from_msg(func):
91
    """ Decorator for getting the Network object of the msg.
92

93
    """
94
    @handle_message_delivery
95
    @wraps(func)
96
    def wrapper(msg):
97
        try:
98
            network_id = utils.id_from_network_name(msg["network"])
99
            network = Network.objects.get(id=network_id)
100
            func(network, msg)
101
        except Network.InvalidBackendIdError:
102
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
103
        except Network.DoesNotExist:
104
            log.error("Network %s with id %d not found in DB.",
105
                      msg['network'], vm_id)
106
    return wrapper
107

    
108
def if_update_required(func):
109
    """
110
    Decorator for checking if an incoming message needs to update the db.
111

112
    The database will not be updated in the following cases:
113
    - The message has been redelivered and the action has already been
114
      completed. In this case the event_time will be equal with the one
115
      in the database.
116
    - The message describes a previous state in the ganeti, from the one that is
117
      described in the db. In this case the event_time will be smaller from the
118
      one in the database.
119

120
    """
121
    @wraps(func)
122
    def wrapper(target, msg):
123
        event_time = merge_time(msg['event_time'])
124
        db_time = target.backendtime
125

    
126
        if event_time <= db_time:
127
            format_ = "%d/%m/%y %H:%M:%S:%f"
128
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
129
                      msg,
130
                      event_time.strftime(format_),
131
                      db_time.strftime(format_))
132
            return
133
        # New message. Update the database!
134
        func(target, msg, event_time)
135

    
136
    return wrapper
137

    
138

    
139
@instance_from_msg
140
@if_update_required
141
def update_db(vm, msg, event_time):
142
    """Process a notification of type 'ganeti-op-status'"""
143
    log.debug("Processing ganeti-op-status msg: %s", msg)
144

    
145
    if msg['type'] != "ganeti-op-status":
146
        log.error("Message is of unknown type %s.", msg['type'])
147
        return
148

    
149
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
150
                              msg['status'], msg['logmsg'])
151

    
152
    log.debug("Done processing ganeti-op-status msg for vm %s.",
153
              msg['instance'])
154

    
155

    
156
@instance_from_msg
157
@if_update_required
158
def update_net(vm, msg, event_time):
159
    """Process a notification of type 'ganeti-net-status'"""
160
    log.debug("Processing ganeti-net-status msg: %s", msg)
161

    
162
    if msg['type'] != "ganeti-net-status":
163
        log.error("Message is of unknown type %s", msg['type'])
164
        return
165

    
166
    backend.process_net_status(vm, event_time, msg['nics'])
167

    
168
    log.debug("Done processing ganeti-net-status msg for vm %s.",
169
              msg["instance"])
170

    
171

    
172
@network_from_msg
173
@if_update_required
174
def update_network(network, msg, event_time):
175
    """Process a notification of type 'ganeti-network-status'"""
176
    log.debug("Processing ganeti-network-status msg: %s", msg)
177

    
178
    if msg['type'] != "ganeti-network-status":
179
        log.error("Message is of unknown type %s.", msg['type'])
180
        return
181

    
182

    
183
    log.debug("Done processing ganeti-network-status msg for vm %s.",
184
              msg['instance'])
185

    
186

    
187
@instance_from_msg
188
@if_update_required
189
def update_build_progress(vm, msg, event_time):
190
    """Process a create progress message"""
191
    log.debug("Processing ganeti-create-progress msg: %s", msg)
192

    
193
    if msg['type'] != "ganeti-create-progress":
194
        log.error("Message is of unknown type %s", msg['type'])
195
        return
196

    
197
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
198

    
199
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
200
              msg['instance'])
201

    
202

    
203
def dummy_proc(client, message, *args, **kwargs):
204
    try:
205
        log.debug("Msg: %s", message['body'])
206
        client.basic_ack(message)
207
    except Exception as e:
208
        log.exception("Could not receive message")