Statistics
| Branch: | Tag: | Revision:

root / logic / callbacks.py @ 9cb903f9

History | View | Annotate | Download (8.2 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import socket
34
import traceback
35
import json
36
import logging
37
import sys
38

    
39
from synnefo.db.models import VirtualMachine
40
from synnefo.logic import utils, backend, email_send
41

    
42
_logger = logging.getLogger("synnefo.dispatcher")
43

    
44
def update_db(message):
45
    """Process the status of a VM based on a ganeti status message"""
46
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
47
    try:
48
        msg = _parse_json(message.body)
49

    
50
        if msg["type"] != "ganeti-op-status":
51
            _logger.error("Message is of unknown type %s.", msg["type"])
52
            return
53

    
54
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
55
            return status_job_finished(message)
56

    
57
        vmid = utils.id_from_instance_name(msg["instance"])
58
        vm = VirtualMachine.objects.get(id=vmid)
59

    
60
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
61
                                  msg["status"], msg["logmsg"])
62
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
63
                      msg["instance"])
64
        message.channel.basic_ack(message.delivery_tag)
65
    except KeyError:
66
        _logger.error("Malformed incoming JSON, missing attributes: %s",
67
                      message.body)
68
    except VirtualMachine.InvalidBackendIdError:
69
        _logger.debug("Ignoring msg for unknown instance %s.",
70
                      msg["instance"])
71
    except VirtualMachine.DoesNotExist:
72
        _logger.error("VM for instance %s with id %d not found in DB.",
73
                      msg["instance"], vmid)
74
    except Exception as e:
75
        _logger.error("Unexpected error:\n%s" %
76
            "".join(traceback.format_exception(*sys.exc_info())))
77

    
78

    
79
def update_net(message):
80
    """Process a network status update notification from Ganeti"""
81
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
82
    try:
83
        msg = _parse_json(message.body)
84

    
85
        if msg["type"] != "ganeti-net-status":
86
            _logger.error("Message is of unknown type %s", msg["type"])
87
            return
88

    
89
        vmid = utils.id_from_instance_name(msg["instance"])
90
        vm = VirtualMachine.objects.get(id=vmid)
91

    
92
        backend.process_net_status(vm, msg["nics"])
93
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
94
                      msg["instance"])
95
        message.channel.basic_ack(message.delivery_tag)
96
    except KeyError:
97
        _logger.error("Malformed incoming JSON, missing attributes: %s",
98
                      message.body)
99
    except VirtualMachine.InvalidBackendIdError:
100
        _logger.debug("Ignoring msg for unknown instance %s.",
101
                      msg["instance"])
102
    except VirtualMachine.DoesNotExist:
103
        _logger.error("VM for instance %s with id %d not found in DB.",
104
                      msg["instance"], vmid)
105
    except Exception as e:
106
        _logger.error("Unexpected error:\n%s" %
107
            "".join(traceback.format_exception(*sys.exc_info())))
108

    
109

    
110
def send_email(message):
111
    """Process an email submission request"""
112

    
113
    try:
114
        msg = json.loads(message.body)
115

    
116
        email_send.send(sender=msg['frm'], recipient = msg['to'],
117
                        body=msg['body'], subject=msg['subject'])
118
        message.channel.basic_ack(message.delivery_tag)
119
    except KeyError:
120
        _logger.error("Malformed incoming JSON, missing attributes: %s",
121
                      message.body)
122
    except socket.error as e:
123
        _logger.error("Cannot connect to SMTP server:%s\n", e)
124
    except Exception as e:
125
        _logger.error("Unexpected error:%s\n", e)
126
        raise
127

    
128

    
129
def update_credits(message):
130
    _logger.debug("Request to update credits")
131
    message.channel.basic_ack(message.delivery_tag)
132

    
133
def trigger_status_update(message):
134
    """
135
        Triggers a status update job for a specific VM id.
136
    """
137
    _logger.debug("Request to trigger status update: %s", message.body)
138

    
139
    try:
140
        msg = _parse_json(message.body)
141

    
142
        if msg["type"] != "reconcile" :
143
             _logger.error("Message is of unknown type %s", msg["type"])
144
             return
145

    
146
        if msg["vmid"] == "" :
147
            _logger.error("Reconciliate message does not specify a VM id")
148
            return
149

    
150
        vm = VirtualMachine.objects.get(id=msg["vmid"])
151
        backend.request_status_update(vm)
152

    
153
        message.channel.basic_ack(message.delivery_tag)
154
    except KeyError as k:
155
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
156
    except Exception as e:
157
        _logger.error("Unexpected error:%s", e)
158

    
159
def status_job_finished (message) :
160
    """
161
        Updates VM status based on a previously sent status update request
162
    """
163
    try:
164
        msg = _parse_json(message.body)
165

    
166
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
167
            _logger.error("Message is of unknown type %s", msg["operation"])
168
            return
169

    
170
        if msg["status"] != "success" :
171
            _logger.warn("Ignoring non-success status update from job %d on VM %s",
172
                          msg['jobId'], msg['instance'])
173
            message.channel.basic_ack(message.delivery_tag)
174
            return
175

    
176
        status = backend.get_job_status(msg['jobId'])
177

    
178
        _logger.debug("Node status job result: %s" % status)
179

    
180
        if status['summary'][0] != u'INSTANCE_QUERY_DATA' :
181
             _logger.error("Status update is of unknown type %s", status['summary'])
182
             return
183

    
184
        conf_state = status['opresult'][0][msg['instance']]['config_state']
185
        run_state = status['opresult'][0][msg['instance']]['run_state']
186

    
187
        # XXX: The following assumes names like snf-12
188
        instid = msg['instance'].split('-')[1]
189

    
190
        vm = VirtualMachine.objects.get(id = instid)
191

    
192
        if run_state == "up":
193
            opcode = "OP_INSTANCE_REBOOT"
194
        else :
195
            opcode = "OP_INSTANCE_SHUTDOWN"
196

    
197
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
198
                                  status="success",
199
                                  logmsg="Reconciliation: simulated event")
200

    
201
        message.channel.basic_ack(message.delivery_tag)
202
    except KeyError as k:
203
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
204
    except Exception as e:
205
        _logger.error("Unexpected error:%s"%e)
206

    
207
def dummy_proc(message):
208
    try:
209
        msg = _logger.debug(message.body)
210
        _logger.debug("Msg (exchange:%s) ", msg)
211
        message.channel.basic_ack(message.delivery_tag)
212
    except Exception as e:
213
        _logger.error("Could not receive message %s" % e.message)
214
        pass
215

    
216
def _parse_json(data):
217
    try:
218
        return json.loads(data)
219
    except Exception as e:
220
        _logger.error("Could not parse JSON file: %s", e)
221
        raise