Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ f2264859

History | View | Annotate | Download (6.3 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36
from datetime import datetime
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend
40

    
41
from synnefo.lib.utils import merge_time
42

    
43

    
44
log = logging.getLogger()
45

    
46

    
47
def is_update_required(func):
48
    """
49
    Decorator for checking if an incoming message needs to update the db.
50

51
    The database will not be updated in the following cases:
52
    - The message has been redelivered and the action has already been
53
      completed. In this case the event_time will be equal with the one
54
      in the database.
55
    - The message describes a previous state in the ganeti, from the one that is
56
      described in the db. In this case the event_time will be smaller from the
57
      one in the database.
58

59
    This decorator is also acknowledging the messages to the AMQP broker.
60

61
    """
62
    @wraps(func)
63
    def wrapper(client, message, *args, **kwargs):
64
        log.debug("Checking if action is required for msg %s",  message)
65

    
66
        try:
67
            msg = json.loads(message['body'])
68

    
69
            event_time = merge_time(msg['event_time'])
70

    
71
            vm_id = utils.id_from_instance_name(msg["instance"])
72
            vm = VirtualMachine.objects.get(id=vm_id)
73

    
74
            db_time = vm.backendtime
75
            if event_time <= db_time:
76
                format_ = "%d/%m/%y %H:%M:%S:%f"
77
                log.debug("Ignoring message. event_timestamp: %s db_timestamp: %s",
78
                          event_time.strftime(format_),
79
                          db_time.strftime(format_))
80
                client.basic_ack(message)
81
                return
82

    
83
            # New message. Update the database!
84
            func(client, message)
85

    
86
        except ValueError:
87
            log.error("Incoming message not in JSON format: %s", message)
88
            client.basic_ack(message)
89
        except KeyError:
90
            log.error("Malformed incoming JSON, missing attributes: %s",
91
                      message)
92
            client.basic_ack(message)
93
        except VirtualMachine.InvalidBackendIdError:
94
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
95
            client.basic_ack(message)
96
        except VirtualMachine.DoesNotExist:
97
            log.error("VM for instance %s with id %d not found in DB.",
98
                      msg['instance'], vm_id)
99
            client.basic_ack(message)
100
        except Exception as e:
101
            log.exception("Unexpected error: %s, msg: %s", e, msg)
102
        else:
103
            # Acknowledge the message
104
            client.basic_ack(message)
105

    
106
    return wrapper
107

    
108

    
109
@is_update_required
110
def update_db(client, message):
111
    """Process a notification of type 'ganeti-op-status'"""
112
    log.debug("Processing ganeti-op-status msg: %s", message['body'])
113

    
114
    msg = json.loads(message['body'])
115

    
116
    if msg['type'] != "ganeti-op-status":
117
        log.error("Message is of unknown type %s.", msg['type'])
118
        return
119

    
120
    vm_id = utils.id_from_instance_name(msg['instance'])
121
    vm = VirtualMachine.objects.get(id=vm_id)
122

    
123
    event_time = merge_time(msg['event_time'])
124
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
125
                              msg['status'], msg['logmsg'])
126

    
127
    log.debug("Done processing ganeti-op-status msg for vm %s.",
128
              msg['instance'])
129

    
130

    
131
@is_update_required
132
def update_net(client, message):
133
    """Process a notification of type 'ganeti-net-status'"""
134
    log.debug("Processing ganeti-net-status msg: %s", message['body'])
135

    
136
    msg = json.loads(message['body'])
137

    
138
    if msg['type'] != "ganeti-net-status":
139
        log.error("Message is of unknown type %s", msg['type'])
140
        return
141

    
142
    vm_id = utils.id_from_instance_name(msg['instance'])
143
    vm = VirtualMachine.objects.get(id=vm_id)
144

    
145
    event_time = merge_time(msg['event_time'])
146
    backend.process_net_status(vm, event_time, msg['nics'])
147

    
148
    log.debug("Done processing ganeti-net-status msg for vm %s.",
149
              msg["instance"])
150

    
151

    
152
@is_update_required
153
def update_build_progress(client, message):
154
    """Process a create progress message"""
155
    log.debug("Processing ganeti-create-progress msg: %s", message['body'])
156

    
157
    msg = json.loads(message['body'])
158

    
159
    if msg['type'] != "ganeti-create-progress":
160
        log.error("Message is of unknown type %s", msg['type'])
161
        return
162

    
163
    vm_id = utils.id_from_instance_name(msg['instance'])
164
    vm = VirtualMachine.objects.get(id=vm_id)
165

    
166
    event_time = merge_time(msg['event_time'])
167
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
168

    
169
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
170
              msg['instance'])
171

    
172

    
173
@is_update_required
174
def dummy_proc(client, message):
175
    try:
176
        log.debug("Msg: %s", message['body'])
177
        pass
178
    except Exception as e:
179
        log.exception("Could not receive message")