Statistics
| Branch: | Tag: | Revision:

root / logic / callbacks.py @ 9544c82f

History | View | Annotate | Download (8.3 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 35079ab2 Georgios Gousios
import socket
34 23c84263 Georgios Gousios
import traceback
35 23c84263 Georgios Gousios
import json
36 23c84263 Georgios Gousios
import sys
37 23c84263 Georgios Gousios
38 23c84263 Georgios Gousios
from synnefo.db.models import VirtualMachine
39 c718de42 Georgios Gousios
from synnefo.logic import utils, backend, email_send, log
40 23c84263 Georgios Gousios
41 c718de42 Georgios Gousios
_logger = log.get_logger("synnefo.dispatcher")
42 23c84263 Georgios Gousios
43 23c84263 Georgios Gousios
def update_db(message):
44 76343e61 Georgios Gousios
    """Process the status of a VM based on a ganeti status message"""
45 ad2d6807 Vangelis Koukis
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
46 23c84263 Georgios Gousios
    try:
47 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
48 23c84263 Georgios Gousios
49 23c84263 Georgios Gousios
        if msg["type"] != "ganeti-op-status":
50 ad2d6807 Vangelis Koukis
            _logger.error("Message is of unknown type %s.", msg["type"])
51 23c84263 Georgios Gousios
            return
52 23c84263 Georgios Gousios
53 0d0aa87d Georgios Gousios
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
54 0d0aa87d Georgios Gousios
            return status_job_finished(message)
55 0d0aa87d Georgios Gousios
56 23c84263 Georgios Gousios
        vmid = utils.id_from_instance_name(msg["instance"])
57 23c84263 Georgios Gousios
        vm = VirtualMachine.objects.get(id=vmid)
58 23c84263 Georgios Gousios
59 ad2d6807 Vangelis Koukis
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
60 ad2d6807 Vangelis Koukis
                                  msg["status"], msg["logmsg"])
61 ad2d6807 Vangelis Koukis
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
62 ad2d6807 Vangelis Koukis
                      msg["instance"])
63 23c84263 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
64 23c84263 Georgios Gousios
    except KeyError:
65 ad2d6807 Vangelis Koukis
        _logger.error("Malformed incoming JSON, missing attributes: %s",
66 ad2d6807 Vangelis Koukis
                      message.body)
67 23c84263 Georgios Gousios
    except VirtualMachine.InvalidBackendIdError:
68 ad2d6807 Vangelis Koukis
        _logger.debug("Ignoring msg for unknown instance %s.",
69 ad2d6807 Vangelis Koukis
                      msg["instance"])
70 23c84263 Georgios Gousios
    except VirtualMachine.DoesNotExist:
71 ad2d6807 Vangelis Koukis
        _logger.error("VM for instance %s with id %d not found in DB.",
72 ad2d6807 Vangelis Koukis
                      msg["instance"], vmid)
73 23c84263 Georgios Gousios
    except Exception as e:
74 ad2d6807 Vangelis Koukis
        _logger.error("Unexpected error:\n%s" %
75 ad2d6807 Vangelis Koukis
            "".join(traceback.format_exception(*sys.exc_info())))
76 23c84263 Georgios Gousios
77 7ca9e930 Vangelis Koukis
78 7ca9e930 Vangelis Koukis
def update_net(message):
79 7ca9e930 Vangelis Koukis
    """Process a network status update notification from Ganeti"""
80 ad2d6807 Vangelis Koukis
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
81 ad2d6807 Vangelis Koukis
    try:
82 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
83 ad2d6807 Vangelis Koukis
84 ad2d6807 Vangelis Koukis
        if msg["type"] != "ganeti-net-status":
85 ad2d6807 Vangelis Koukis
            _logger.error("Message is of unknown type %s", msg["type"])
86 ad2d6807 Vangelis Koukis
            return
87 ad2d6807 Vangelis Koukis
88 ad2d6807 Vangelis Koukis
        vmid = utils.id_from_instance_name(msg["instance"])
89 ad2d6807 Vangelis Koukis
        vm = VirtualMachine.objects.get(id=vmid)
90 ad2d6807 Vangelis Koukis
91 ad2d6807 Vangelis Koukis
        backend.process_net_status(vm, msg["nics"])
92 ad2d6807 Vangelis Koukis
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
93 ad2d6807 Vangelis Koukis
                      msg["instance"])
94 ad2d6807 Vangelis Koukis
        message.channel.basic_ack(message.delivery_tag)
95 ad2d6807 Vangelis Koukis
    except KeyError:
