Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ c4e55622

History | View | Annotate | Download (7.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36
from datetime import datetime
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend
40

    
41
from synnefo.lib.utils import merge_time
42

    
43

    
44
log = logging.getLogger()
45

    
46

    
47
def is_update_required(func):
48
    """
49
    Decorator for checking if an incoming message needs to update the db.
50

51
    The database will not be updated in the following cases:
52
    - The message has been redelivered and the action has already been
53
      completed. In this case the event_time will be equal with the one
54
      in the database.
55
    - The message describes a previous state in the ganeti, from the one that is
56
      described in the db. In this case the event_time will be smaller from the
57
      one in the database.
58

59
    This decorator is also acknowledging the messages to the AMQP broker.
60

61
    """
62
    @wraps(func)
63
    def wrapper(client, message, *args, **kwargs):
64
        log.debug("Checking if action is required for msg %s",  message)
65

    
66
        try:
67
            msg = json.loads(message['body'])
68

    
69
            event_time = merge_time(msg['event_time'])
70

    
71
            vm_id = utils.id_from_instance_name(msg["instance"])
72
            vm = VirtualMachine.objects.get(id=vm_id)
73

    
74
            db_time = vm.backendtime
75
            if event_time <= db_time:
76
                format_ = "%d/%m/%y %H:%M:%S:%f"
77
                log.debug("Ignoring message. event_timestamp: %s db_timestamp: %s",
78
                          event_time.strftime(format_),
79
                          db_time.strftime(format_))
80
                client.basic_ack(message)
81
                return
82

    
83
            # New message. Update the database!
84
            func(client, message)
85

    
86
        except ValueError:
87
            log.error("Incoming message not in JSON format: %s", message)
88
            client.basic_ack(message)
89
        except KeyError:
90
            log.error("Malformed incoming JSON, missing attributes: %s",
91
                      message)
92
            client.basic_ack(message)
93
        except VirtualMachine.InvalidBackendIdError:
94
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
95
            client.basic_ack(message)
96
        except VirtualMachine.DoesNotExist:
97
            log.error("VM for instance %s with id %d not found in DB.",
98
                      msg['instance'], vm_id)
99
            client.basic_ack(message)
100
        except Exception as e:
101
            log.exception("Unexpected error: %s, msg: %s", e, msg)
102
        else:
103
            # Acknowledge the message
104
            client.basic_ack(message)
105

    
106
    return wrapper
107

    
108

    
109
@is_update_required
110
def update_db(client, message):
111
    """Process a notification of type 'ganeti-op-status'"""
112
    log.debug("Processing ganeti-op-status msg: %s", message['body'])
113

    
114
    msg = json.loads(message['body'])
115

    
116
    if msg['type'] != "ganeti-op-status":
117
        log.error("Message is of unknown type %s.", msg['type'])
118
        return
119

    
120
    if msg['operation'] == "OP_INSTANCE_QUERY_DATA":
121
        return status_job_finished(client, message)
122

    
123
    vm_id = utils.id_from_instance_name(msg['instance'])
124
    vm = VirtualMachine.objects.get(id=vm_id)
125

    
126
    event_time = merge_time(msg['event_time'])
127
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
128
                              msg['status'], msg['logmsg'])
129

    
130
    log.debug("Done processing ganeti-op-status msg for vm %s.",
131
              msg['instance'])
132

    
133

    
134
@is_update_required
135
def update_net(client, message):
136
    """Process a notification of type 'ganeti-net-status'"""
137
    log.debug("Processing ganeti-net-status msg: %s", message['body'])
138

    
139
    msg = json.loads(message['body'])
140

    
141
    if msg['type'] != "ganeti-net-status":
142
        log.error("Message is of unknown type %s", msg['type'])
143
        return
144

    
145
    vm_id = utils.id_from_instance_name(msg['instance'])
146
    vm = VirtualMachine.objects.get(id=vm_id)
147

    
148
    event_time = merge_time(msg['event_time'])
149
    backend.process_net_status(vm, event_time, msg['nics'])
150

    
151
    log.debug("Done processing ganeti-net-status msg for vm %s.",
152
              msg["instance"])
153

    
154

    
155
@is_update_required
156
def update_build_progress(client, message):
157
    """Process a create progress message"""
158
    log.debug("Processing ganeti-create-progress msg: %s", message['body'])
159

    
160
    msg = json.loads(message['body'])
161

    
162
    if msg['type'] != "ganeti-create-progress":
163
        log.error("Message is of unknown type %s", msg['type'])
164
        return
165

    
166
    vm_id = utils.id_from_instance_name(msg['instance'])
167
    vm = VirtualMachine.objects.get(id=vm_id)
168

    
169
    event_time = merge_time(msg['event_time'])
170
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
171

    
172
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
173
              msg['instance'])
174

    
175

    
176
def status_job_finished(client, message):
177
    """Updates VM status based on a previously sent status update request"""
178

    
179
    msg = json.loads(message['body'])
180

    
181
    if msg['operation'] != 'OP_INSTANCE_QUERY_DATA':
182
        log.error("Message is of unknown type %s", msg['operation'])
183
        return
184

    
185
    if msg['status'] != 'success':
186
        log.warn("Ignoring non-success status update from job %d on VM %s",
187
                 msg['jobId'], msg['instance'])
188
        client.basic_ack(message.delivery_tag)
189
        return
190

    
191
    status = backend.get_job_status(msg['jobId'])
192

    
193
    log.debug("Node status job result: %s", status)
194

    
195
    if status['summary'][0] != u'INSTANCE_QUERY_DATA':
196
         log.error("Status update is of unknown type %s",
197
                    status['summary'])
198
         return
199

    
200
    conf_state = status['opresult'][0][msg['instance']]['config_state']
201
    run_state = status['opresult'][0][msg['instance']]['run_state']
202

    
203
    vm_id = utils.id_from_instance_name(msg['instance'])
204
    vm = VirtualMachine.objects.get(id = vm_id)
205

    
206
    if run_state == "up":
207
        opcode = "OP_INSTANCE_REBOOT"
208
    else:
209
        opcode = "OP_INSTANCE_SHUTDOWN"
210

    
211
    event_time = merge_time(msg['event_time'])
212
    backend.process_op_status(vm=vm, etime=event_time,
213
                              jobid=msg['jobId'], opcode=opcode,
214
                              status='success',
215
                              logmsg="Reconciliation: simulated event")
216

    
217
@is_update_required
218
def dummy_proc(client, message):
219
    try:
220
        log.debug("Msg: %s", message['body'])
221
        pass
222
    except Exception as e:
223
        log.exception("Could not receive message")