Statistics
| Branch: | Tag: | Revision:

root / logic / callbacks.py @ 95aee02c

History | View | Annotate | Download (8.2 kB)

1 cb409cfd Georgios Gousios
# Copyright 2011 GRNET S.A. All rights reserved.
2 ad2d6807 Vangelis Koukis
#
3 cb409cfd Georgios Gousios
# Redistribution and use in source and binary forms, with or without
4 cb409cfd Georgios Gousios
# modification, are permitted provided that the following conditions
5 cb409cfd Georgios Gousios
# are met:
6 ad2d6807 Vangelis Koukis
#
7 cb409cfd Georgios Gousios
#   1. Redistributions of source code must retain the above copyright
8 cb409cfd Georgios Gousios
#      notice, this list of conditions and the following disclaimer.
9 ad2d6807 Vangelis Koukis
#
10 cb409cfd Georgios Gousios
#  2. Redistributions in binary form must reproduce the above copyright
11 cb409cfd Georgios Gousios
#     notice, this list of conditions and the following disclaimer in the
12 cb409cfd Georgios Gousios
#     documentation and/or other materials provided with the distribution.
13 cb409cfd Georgios Gousios
#
14 cb409cfd Georgios Gousios
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15 cb409cfd Georgios Gousios
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 cb409cfd Georgios Gousios
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 cb409cfd Georgios Gousios
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18 cb409cfd Georgios Gousios
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 cb409cfd Georgios Gousios
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 cb409cfd Georgios Gousios
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 cb409cfd Georgios Gousios
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 cb409cfd Georgios Gousios
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 cb409cfd Georgios Gousios
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 cb409cfd Georgios Gousios
# SUCH DAMAGE.
25 cb409cfd Georgios Gousios
#
26 cb409cfd Georgios Gousios
# The views and conclusions contained in the software and documentation are
27 cb409cfd Georgios Gousios
# those of the authors and should not be interpreted as representing official
28 cb409cfd Georgios Gousios
# policies, either expressed or implied, of GRNET S.A.
29 cb409cfd Georgios Gousios
30 cb409cfd Georgios Gousios
# Callback functions used by the dispatcher to process incoming notifications
31 cb409cfd Georgios Gousios
# from AMQP queues.
32 cb409cfd Georgios Gousios
33 35079ab2 Georgios Gousios
import socket
34 23c84263 Georgios Gousios
import traceback
35 23c84263 Georgios Gousios
import json
36 23c84263 Georgios Gousios
import sys
37 23c84263 Georgios Gousios
38 23c84263 Georgios Gousios
from synnefo.db.models import VirtualMachine
39 c718de42 Georgios Gousios
from synnefo.logic import utils, backend, email_send, log
40 23c84263 Georgios Gousios
41 c718de42 Georgios Gousios
_logger = log.get_logger("synnefo.dispatcher")
42 23c84263 Georgios Gousios
43 95aee02c Vangelis Koukis
44 23c84263 Georgios Gousios
def update_db(message):
45 76343e61 Georgios Gousios
    """Process the status of a VM based on a ganeti status message"""
46 ad2d6807 Vangelis Koukis
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
47 23c84263 Georgios Gousios
    try:
48 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
49 23c84263 Georgios Gousios
50 23c84263 Georgios Gousios
        if msg["type"] != "ganeti-op-status":
51 ad2d6807 Vangelis Koukis
            _logger.error("Message is of unknown type %s.", msg["type"])
52 23c84263 Georgios Gousios
            return
53 23c84263 Georgios Gousios
54 0d0aa87d Georgios Gousios
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
55 0d0aa87d Georgios Gousios
            return status_job_finished(message)
56 0d0aa87d Georgios Gousios
57 23c84263 Georgios Gousios
        vmid = utils.id_from_instance_name(msg["instance"])
58 23c84263 Georgios Gousios
        vm = VirtualMachine.objects.get(id=vmid)