96 ad2d6807 Vangelis Koukis
        _logger.error("Malformed incoming JSON, missing attributes: %s",
97 ad2d6807 Vangelis Koukis
                      message.body)
98 ad2d6807 Vangelis Koukis
    except VirtualMachine.InvalidBackendIdError:
99 ad2d6807 Vangelis Koukis
        _logger.debug("Ignoring msg for unknown instance %s.",
100 ad2d6807 Vangelis Koukis
                      msg["instance"])
101 ad2d6807 Vangelis Koukis
    except VirtualMachine.DoesNotExist:
102 ad2d6807 Vangelis Koukis
        _logger.error("VM for instance %s with id %d not found in DB.",
103 ad2d6807 Vangelis Koukis
                      msg["instance"], vmid)
104 ad2d6807 Vangelis Koukis
    except Exception as e:
105 ad2d6807 Vangelis Koukis
        _logger.error("Unexpected error:\n%s" %
106 ad2d6807 Vangelis Koukis
            "".join(traceback.format_exception(*sys.exc_info())))
107 7ca9e930 Vangelis Koukis
108 7ca9e930 Vangelis Koukis
109 23c84263 Georgios Gousios
def send_email(message):
110 f8fd9e7b Georgios Gousios
    """Process an email submission request"""
111 f8fd9e7b Georgios Gousios
112 f8fd9e7b Georgios Gousios
    try:
113 f8fd9e7b Georgios Gousios
        msg = json.loads(message.body)
114 f8fd9e7b Georgios Gousios
115 583bfaa0 Georgios Gousios
        sent = email_send.send(sender=msg['frm'], recipient = msg['to'],
116 44837d17 Georgios Gousios
                        body=msg['body'], subject=msg['subject'])
117 583bfaa0 Georgios Gousios
118 583bfaa0 Georgios Gousios
        if not sent:
119 583bfaa0 Georgios Gousios
            _logger.warn("Failed to send email to %s", msg['to'])
120 583bfaa0 Georgios Gousios
        else:
121 583bfaa0 Georgios Gousios
            message.channel.basic_ack(message.delivery_tag)
122 f8fd9e7b Georgios Gousios
    except KeyError:
123 f8fd9e7b Georgios Gousios
        _logger.error("Malformed incoming JSON, missing attributes: %s",
124 f8fd9e7b Georgios Gousios
                      message.body)
125 35079ab2 Georgios Gousios
    except socket.error as e:
126 35079ab2 Georgios Gousios
        _logger.error("Cannot connect to SMTP server:%s\n", e)
127 f8fd9e7b Georgios Gousios
    except Exception as e:
128 35079ab2 Georgios Gousios
        _logger.error("Unexpected error:%s\n", e)
129 35079ab2 Georgios Gousios
        raise
130 23c84263 Georgios Gousios
131 23c84263 Georgios Gousios
132 23c84263 Georgios Gousios
def update_credits(message):
133 23c84263 Georgios Gousios
    _logger.debug("Request to update credits")
134 23c84263 Georgios Gousios
    message.channel.basic_ack(message.delivery_tag)
135 23c84263 Georgios Gousios
136 604b2bf8 Georgios Gousios
def trigger_status_update(message):
137 882c0c1f Georgios Gousios
    """
138 882c0c1f Georgios Gousios
        Triggers a status update job for a specific VM id.
139 882c0c1f Georgios Gousios
    """
140 0d0aa87d Georgios Gousios
    _logger.debug("Request to trigger status update: %s", message.body)
141 604b2bf8 Georgios Gousios
142 604b2bf8 Georgios Gousios
    try:
143 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
144 604b2bf8 Georgios Gousios
145 c4367587 Vangelis Koukis
        if msg["type"] != "reconcile":
146 604b2bf8 Georgios Gousios
             _logger.error("Message is of unknown type %s", msg["type"])
147 604b2bf8 Georgios Gousios
             return
148 604b2bf8 Georgios Gousios
149 c4367587 Vangelis Koukis
        if msg["vmid"] == "":
150 0d0aa87d Georgios Gousios
            _logger.error("Reconciliate message does not specify a VM id")
151 604b2bf8 Georgios Gousios
            return
152 604b2bf8 Georgios Gousios
153 0d0aa87d Georgios Gousios
        vm = VirtualMachine.objects.get(id=msg["vmid"])
154 604b2bf8 Georgios Gousios
        backend.request_status_update(vm)
155 604b2bf8 Georgios Gousios
156 604b2bf8 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
157 0d0aa87d Georgios Gousios
    except KeyError as k:
