Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ a17a8e98

History | View | Annotate | Download (7.1 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 86f046a8 Giorgos Verigakis
import logging
34 23c84263 Georgios Gousios
import json
35 c4e55622 Christos Stavrakakis
from functools import wraps
36 c4e55622 Christos Stavrakakis
from datetime import datetime
37 23c84263 Georgios Gousios
38 23c84263 Georgios Gousios
from synnefo.db.models import VirtualMachine
39 1ed37c1d Giorgos Verigakis
from synnefo.logic import utils, backend
40 23c84263 Georgios Gousios
41 c4e55622 Christos Stavrakakis
from synnefo.lib.utils import merge_time
42 c4e55622 Christos Stavrakakis
43 86f046a8 Giorgos Verigakis
44 86f046a8 Giorgos Verigakis
log = logging.getLogger()
45 23c84263 Georgios Gousios
46 95aee02c Vangelis Koukis
47 a17a8e98 Christos Stavrakakis
def handle_message_delivery(func):
48 a17a8e98 Christos Stavrakakis
    """ Generic decorator for handling messages.
49 c4e55622 Christos Stavrakakis

50 a17a8e98 Christos Stavrakakis
    This decorator is responsible for converting the message into json format,
51 a17a8e98 Christos Stavrakakis
    handling of common exceptions and acknowledment of message if needed.
52 c4e55622 Christos Stavrakakis

53 c4e55622 Christos Stavrakakis
    """
54 c4e55622 Christos Stavrakakis
    @wraps(func)
55 c4e55622 Christos Stavrakakis
    def wrapper(client, message, *args, **kwargs):
56 c4e55622 Christos Stavrakakis
        try:
57 c4e55622 Christos Stavrakakis
            msg = json.loads(message['body'])
58 a17a8e98 Christos Stavrakakis
            func(msg)
59 a17a8e98 Christos Stavrakakis
            client.basic_ack(message)
60 c4e55622 Christos Stavrakakis
        except ValueError:
61 c4e55622 Christos Stavrakakis
            log.error("Incoming message not in JSON format: %s", message)
62 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
63 c4e55622 Christos Stavrakakis
        except KeyError:
64 c4e55622 Christos Stavrakakis
            log.error("Malformed incoming JSON, missing attributes: %s",
65 c4e55622 Christos Stavrakakis
                      message)
66 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
67 a17a8e98 Christos Stavrakakis
        except Exception as e:
68 a17a8e98 Christos Stavrakakis
            log.exception("Unexpected error: %s, msg: %s", e, msg)
69 a17a8e98 Christos Stavrakakis
70 a17a8e98 Christos Stavrakakis
    return wrapper
71 a17a8e98 Christos Stavrakakis
72 a17a8e98 Christos Stavrakakis
def instance_from_msg(func):
73 a17a8e98 Christos Stavrakakis
    """ Decorator for getting the VirtualMachine object of the msg.
74 a17a8e98 Christos Stavrakakis

75 a17a8e98 Christos Stavrakakis
    """
76 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
77 a17a8e98 Christos Stavrakakis
    @wraps(func)
78 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
79 a17a8e98 Christos Stavrakakis
        try:
80 a17a8e98 Christos Stavrakakis
            vm_id = utils.id_from_instance_name(msg["instance"])
81 a17a8e98 Christos Stavrakakis
            vm = VirtualMachine.objects.get(id=vm_id)
82 a17a8e98 Christos Stavrakakis
            func(vm, msg)
83 c4e55622 Christos Stavrakakis
        except VirtualMachine.InvalidBackendIdError:
84 c4e55622 Christos Stavrakakis
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
85 c4e55622 Christos Stavrakakis
        except VirtualMachine.DoesNotExist:
86 c4e55622 Christos Stavrakakis
            log.error("VM for instance %s with id %d not found in DB.",
87 c4e55622 Christos Stavrakakis
                      msg['instance'], vm_id)
88 a17a8e98 Christos Stavrakakis
    return wrapper
89 a17a8e98 Christos Stavrakakis
90 a17a8e98 Christos Stavrakakis
def network_from_msg(func):
91 a17a8e98 Christos Stavrakakis
    """ Decorator for getting the Network object of the msg.
92 a17a8e98 Christos Stavrakakis

93 a17a8e98 Christos Stavrakakis
    """
94 a17a8e98 Christos Stavrakakis
    @handle_message_delivery
95 a17a8e98 Christos Stavrakakis
    @wraps(func)
96 a17a8e98 Christos Stavrakakis
    def wrapper(msg):
97 a17a8e98 Christos Stavrakakis
        try:
98 a17a8e98 Christos Stavrakakis
            network_id = utils.id_from_network_name(msg["network"])
99 a17a8e98 Christos Stavrakakis
            network = Network.objects.get(id=network_id)
100 a17a8e98 Christos Stavrakakis
            func(network, msg)
101 a17a8e98 Christos Stavrakakis
        except Network.InvalidBackendIdError:
102 a17a8e98 Christos Stavrakakis
            log.debug("Ignoring msg for unknown network %s.", msg['network'])
103 a17a8e98 Christos Stavrakakis
        except Network.DoesNotExist:
104 a17a8e98 Christos Stavrakakis
            log.error("Network %s with id %d not found in DB.",
105 a17a8e98 Christos Stavrakakis
                      msg['network'], vm_id)
106 a17a8e98 Christos Stavrakakis
    return wrapper
107 a17a8e98 Christos Stavrakakis
108 a17a8e98 Christos Stavrakakis
def if_update_required(func):
109 a17a8e98 Christos Stavrakakis
    """
110 a17a8e98 Christos Stavrakakis
    Decorator for checking if an incoming message needs to update the db.
111 a17a8e98 Christos Stavrakakis

112 a17a8e98 Christos Stavrakakis
    The database will not be updated in the following cases:
113 a17a8e98 Christos Stavrakakis
    - The message has been redelivered and the action has already been
114 a17a8e98 Christos Stavrakakis
      completed. In this case the event_time will be equal with the one
115 a17a8e98 Christos Stavrakakis
      in the database.
116 a17a8e98 Christos Stavrakakis
    - The message describes a previous state in the ganeti, from the one that is
117 a17a8e98 Christos Stavrakakis
      described in the db. In this case the event_time will be smaller from the
118 a17a8e98 Christos Stavrakakis
      one in the database.
119 a17a8e98 Christos Stavrakakis

120 a17a8e98 Christos Stavrakakis
    """
121 a17a8e98 Christos Stavrakakis
    @wraps(func)
122 a17a8e98 Christos Stavrakakis
    def wrapper(target, msg):
123 a17a8e98 Christos Stavrakakis
        event_time = merge_time(msg['event_time'])
124 a17a8e98 Christos Stavrakakis
        db_time = target.backendtime
125 a17a8e98 Christos Stavrakakis
126 a17a8e98 Christos Stavrakakis
        if event_time <= db_time:
127 a17a8e98 Christos Stavrakakis
            format_ = "%d/%m/%y %H:%M:%S:%f"
128 a17a8e98 Christos Stavrakakis
            log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
129 a17a8e98 Christos Stavrakakis
                      msg,
130 a17a8e98 Christos Stavrakakis
                      event_time.strftime(format_),
131 a17a8e98 Christos Stavrakakis
                      db_time.strftime(format_))
