Statistics
| Branch: | Tag: | Revision:

root / logic / callbacks.py @ e6209aa2

History | View | Annotate | Download (8.1 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import socket
34
import traceback
35
import json
36
import sys
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend, email_send, log
40

    
41
_logger = log.get_logger("synnefo.dispatcher")
42

    
43
def update_db(message):
44
    """Process the status of a VM based on a ganeti status message"""
45
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
46
    try:
47
        msg = _parse_json(message.body)
48

    
49
        if msg["type"] != "ganeti-op-status":
50
            _logger.error("Message is of unknown type %s.", msg["type"])
51
            return
52

    
53
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
54
            return status_job_finished(message)
55

    
56
        vmid = utils.id_from_instance_name(msg["instance"])
57
        vm = VirtualMachine.objects.get(id=vmid)
58

    
59
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
60
                                  msg["status"], msg["logmsg"])
61
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
62
                      msg["instance"])
63
        message.channel.basic_ack(message.delivery_tag)
64
    except KeyError:
65
        _logger.error("Malformed incoming JSON, missing attributes: %s",
66
                      message.body)
67
    except VirtualMachine.InvalidBackendIdError:
68
        _logger.debug("Ignoring msg for unknown instance %s.",
69
                      msg["instance"])
70
    except VirtualMachine.DoesNotExist:
71
        _logger.error("VM for instance %s with id %d not found in DB.",
72
                      msg["instance"], vmid)
73
    except Exception as e:
74
        _logger.error("Unexpected error:\n%s" %
75
            "".join(traceback.format_exception(*sys.exc_info())))
76

    
77

    
78
def update_net(message):
79
    """Process a network status update notification from Ganeti"""
80
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
81
    try:
82
        msg = _parse_json(message.body)
83

    
84
        if msg["type"] != "ganeti-net-status":
85
            _logger.error("Message is of unknown type %s", msg["type"])
86
            return
87

    
88
        vmid = utils.id_from_instance_name(msg["instance"])
89
        vm = VirtualMachine.objects.get(id=vmid)
90

    
91
        backend.process_net_status(vm, msg["nics"])
92
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
93
                      msg["instance"])
94
        message.channel.basic_ack(message.delivery_tag)
95
    except KeyError:
96
        _logger.error("Malformed incoming JSON, missing attributes: %s",
97
                      message.body)
98
    except VirtualMachine.InvalidBackendIdError:
99
        _logger.debug("Ignoring msg for unknown instance %s.",
100
                      msg["instance"])
101
    except VirtualMachine.DoesNotExist:
102
        _logger.error("VM for instance %s with id %d not found in DB.",
103
                      msg["instance"], vmid)
104
    except Exception as e:
105
        _logger.error("Unexpected error:\n%s" %
106
            "".join(traceback.format_exception(*sys.exc_info())))
107

    
108

    
109
def send_email(message):
110
    """Process an email submission request"""
111

    
112
    try:
113
        msg = json.loads(message.body)
114

    
115
        email_send.send(sender=msg['frm'], recipient = msg['to'],
116
                        body=msg['body'], subject=msg['subject'])
117
        message.channel.basic_ack(message.delivery_tag)
118
    except KeyError:
119
        _logger.error("Malformed incoming JSON, missing attributes: %s",
120
                      message.body)
121
    except socket.error as e:
122
        _logger.error("Cannot connect to SMTP server:%s\n", e)
123
    except Exception as e:
124
        _logger.error("Unexpected error:%s\n", e)
125
        raise
126

    
127

    
128
def update_credits(message):
129
    _logger.debug("Request to update credits")
130
    message.channel.basic_ack(message.delivery_tag)
131

    
132
def trigger_status_update(message):
133
    """
134
        Triggers a status update job for a specific VM id.
135
    """
136
    _logger.debug("Request to trigger status update: %s", message.body)
137

    
138
    try:
139
        msg = _parse_json(message.body)
140

    
141
        if msg["type"] != "reconcile":
142
             _logger.error("Message is of unknown type %s", msg["type"])
143
             return
144

    
145
        if msg["vmid"] == "":
146
            _logger.error("Reconciliate message does not specify a VM id")
147
            return
148

    
149
        vm = VirtualMachine.objects.get(id=msg["vmid"])
150
        backend.request_status_update(vm)
151

    
152
        message.channel.basic_ack(message.delivery_tag)
153
    except KeyError as k:
154
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
155
    except Exception as e:
156
        _logger.error("Unexpected error:%s", e)
157

    
158
def status_job_finished (message):
159
    """
160
        Updates VM status based on a previously sent status update request
161
    """
162
    try:
163
        msg = _parse_json(message.body)
164

    
165
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
166
            _logger.error("Message is of unknown type %s", msg["operation"])
167
            return
168

    
169
        if msg["status"] != "success":
170
            _logger.warn("Ignoring non-success status update from job %d on VM %s",
171
                          msg['jobId'], msg['instance'])
172
            message.channel.basic_ack(message.delivery_tag)
173
            return
174

    
175
        status = backend.get_job_status(msg['jobId'])
176

    
177
        _logger.debug("Node status job result: %s" % status)
178

    
179
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
180
             _logger.error("Status update is of unknown type %s", status['summary'])
181
             return
182

    
183
        conf_state = status['opresult'][0][msg['instance']]['config_state']
184
        run_state = status['opresult'][0][msg['instance']]['run_state']
185

    
186
        # XXX: The following assumes names like snf-12
187
        instid = msg['instance'].split('-')[1]
188

    
189
        vm = VirtualMachine.objects.get(id = instid)
190

    
191
        if run_state == "up":
192
            opcode = "OP_INSTANCE_REBOOT"
193
        else:
194
            opcode = "OP_INSTANCE_SHUTDOWN"
195

    
196
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
197
                                  status="success",
198
                                  logmsg="Reconciliation: simulated event")
199

    
200
        message.channel.basic_ack(message.delivery_tag)
201
    except KeyError as k:
202
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
203
    except Exception as e:
204
        _logger.error("Unexpected error:%s"%e)
205

    
206
def dummy_proc(message):
207
    try:
208
        msg = _logger.debug(message.body)
209
        _logger.debug("Msg (exchange:%s) ", msg)
210
        message.channel.basic_ack(message.delivery_tag)
211
    except Exception as e:
212
        _logger.error("Could not receive message %s" % e.message)
213
        pass
214

    
215
def _parse_json(data):
216
    try:
217
        return json.loads(data)
218
    except Exception as e:
219
        _logger.error("Could not parse JSON file: %s", e)
220
        raise