158 0d0aa87d Georgios Gousios
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
159 604b2bf8 Georgios Gousios
    except Exception as e:
160 0d0aa87d Georgios Gousios
        _logger.error("Unexpected error:%s", e)
161 604b2bf8 Georgios Gousios
162 c4367587 Vangelis Koukis
def status_job_finished (message):
163 882c0c1f Georgios Gousios
    """
164 882c0c1f Georgios Gousios
        Updates VM status based on a previously sent status update request
165 882c0c1f Georgios Gousios
    """
166 604b2bf8 Georgios Gousios
    try:
167 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
168 604b2bf8 Georgios Gousios
169 0d0aa87d Georgios Gousios
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
170 604b2bf8 Georgios Gousios
            _logger.error("Message is of unknown type %s", msg["operation"])
171 604b2bf8 Georgios Gousios
            return
172 604b2bf8 Georgios Gousios
173 c4367587 Vangelis Koukis
        if msg["status"] != "success":
174 0d0aa87d Georgios Gousios
            _logger.warn("Ignoring non-success status update from job %d on VM %s",
175 604b2bf8 Georgios Gousios
                          msg['jobId'], msg['instance'])
176 2f355fb5 Georgios Gousios
            message.channel.basic_ack(message.delivery_tag)
177 604b2bf8 Georgios Gousios
            return
178 604b2bf8 Georgios Gousios
179 0d0aa87d Georgios Gousios
        status = backend.get_job_status(msg['jobId'])
180 604b2bf8 Georgios Gousios
181 0d0aa87d Georgios Gousios
        _logger.debug("Node status job result: %s" % status)
182 0d0aa87d Georgios Gousios
183 c4367587 Vangelis Koukis
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
184 2f355fb5 Georgios Gousios
             _logger.error("Status update is of unknown type %s", status['summary'])
185 604b2bf8 Georgios Gousios
             return
186 604b2bf8 Georgios Gousios
187 2f355fb5 Georgios Gousios
        conf_state = status['opresult'][0][msg['instance']]['config_state']
188 2f355fb5 Georgios Gousios
        run_state = status['opresult'][0][msg['instance']]['run_state']
189 2f355fb5 Georgios Gousios
190 2f355fb5 Georgios Gousios
        # XXX: The following assumes names like snf-12
191 2f355fb5 Georgios Gousios
        instid = msg['instance'].split('-')[1]
192 2f355fb5 Georgios Gousios
193 2f355fb5 Georgios Gousios
        vm = VirtualMachine.objects.get(id = instid)
194 2f355fb5 Georgios Gousios
195 2f355fb5 Georgios Gousios
        if run_state == "up":
196 2f355fb5 Georgios Gousios
            opcode = "OP_INSTANCE_REBOOT"
197 c4367587 Vangelis Koukis
        else:
198 2f355fb5 Georgios Gousios
            opcode = "OP_INSTANCE_SHUTDOWN"
199 2f355fb5 Georgios Gousios
200 2f355fb5 Georgios Gousios
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
201 2f355fb5 Georgios Gousios
                                  status="success",
202 2f355fb5 Georgios Gousios
                                  logmsg="Reconciliation: simulated event")
203 2f355fb5 Georgios Gousios
204 604b2bf8 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
205 0d0aa87d Georgios Gousios
    except KeyError as k:
206 0d0aa87d Georgios Gousios
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
207 604b2bf8 Georgios Gousios
    except Exception as e:
208 0d0aa87d Georgios Gousios
        _logger.error("Unexpected error:%s"%e)
209 23c84263 Georgios Gousios
210 23c84263 Georgios Gousios
def dummy_proc(message):
211 23c84263 Georgios Gousios
    try:
212 9b4a18b9 Georgios Gousios
        msg = _logger.debug(message.body)
213 7ca9e930 Vangelis Koukis
        _logger.debug("Msg (exchange:%s) ", msg)
214 23c84263 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
215 23c84263 Georgios Gousios
    except Exception as e:
216 23c84263 Georgios Gousios
        _logger.error("Could not receive message %s" % e.message)
217 23c84263 Georgios Gousios
        pass
218 9b4a18b9 Georgios Gousios
219 9b4a18b9 Georgios Gousios
def _parse_json(data):
220 9b4a18b9 Georgios Gousios
    try:
221 9b4a18b9 Georgios Gousios
        return json.loads(data)
222 9b4a18b9 Georgios Gousios
    except Exception as e:
223 9b4a18b9 Georgios Gousios
        _logger.error("Could not parse JSON file: %s", e)
224 9b4a18b9 Georgios Gousios
        raise