59 23c84263 Georgios Gousios
60 ad2d6807 Vangelis Koukis
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
61 ad2d6807 Vangelis Koukis
                                  msg["status"], msg["logmsg"])
62 ad2d6807 Vangelis Koukis
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
63 ad2d6807 Vangelis Koukis
                      msg["instance"])
64 23c84263 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
65 23c84263 Georgios Gousios
    except KeyError:
66 ad2d6807 Vangelis Koukis
        _logger.error("Malformed incoming JSON, missing attributes: %s",
67 ad2d6807 Vangelis Koukis
                      message.body)
68 23c84263 Georgios Gousios
    except VirtualMachine.InvalidBackendIdError:
69 ad2d6807 Vangelis Koukis
        _logger.debug("Ignoring msg for unknown instance %s.",
70 ad2d6807 Vangelis Koukis
                      msg["instance"])
71 95aee02c Vangelis Koukis
    except VirtualMachine.InvalidBackendMsgError, e:
72 95aee02c Vangelis Koukis
        _logger.debug("Ignoring msg of unknown type: %s.", e)
73 23c84263 Georgios Gousios
    except VirtualMachine.DoesNotExist:
74 ad2d6807 Vangelis Koukis
        _logger.error("VM for instance %s with id %d not found in DB.",
75 ad2d6807 Vangelis Koukis
                      msg["instance"], vmid)
76 23c84263 Georgios Gousios
    except Exception as e:
77 f2bdb9ab Georgios Gousios
        _logger.exception("Unexpected error")
78 23c84263 Georgios Gousios
79 7ca9e930 Vangelis Koukis
80 7ca9e930 Vangelis Koukis
def update_net(message):
81 7ca9e930 Vangelis Koukis
    """Process a network status update notification from Ganeti"""
82 ad2d6807 Vangelis Koukis
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
83 ad2d6807 Vangelis Koukis
    try:
84 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
85 ad2d6807 Vangelis Koukis
86 ad2d6807 Vangelis Koukis
        if msg["type"] != "ganeti-net-status":
87 ad2d6807 Vangelis Koukis
            _logger.error("Message is of unknown type %s", msg["type"])
88 ad2d6807 Vangelis Koukis
            return
89 ad2d6807 Vangelis Koukis
90 ad2d6807 Vangelis Koukis
        vmid = utils.id_from_instance_name(msg["instance"])
91 ad2d6807 Vangelis Koukis
        vm = VirtualMachine.objects.get(id=vmid)
92 ad2d6807 Vangelis Koukis
93 ad2d6807 Vangelis Koukis
        backend.process_net_status(vm, msg["nics"])
94 ad2d6807 Vangelis Koukis
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
95 ad2d6807 Vangelis Koukis
                      msg["instance"])
96 ad2d6807 Vangelis Koukis
        message.channel.basic_ack(message.delivery_tag)
97 ad2d6807 Vangelis Koukis
    except KeyError:
98 ad2d6807 Vangelis Koukis
        _logger.error("Malformed incoming JSON, missing attributes: %s",
99 ad2d6807 Vangelis Koukis
                      message.body)
100 ad2d6807 Vangelis Koukis
    except VirtualMachine.InvalidBackendIdError:
101 ad2d6807 Vangelis Koukis
        _logger.debug("Ignoring msg for unknown instance %s.",
102 ad2d6807 Vangelis Koukis
                      msg["instance"])
103 ad2d6807 Vangelis Koukis
    except VirtualMachine.DoesNotExist:
104 ad2d6807 Vangelis Koukis
        _logger.error("VM for instance %s with id %d not found in DB.",
105 ad2d6807 Vangelis Koukis
                      msg["instance"], vmid)
106 ad2d6807 Vangelis Koukis
    except Exception as e:
107 f2bdb9ab Georgios Gousios
        _logger.exception("Unexpected error")
108 7ca9e930 Vangelis Koukis
109 7ca9e930 Vangelis Koukis
110 23c84263 Georgios Gousios
def send_email(message):
111 f8fd9e7b Georgios Gousios
    """Process an email submission request"""
