Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher_callbacks.py @ cb409cfd

History | View | Annotate | Download (7 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import traceback
34
import json
35
import logging
36
import sys
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend
40

    
41
_logger = logging.getLogger("synnefo.dispatcher")
42

    
43
def update_db(message):
44
    """Process the status of a VM based on a ganeti status message"""
45
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
46
    try:
47
        msg = json.loads(message.body)
48

    
49
        if msg["type"] != "ganeti-op-status":
50
            _logger.error("Message is of unknown type %s.", msg["type"])
51
            return
52

    
53
        vmid = utils.id_from_instance_name(msg["instance"])
54
        vm = VirtualMachine.objects.get(id=vmid)
55

    
56
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
57
                                  msg["status"], msg["logmsg"])
58
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
59
                      msg["instance"])
60
        message.channel.basic_ack(message.delivery_tag)
61
    except KeyError:
62
        _logger.error("Malformed incoming JSON, missing attributes: %s",
63
                      message.body)
64
    except VirtualMachine.InvalidBackendIdError:
65
        _logger.debug("Ignoring msg for unknown instance %s.",
66
                      msg["instance"])
67
    except VirtualMachine.DoesNotExist:
68
        _logger.error("VM for instance %s with id %d not found in DB.",
69
                      msg["instance"], vmid)
70
    except Exception as e:
71
        _logger.error("Unexpected error:\n%s" %
72
            "".join(traceback.format_exception(*sys.exc_info())))
73

    
74

    
75
def update_net(message):
76
    """Process a network status update notification from Ganeti"""
77
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
78
    try:
79
        msg = json.loads(message.body)
80

    
81
        if msg["type"] != "ganeti-net-status":
82
            _logger.error("Message is of unknown type %s", msg["type"])
83
            return
84

    
85
        vmid = utils.id_from_instance_name(msg["instance"])
86
        vm = VirtualMachine.objects.get(id=vmid)
87

    
88
        backend.process_net_status(vm, msg["nics"])
89
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
90
                      msg["instance"])
91
        message.channel.basic_ack(message.delivery_tag)
92
    except KeyError:
93
        _logger.error("Malformed incoming JSON, missing attributes: %s",
94
                      message.body)
95
    except VirtualMachine.InvalidBackendIdError:
96
        _logger.debug("Ignoring msg for unknown instance %s.",
97
                      msg["instance"])
98
    except VirtualMachine.DoesNotExist:
99
        _logger.error("VM for instance %s with id %d not found in DB.",
100
                      msg["instance"], vmid)
101
    except Exception as e:
102
        _logger.error("Unexpected error:\n%s" %
103
            "".join(traceback.format_exception(*sys.exc_info())))
104

    
105

    
106
def send_email(message):
107
    _logger.debug("Request to send email message")
108
    message.channel.basic_ack(message.delivery_tag)
109

    
110

    
111
def update_credits(message):
112
    _logger.debug("Request to update credits")
113
    message.channel.basic_ack(message.delivery_tag)
114

    
115
def trigger_status_update(message):
116
    _logger.debug("Request to trigger status update:", message.body)
117

    
118
    try:
119
        msg = json.loads(message.body)
120

    
121
        if msg["type"] != "reconciliate" :
122
             _logger.error("Message is of unknown type %s", msg["type"])
123
             return
124

    
125
        if msg["vm-id"] == "" :
126
            _logger.error("Message does not specify a VM id")
127
            return
128

    
129
        vm = VirtualMachine.objects.get(id=msg["vm-id"])
130
        backend.request_status_update(vm)
131

    
132
        message.channel.basic_ack(message.delivery_tag)
133
    except KeyError:
134
        _logger.error("Malformed incoming JSON, missing attributes: %s",
135
                      message.body)
136
    except Exception as e:
137
        _logger.error("Unexpected error:\n%s" %
138
                      "".join(traceback.format_exception(*sys.exc_info())))
139

    
140
def status_job_finished (message) :
141
    _logger.debug("Job status message received:", message.body)
142
    try:
143

    
144
        msg = message.body;
145

    
146
        if msg['operation'] != u'OP_INSTANCE_QUERY_DATA':
147
            _logger.error("Message is of unknown type %s", msg["operation"])
148
            return
149

    
150
        if msg["status"] != "success" :
151
            _logger.error("Status job %d for %s did not finish properly",
152
                          msg['jobId'], msg['instance'])
153
            return
154

    
155
        status = backend.get_job_status(msg['jobid'])
156

    
157
        if status["summary"] != "INSTANCE_QUERY_DATA" or type(status["opresult"]) is not list:
158
             _logger.error("Status is of unknown type %s", msg["summary"])
159
             return
160

    
161
        req_state = status['opresult'][msg['instance']]['config_state']
162
        run_state = status['opresult'][msg['instance']]['run_state']
163
        vm = VirtualMachine.objects.get(name=msg['instance'])
164
        backend.update_status(vm, run_state)
165
        
166
        message.channel.basic_ack(message.delivery_tag)
167
    except KeyError:
168
        _logger.error("Malformed incoming JSON, missing attributes: %s",
169
                      message.body)
170
    except Exception as e:
171
        _logger.error("Unexpected error:\n%s" %
172
                      "".join(traceback.format_exception(*sys.exc_info())))
173

    
174
def dummy_proc(message):
175
    try:
176
        msg = json.loads(message.body)
177
        _logger.debug("Msg (exchange:%s) ", msg)
178
        message.channel.basic_ack(message.delivery_tag)
179
    except Exception as e:
180
        _logger.error("Could not receive message %s" % e.message)
181
        pass