Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 9c766317

History | View | Annotate | Download (6.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36
from datetime import datetime
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend
40

    
41
from synnefo.lib.utils import merge_time
42

    
43

    
44
log = logging.getLogger()
45

    
46

    
47
def is_update_required(func):
48
    """
49
    Decorator for checking if an incoming message needs to update the db.
50

51
    The database will not be updated in the following cases:
52
    - The message has been redelivered and the action has already been
53
      completed. In this case the event_time will be equal with the one
54
      in the database.
55
    - The message describes a previous state in the ganeti, from the one that is
56
      described in the db. In this case the event_time will be smaller from the
57
      one in the database.
58

59
    This decorator is also acknowledging the messages to the AMQP broker.
60

61
    """
62
    @wraps(func)
63
    def wrapper(client, message, *args, **kwargs):
64
        try:
65
            msg = json.loads(message['body'])
66

    
67
            event_time = merge_time(msg['event_time'])
68

    
69
            vm_id = utils.id_from_instance_name(msg["instance"])
70
            vm = VirtualMachine.objects.get(id=vm_id)
71

    
72
            db_time = vm.backendtime
73
            if event_time <= db_time:
74
                format_ = "%d/%m/%y %H:%M:%S:%f"
75
                log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
76
                          message,
77
                          event_time.strftime(format_),
78
                          db_time.strftime(format_))
79
                client.basic_ack(message)
80
                return
81

    
82
            # New message. Update the database!
83
            func(client, message)
84

    
85
        except ValueError:
86
            log.error("Incoming message not in JSON format: %s", message)
87
            client.basic_ack(message)
88
        except KeyError:
89
            log.error("Malformed incoming JSON, missing attributes: %s",
90
                      message)
91
            client.basic_ack(message)
92
        except VirtualMachine.InvalidBackendIdError:
93
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
94
            client.basic_ack(message)
95
        except VirtualMachine.DoesNotExist:
96
            log.error("VM for instance %s with id %d not found in DB.",
97
                      msg['instance'], vm_id)
98
            client.basic_ack(message)
99
        except Exception as e:
100
            log.exception("Unexpected error: %s, msg: %s", e, msg)
101
        else:
102
            # Acknowledge the message
103
            client.basic_ack(message)
104

    
105
    return wrapper
106

    
107

    
108
@is_update_required
109
def update_db(client, message):
110
    """Process a notification of type 'ganeti-op-status'"""
111
    log.debug("Processing ganeti-op-status msg: %s", message['body'])
112

    
113
    msg = json.loads(message['body'])
114

    
115
    if msg['type'] != "ganeti-op-status":
116
        log.error("Message is of unknown type %s.", msg['type'])
117
        return
118

    
119
    vm_id = utils.id_from_instance_name(msg['instance'])
120
    vm = VirtualMachine.objects.get(id=vm_id)
121

    
122
    event_time = merge_time(msg['event_time'])
123
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
124
                              msg['status'], msg['logmsg'])
125

    
126
    log.debug("Done processing ganeti-op-status msg for vm %s.",
127
              msg['instance'])
128

    
129

    
130
@is_update_required
131
def update_net(client, message):
132
    """Process a notification of type 'ganeti-net-status'"""
133
    log.debug("Processing ganeti-net-status msg: %s", message['body'])
134

    
135
    msg = json.loads(message['body'])
136

    
137
    if msg['type'] != "ganeti-net-status":
138
        log.error("Message is of unknown type %s", msg['type'])
139
        return
140

    
141
    vm_id = utils.id_from_instance_name(msg['instance'])
142
    vm = VirtualMachine.objects.get(id=vm_id)
143

    
144
    event_time = merge_time(msg['event_time'])
145
    backend.process_net_status(vm, event_time, msg['nics'])
146

    
147
    log.debug("Done processing ganeti-net-status msg for vm %s.",
148
              msg["instance"])
149

    
150

    
151
@is_update_required
152
def update_build_progress(client, message):
153
    """Process a create progress message"""
154
    log.debug("Processing ganeti-create-progress msg: %s", message['body'])
155

    
156
    msg = json.loads(message['body'])
157

    
158
    if msg['type'] != "ganeti-create-progress":
159
        log.error("Message is of unknown type %s", msg['type'])
160
        return
161

    
162
    vm_id = utils.id_from_instance_name(msg['instance'])
163
    vm = VirtualMachine.objects.get(id=vm_id)
164

    
165
    event_time = merge_time(msg['event_time'])
166
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
167

    
168
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
169
              msg['instance'])
170

    
171

    
172
@is_update_required
173
def dummy_proc(client, message):
174
    try:
175
        log.debug("Msg: %s", message['body'])
176
    except Exception as e:
177
        log.exception("Could not receive message")