112 f8fd9e7b Georgios Gousios
113 f8fd9e7b Georgios Gousios
    try:
114 f8fd9e7b Georgios Gousios
        msg = json.loads(message.body)
115 f8fd9e7b Georgios Gousios
116 583bfaa0 Georgios Gousios
        sent = email_send.send(sender=msg['frm'], recipient = msg['to'],
117 44837d17 Georgios Gousios
                        body=msg['body'], subject=msg['subject'])
118 583bfaa0 Georgios Gousios
119 583bfaa0 Georgios Gousios
        if not sent:
120 583bfaa0 Georgios Gousios
            _logger.warn("Failed to send email to %s", msg['to'])
121 583bfaa0 Georgios Gousios
        else:
122 583bfaa0 Georgios Gousios
            message.channel.basic_ack(message.delivery_tag)
123 f8fd9e7b Georgios Gousios
    except KeyError:
124 f8fd9e7b Georgios Gousios
        _logger.error("Malformed incoming JSON, missing attributes: %s",
125 f8fd9e7b Georgios Gousios
                      message.body)
126 35079ab2 Georgios Gousios
    except socket.error as e:
127 35079ab2 Georgios Gousios
        _logger.error("Cannot connect to SMTP server:%s\n", e)
128 f8fd9e7b Georgios Gousios
    except Exception as e:
129 f2bdb9ab Georgios Gousios
        _logger.exception("Unexpected error")
130 35079ab2 Georgios Gousios
        raise
131 23c84263 Georgios Gousios
132 23c84263 Georgios Gousios
133 23c84263 Georgios Gousios
def update_credits(message):
134 23c84263 Georgios Gousios
    _logger.debug("Request to update credits")
135 23c84263 Georgios Gousios
    message.channel.basic_ack(message.delivery_tag)
136 23c84263 Georgios Gousios
137 604b2bf8 Georgios Gousios
def trigger_status_update(message):
138 882c0c1f Georgios Gousios
    """
139 882c0c1f Georgios Gousios
        Triggers a status update job for a specific VM id.
140 882c0c1f Georgios Gousios
    """
141 0d0aa87d Georgios Gousios
    _logger.debug("Request to trigger status update: %s", message.body)
142 604b2bf8 Georgios Gousios
143 604b2bf8 Georgios Gousios
    try:
144 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
145 604b2bf8 Georgios Gousios
146 c4367587 Vangelis Koukis
        if msg["type"] != "reconcile":
147 604b2bf8 Georgios Gousios
             _logger.error("Message is of unknown type %s", msg["type"])
148 604b2bf8 Georgios Gousios
             return
149 604b2bf8 Georgios Gousios
150 c4367587 Vangelis Koukis
        if msg["vmid"] == "":
151 95aee02c Vangelis Koukis
            _logger.error("Reconciliation message does not specify a VM id")
152 604b2bf8 Georgios Gousios
            return
153 604b2bf8 Georgios Gousios
154 0d0aa87d Georgios Gousios
        vm = VirtualMachine.objects.get(id=msg["vmid"])
155 604b2bf8 Georgios Gousios
        backend.request_status_update(vm)
156 604b2bf8 Georgios Gousios
157 604b2bf8 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
158 0d0aa87d Georgios Gousios
    except KeyError as k:
159 0d0aa87d Georgios Gousios
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
160 604b2bf8 Georgios Gousios
    except Exception as e:
161 f2bdb9ab Georgios Gousios
        _logger.exception("Unexpected error")
162 604b2bf8 Georgios Gousios
163 c4367587 Vangelis Koukis
def status_job_finished (message):
164 882c0c1f Georgios Gousios
    """
165 882c0c1f Georgios Gousios
        Updates VM status based on a previously sent status update request
166 882c0c1f Georgios Gousios
    """
167 604b2bf8 Georgios Gousios
    try:
168 9b4a18b9 Georgios Gousios
        msg = _parse_json(message.body)