132 a17a8e98 Christos Stavrakakis
            return
133 a17a8e98 Christos Stavrakakis
        # New message. Update the database!
134 a17a8e98 Christos Stavrakakis
        func(target, msg, event_time)
135 c4e55622 Christos Stavrakakis
136 c4e55622 Christos Stavrakakis
    return wrapper
137 c4e55622 Christos Stavrakakis
138 c4e55622 Christos Stavrakakis
139 a17a8e98 Christos Stavrakakis
@instance_from_msg
140 a17a8e98 Christos Stavrakakis
@if_update_required
141 a17a8e98 Christos Stavrakakis
def update_db(vm, msg, event_time):
142 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-op-status'"""
143 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-op-status msg: %s", msg)
144 23c84263 Georgios Gousios
145 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-op-status":
146 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
147 c4e55622 Christos Stavrakakis
        return
148 7ca9e930 Vangelis Koukis
149 c4e55622 Christos Stavrakakis
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
150 c4e55622 Christos Stavrakakis
                              msg['status'], msg['logmsg'])
151 c4e55622 Christos Stavrakakis
152 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-op-status msg for vm %s.",
153 c4e55622 Christos Stavrakakis
              msg['instance'])
154 c4e55622 Christos Stavrakakis
155 c4e55622 Christos Stavrakakis
156 a17a8e98 Christos Stavrakakis
@instance_from_msg
157 a17a8e98 Christos Stavrakakis
@if_update_required
158 a17a8e98 Christos Stavrakakis
def update_net(vm, msg, event_time):
159 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-net-status'"""
160 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-net-status msg: %s", msg)
161 7ca9e930 Vangelis Koukis
162 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-net-status":
163 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
164 c4e55622 Christos Stavrakakis
        return
