Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 9c766317

History | View | Annotate | Download (6.3 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 86f046a8 Giorgos Verigakis
import logging
34 23c84263 Georgios Gousios
import json
35 c4e55622 Christos Stavrakakis
from functools import wraps
36 c4e55622 Christos Stavrakakis
from datetime import datetime
37 23c84263 Georgios Gousios
38 23c84263 Georgios Gousios
from synnefo.db.models import VirtualMachine
39 1ed37c1d Giorgos Verigakis
from synnefo.logic import utils, backend
40 23c84263 Georgios Gousios
41 c4e55622 Christos Stavrakakis
from synnefo.lib.utils import merge_time
42 c4e55622 Christos Stavrakakis
43 86f046a8 Giorgos Verigakis
44 86f046a8 Giorgos Verigakis
log = logging.getLogger()
45 23c84263 Georgios Gousios
46 95aee02c Vangelis Koukis
47 c4e55622 Christos Stavrakakis
def is_update_required(func):
48 c4e55622 Christos Stavrakakis
    """
49 c4e55622 Christos Stavrakakis
    Decorator for checking if an incoming message needs to update the db.
50 c4e55622 Christos Stavrakakis

51 c4e55622 Christos Stavrakakis
    The database will not be updated in the following cases:
52 c4e55622 Christos Stavrakakis
    - The message has been redelivered and the action has already been
53 c4e55622 Christos Stavrakakis
      completed. In this case the event_time will be equal with the one
54 c4e55622 Christos Stavrakakis
      in the database.
55 c4e55622 Christos Stavrakakis
    - The message describes a previous state in the ganeti, from the one that is
56 c4e55622 Christos Stavrakakis
      described in the db. In this case the event_time will be smaller from the
57 c4e55622 Christos Stavrakakis
      one in the database.
58 c4e55622 Christos Stavrakakis

59 c4e55622 Christos Stavrakakis
    This decorator is also acknowledging the messages to the AMQP broker.
60 c4e55622 Christos Stavrakakis

61 c4e55622 Christos Stavrakakis
    """
62 c4e55622 Christos Stavrakakis
    @wraps(func)
63 c4e55622 Christos Stavrakakis
    def wrapper(client, message, *args, **kwargs):
64 c4e55622 Christos Stavrakakis
        try:
65 c4e55622 Christos Stavrakakis
            msg = json.loads(message['body'])
66 c4e55622 Christos Stavrakakis
67 c4e55622 Christos Stavrakakis
            event_time = merge_time(msg['event_time'])
68 c4e55622 Christos Stavrakakis
69 c4e55622 Christos Stavrakakis
            vm_id = utils.id_from_instance_name(msg["instance"])
70 c4e55622 Christos Stavrakakis
            vm = VirtualMachine.objects.get(id=vm_id)
71 c4e55622 Christos Stavrakakis
72 c4e55622 Christos Stavrakakis
            db_time = vm.backendtime
73 c4e55622 Christos Stavrakakis
            if event_time <= db_time:
74 c4e55622 Christos Stavrakakis
                format_ = "%d/%m/%y %H:%M:%S:%f"
75 9c766317 Christos Stavrakakis
                log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
76 9c766317 Christos Stavrakakis
                          message,
77 c4e55622 Christos Stavrakakis
                          event_time.strftime(format_),
78 c4e55622 Christos Stavrakakis
                          db_time.strftime(format_))
79 c4e55622 Christos Stavrakakis
                client.basic_ack(message)
80 c4e55622 Christos Stavrakakis
                return
81 c4e55622 Christos Stavrakakis
82 c4e55622 Christos Stavrakakis
            # New message. Update the database!
83 c4e55622 Christos Stavrakakis
            func(client, message)
84 c4e55622 Christos Stavrakakis
85 c4e55622 Christos Stavrakakis
        except ValueError:
86 c4e55622 Christos Stavrakakis
            log.error("Incoming message not in JSON format: %s", message)
87 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
88 c4e55622 Christos Stavrakakis
        except KeyError:
89 c4e55622 Christos Stavrakakis
            log.error("Malformed incoming JSON, missing attributes: %s",
90 c4e55622 Christos Stavrakakis
                      message)
91 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
92 c4e55622 Christos Stavrakakis
        except VirtualMachine.InvalidBackendIdError:
93 c4e55622 Christos Stavrakakis
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
94 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
95 c4e55622 Christos Stavrakakis
        except VirtualMachine.DoesNotExist:
96 c4e55622 Christos Stavrakakis
            log.error("VM for instance %s with id %d not found in DB.",
97 c4e55622 Christos Stavrakakis
                      msg['instance'], vm_id)
98 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
99 c4e55622 Christos Stavrakakis
        except Exception as e:
100 c4e55622 Christos Stavrakakis
            log.exception("Unexpected error: %s, msg: %s", e, msg)
101 c4e55622 Christos Stavrakakis
        else:
102 c4e55622 Christos Stavrakakis
            # Acknowledge the message