169 604b2bf8 Georgios Gousios
170 0d0aa87d Georgios Gousios
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
171 604b2bf8 Georgios Gousios
            _logger.error("Message is of unknown type %s", msg["operation"])
172 604b2bf8 Georgios Gousios
            return
173 604b2bf8 Georgios Gousios
174 c4367587 Vangelis Koukis
        if msg["status"] != "success":
175 0d0aa87d Georgios Gousios
            _logger.warn("Ignoring non-success status update from job %d on VM %s",
176 604b2bf8 Georgios Gousios
                          msg['jobId'], msg['instance'])
177 2f355fb5 Georgios Gousios
            message.channel.basic_ack(message.delivery_tag)
178 604b2bf8 Georgios Gousios
            return
179 604b2bf8 Georgios Gousios
180 0d0aa87d Georgios Gousios
        status = backend.get_job_status(msg['jobId'])
181 604b2bf8 Georgios Gousios
182 0d0aa87d Georgios Gousios
        _logger.debug("Node status job result: %s" % status)
183 0d0aa87d Georgios Gousios
184 c4367587 Vangelis Koukis
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
185 2f355fb5 Georgios Gousios
             _logger.error("Status update is of unknown type %s", status['summary'])
186 604b2bf8 Georgios Gousios
             return
187 604b2bf8 Georgios Gousios
188 2f355fb5 Georgios Gousios
        conf_state = status['opresult'][0][msg['instance']]['config_state']
189 2f355fb5 Georgios Gousios
        run_state = status['opresult'][0][msg['instance']]['run_state']
190 2f355fb5 Georgios Gousios
191 2f355fb5 Georgios Gousios
        # XXX: The following assumes names like snf-12
192 2f355fb5 Georgios Gousios
        instid = msg['instance'].split('-')[1]
193 2f355fb5 Georgios Gousios
194 2f355fb5 Georgios Gousios
        vm = VirtualMachine.objects.get(id = instid)
195 2f355fb5 Georgios Gousios
196 2f355fb5 Georgios Gousios
        if run_state == "up":
197 2f355fb5 Georgios Gousios
            opcode = "OP_INSTANCE_REBOOT"
198 c4367587 Vangelis Koukis
        else:
199 2f355fb5 Georgios Gousios
            opcode = "OP_INSTANCE_SHUTDOWN"
200 2f355fb5 Georgios Gousios
201 2f355fb5 Georgios Gousios
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
202 2f355fb5 Georgios Gousios
                                  status="success",
203 2f355fb5 Georgios Gousios
                                  logmsg="Reconciliation: simulated event")
204 2f355fb5 Georgios Gousios
205 604b2bf8 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
206 0d0aa87d Georgios Gousios
    except KeyError as k:
207 0d0aa87d Georgios Gousios
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
208 604b2bf8 Georgios Gousios
    except Exception as e:
209 f2bdb9ab Georgios Gousios
        _logger.exception("Unexpected error")
210 23c84263 Georgios Gousios
211 23c84263 Georgios Gousios
def dummy_proc(message):
212 23c84263 Georgios Gousios
    try:
213 9b4a18b9 Georgios Gousios
        msg = _logger.debug(message.body)
214 7ca9e930 Vangelis Koukis
        _logger.debug("Msg (exchange:%s) ", msg)
215 23c84263 Georgios Gousios
        message.channel.basic_ack(message.delivery_tag)
216 23c84263 Georgios Gousios
    except Exception as e:
217 23c84263 Georgios Gousios
        _logger.error("Could not receive message %s" % e.message)
218 23c84263 Georgios Gousios
        pass
219 9b4a18b9 Georgios Gousios
220 9b4a18b9 Georgios Gousios
def _parse_json(data):
221 9b4a18b9 Georgios Gousios
    try:
222 9b4a18b9 Georgios Gousios
        return json.loads(data)
223 9b4a18b9 Georgios Gousios
    except Exception as e:
224 9b4a18b9 Georgios Gousios
        _logger.error("Could not parse JSON file: %s", e)
225 9b4a18b9 Georgios Gousios
        raise