165 9068cd85 Georgios Gousios
166 c4e55622 Christos Stavrakakis
    backend.process_net_status(vm, event_time, msg['nics'])
167 604b2bf8 Georgios Gousios
168 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-net-status msg for vm %s.",
169 c4e55622 Christos Stavrakakis
              msg["instance"])
170 604b2bf8 Georgios Gousios
171 604b2bf8 Georgios Gousios
172 a17a8e98 Christos Stavrakakis
@network_from_msg
173 a17a8e98 Christos Stavrakakis
@if_update_required
174 a17a8e98 Christos Stavrakakis
def update_network(network, msg, event_time):
175 a17a8e98 Christos Stavrakakis
    """Process a notification of type 'ganeti-network-status'"""
176 a17a8e98 Christos Stavrakakis
    log.debug("Processing ganeti-network-status msg: %s", msg)
177 a17a8e98 Christos Stavrakakis
178 a17a8e98 Christos Stavrakakis
    if msg['type'] != "ganeti-network-status":
179 a17a8e98 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
180 a17a8e98 Christos Stavrakakis
        return
181 a17a8e98 Christos Stavrakakis
182 a17a8e98 Christos Stavrakakis
183 a17a8e98 Christos Stavrakakis
    log.debug("Done processing ganeti-network-status msg for vm %s.",
184 a17a8e98 Christos Stavrakakis
              msg['instance'])
185 a17a8e98 Christos Stavrakakis
186 a17a8e98 Christos Stavrakakis
187 a17a8e98 Christos Stavrakakis
@instance_from_msg
188 a17a8e98 Christos Stavrakakis
@if_update_required
189 a17a8e98 Christos Stavrakakis
def update_build_progress(vm, msg, event_time):
190 c4e55622 Christos Stavrakakis
    """Process a create progress message"""
191 33b93f81 Christos Stavrakakis
    log.debug("Processing ganeti-create-progress msg: %s", msg)
192 604b2bf8 Georgios Gousios
193 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-create-progress":
194 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
195 c4e55622 Christos Stavrakakis
        return
196 c25cc9ec Vangelis Koukis
197 c4e55622 Christos Stavrakakis
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
198 604b2bf8 Georgios Gousios
199 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
200 c4e55622 Christos Stavrakakis
              msg['instance'])
201 604b2bf8 Georgios Gousios
202 604b2bf8 Georgios Gousios
203 a17a8e98 Christos Stavrakakis
def dummy_proc(client, message, *args, **kwargs):
204 23c84263 Georgios Gousios
    try:
205 c4e55622 Christos Stavrakakis
        log.debug("Msg: %s", message['body'])
206 33b93f81 Christos Stavrakakis
        client.basic_ack(message)
207 23c84263 Georgios Gousios
    except Exception as e:
208 86f046a8 Giorgos Verigakis
        log.exception("Could not receive message")