103 c4e55622 Christos Stavrakakis
            client.basic_ack(message)
104 c4e55622 Christos Stavrakakis
105 c4e55622 Christos Stavrakakis
    return wrapper
106 c4e55622 Christos Stavrakakis
107 c4e55622 Christos Stavrakakis
108 c4e55622 Christos Stavrakakis
@is_update_required
109 c4e55622 Christos Stavrakakis
def update_db(client, message):
110 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-op-status'"""
111 c4e55622 Christos Stavrakakis
    log.debug("Processing ganeti-op-status msg: %s", message['body'])
112 c4e55622 Christos Stavrakakis
113 c4e55622 Christos Stavrakakis
    msg = json.loads(message['body'])
114 23c84263 Georgios Gousios
115 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-op-status":
116 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s.", msg['type'])
117 c4e55622 Christos Stavrakakis
        return
118 7ca9e930 Vangelis Koukis
119 c4e55622 Christos Stavrakakis
    vm_id = utils.id_from_instance_name(msg['instance'])
120 c4e55622 Christos Stavrakakis
    vm = VirtualMachine.objects.get(id=vm_id)
121 c4e55622 Christos Stavrakakis
122 c4e55622 Christos Stavrakakis
    event_time = merge_time(msg['event_time'])
123 c4e55622 Christos Stavrakakis
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
124 c4e55622 Christos Stavrakakis
                              msg['status'], msg['logmsg'])
125 c4e55622 Christos Stavrakakis
126 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-op-status msg for vm %s.",
127 c4e55622 Christos Stavrakakis
              msg['instance'])
128 c4e55622 Christos Stavrakakis
129 c4e55622 Christos Stavrakakis
130 c4e55622 Christos Stavrakakis
@is_update_required
131 c4e55622 Christos Stavrakakis
def update_net(client, message):
132 c25cc9ec Vangelis Koukis
    """Process a notification of type 'ganeti-net-status'"""
133 c4e55622 Christos Stavrakakis
    log.debug("Processing ganeti-net-status msg: %s", message['body'])
134 7ca9e930 Vangelis Koukis
135 c4e55622 Christos Stavrakakis
    msg = json.loads(message['body'])
136 7ca9e930 Vangelis Koukis
137 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-net-status":
138 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
139 c4e55622 Christos Stavrakakis
        return
140 9068cd85 Georgios Gousios
141 c4e55622 Christos Stavrakakis
    vm_id = utils.id_from_instance_name(msg['instance'])
142 c4e55622 Christos Stavrakakis
    vm = VirtualMachine.objects.get(id=vm_id)
143 9068cd85 Georgios Gousios
144 c4e55622 Christos Stavrakakis
    event_time = merge_time(msg['event_time'])
145 c4e55622 Christos Stavrakakis
    backend.process_net_status(vm, event_time, msg['nics'])
146 604b2bf8 Georgios Gousios
147 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-net-status msg for vm %s.",
148 c4e55622 Christos Stavrakakis
              msg["instance"])
149 604b2bf8 Georgios Gousios
150 604b2bf8 Georgios Gousios
151 c4e55622 Christos Stavrakakis
@is_update_required
152 c4e55622 Christos Stavrakakis
def update_build_progress(client, message):
153 c4e55622 Christos Stavrakakis
    """Process a create progress message"""
154 c4e55622 Christos Stavrakakis
    log.debug("Processing ganeti-create-progress msg: %s", message['body'])
155 604b2bf8 Georgios Gousios
156 c4e55622 Christos Stavrakakis
    msg = json.loads(message['body'])
157 604b2bf8 Georgios Gousios
158 c4e55622 Christos Stavrakakis
    if msg['type'] != "ganeti-create-progress":
159 c4e55622 Christos Stavrakakis
        log.error("Message is of unknown type %s", msg['type'])
160 c4e55622 Christos Stavrakakis
        return
161 c25cc9ec Vangelis Koukis
162 c4e55622 Christos Stavrakakis
    vm_id = utils.id_from_instance_name(msg['instance'])
163 c4e55622 Christos Stavrakakis
    vm = VirtualMachine.objects.get(id=vm_id)
164 604b2bf8 Georgios Gousios
165 c4e55622 Christos Stavrakakis
    event_time = merge_time(msg['event_time'])
166 c4e55622 Christos Stavrakakis
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
167 604b2bf8 Georgios Gousios
168 c4e55622 Christos Stavrakakis
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
169 c4e55622 Christos Stavrakakis
              msg['instance'])
170 604b2bf8 Georgios Gousios
171 604b2bf8 Georgios Gousios
172 c4e55622 Christos Stavrakakis
@is_update_required
173 c4e55622 Christos Stavrakakis
def dummy_proc(client, message):
174 23c84263 Georgios Gousios
    try:
175 c4e55622 Christos Stavrakakis
        log.debug("Msg: %s", message['body'])
176 23c84263 Georgios Gousios
    except Exception as e:
177 86f046a8 Giorgos Verigakis
        log.exception("Could not receive message")