Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 33b93f81

History | View | Annotate | Download (5.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import json
35
from functools import wraps
36
from datetime import datetime
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend
40

    
41
from synnefo.lib.utils import merge_time
42

    
43

    
44
log = logging.getLogger()
45

    
46

    
47
def is_update_required(func):
48
    """
49
    Decorator for checking if an incoming message needs to update the db.
50

51
    The database will not be updated in the following cases:
52
    - The message has been redelivered and the action has already been
53
      completed. In this case the event_time will be equal with the one
54
      in the database.
55
    - The message describes a previous state in the ganeti, from the one that is
56
      described in the db. In this case the event_time will be smaller from the
57
      one in the database.
58

59
    This decorator is also acknowledging the messages to the AMQP broker.
60

61
    """
62
    @wraps(func)
63
    def wrapper(client, message, *args, **kwargs):
64
        try:
65
            msg = json.loads(message['body'])
66

    
67
            event_time = merge_time(msg['event_time'])
68

    
69
            vm_id = utils.id_from_instance_name(msg["instance"])
70
            vm = VirtualMachine.objects.get(id=vm_id)
71

    
72
            db_time = vm.backendtime
73
            if event_time <= db_time:
74
                format_ = "%d/%m/%y %H:%M:%S:%f"
75
                log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
76
                          message,
77
                          event_time.strftime(format_),
78
                          db_time.strftime(format_))
79
                client.basic_ack(message)
80
                return
81

    
82
            # New message. Update the database!
83
            func(vm, msg)
84

    
85
        except ValueError:
86
            log.error("Incoming message not in JSON format: %s", message)
87
            client.basic_ack(message)
88
        except KeyError:
89
            log.error("Malformed incoming JSON, missing attributes: %s",
90
                      message)
91
            client.basic_ack(message)
92
        except VirtualMachine.InvalidBackendIdError:
93
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
94
            client.basic_ack(message)
95
        except VirtualMachine.DoesNotExist:
96
            log.error("VM for instance %s with id %d not found in DB.",
97
                      msg['instance'], vm_id)
98
            client.basic_ack(message)
99
        except Exception as e:
100
            log.exception("Unexpected error: %s, msg: %s", e, msg)
101
        else:
102
            # Acknowledge the message
103
            client.basic_ack(message)
104

    
105
    return wrapper
106

    
107

    
108
@is_update_required
109
def update_db(vm, msg):
110
    """Process a notification of type 'ganeti-op-status'"""
111
    log.debug("Processing ganeti-op-status msg: %s", msg)
112

    
113
    if msg['type'] != "ganeti-op-status":
114
        log.error("Message is of unknown type %s.", msg['type'])
115
        return
116

    
117
    event_time = merge_time(msg['event_time'])
118
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
119
                              msg['status'], msg['logmsg'])
120

    
121
    log.debug("Done processing ganeti-op-status msg for vm %s.",
122
              msg['instance'])
123

    
124

    
125
@is_update_required
126
def update_net(vm, msg):
127
    """Process a notification of type 'ganeti-net-status'"""
128
    log.debug("Processing ganeti-net-status msg: %s", msg)
129

    
130
    if msg['type'] != "ganeti-net-status":
131
        log.error("Message is of unknown type %s", msg['type'])
132
        return
133

    
134
    event_time = merge_time(msg['event_time'])
135
    backend.process_net_status(vm, event_time, msg['nics'])
136

    
137
    log.debug("Done processing ganeti-net-status msg for vm %s.",
138
              msg["instance"])
139

    
140

    
141
@is_update_required
142
def update_build_progress(vm, msg):
143
    """Process a create progress message"""
144
    log.debug("Processing ganeti-create-progress msg: %s", msg)
145

    
146
    if msg['type'] != "ganeti-create-progress":
147
        log.error("Message is of unknown type %s", msg['type'])
148
        return
149

    
150
    event_time = merge_time(msg['event_time'])
151
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
152

    
153
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
154
              msg['instance'])
155

    
156

    
157
def dummy_proc(client, message):
158
    try:
159
        log.debug("Msg: %s", message['body'])
160
        client.basic_ack(message)
161
    except Exception as e:
162
        log.exception("Could not receive message")