Statistics
| Branch: | Tag: | Revision:

root / logic / callbacks.py @ c25cc9ec

History | View | Annotate | Download (8.8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import socket
34
import traceback
35
import json
36
import sys
37

    
38
from synnefo.db.models import VirtualMachine
39
from synnefo.logic import utils, backend, email_send, log
40

    
41
_logger = log.get_logger("synnefo.dispatcher")
42

    
43

    
44
def update_db(message):
45
    """Process a notification of type 'ganeti-op-status'"""
46
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
47
    try:
48
        msg = json.loads(message.body)
49

    
50
        if msg["type"] != "ganeti-op-status":
51
            _logger.error("Message is of unknown type %s.", msg["type"])
52
            return
53

    
54
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
55
            return status_job_finished(message)
56

    
57
        vmid = utils.id_from_instance_name(msg["instance"])
58
        vm = VirtualMachine.objects.get(id=vmid)
59

    
60
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
61
                                  msg["status"], msg["logmsg"])
62
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
63
                      msg["instance"])
64
        message.channel.basic_ack(message.delivery_tag)
65
    except KeyError:
66
        _logger.error("Malformed incoming JSON, missing attributes: %s",
67
                      message.body)
68
    except VirtualMachine.InvalidBackendIdError:
69
        _logger.debug("Ignoring msg for unknown instance %s.",
70
                      msg["instance"])
71
    except VirtualMachine.InvalidBackendMsgError, e:
72
        _logger.debug("Ignoring msg of unknown type: %s.", e)
73
    except VirtualMachine.DoesNotExist:
74
        _logger.error("VM for instance %s with id %d not found in DB.",
75
                      msg["instance"], vmid)
76
    except Exception as e:
77
        _logger.exception("Unexpected error")
78

    
79

    
80
def update_net(message):
81
    """Process a notification of type 'ganeti-net-status'"""
82
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
83
    try:
84
        msg = json.loads(message.body)
85

    
86
        if msg["type"] != "ganeti-net-status":
87
            _logger.error("Message is of unknown type %s", msg["type"])
88
            return
89

    
90
        vmid = utils.id_from_instance_name(msg["instance"])
91
        vm = VirtualMachine.objects.get(id=vmid)
92

    
93
        backend.process_net_status(vm, msg["nics"])
94
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
95
                      msg["instance"])
96
        message.channel.basic_ack(message.delivery_tag)
97
    except KeyError:
98
        _logger.error("Malformed incoming JSON, missing attributes: %s",
99
                      message.body)
100
    except VirtualMachine.InvalidBackendIdError:
101
        _logger.debug("Ignoring msg for unknown instance %s.",
102
                      msg["instance"])
103
    except VirtualMachine.DoesNotExist:
104
        _logger.error("VM for instance %s with id %d not found in DB.",
105
                      msg["instance"], vmid)
106
    except Exception as e:
107
        _logger.exception("Unexpected error")
108

    
109

    
110
def send_email(message):
111
    """Process an email submission request"""
112

    
113
    try:
114
        msg = json.loads(message.body)
115

    
116
        sent = email_send.send(sender=msg['frm'], recipient = msg['to'],
117
                        body=msg['body'], subject=msg['subject'])
118

    
119
        if not sent:
120
            _logger.warn("Failed to send email to %s", msg['to'])
121
        else:
122
            message.channel.basic_ack(message.delivery_tag)
123
    except KeyError:
124
        _logger.error("Malformed incoming JSON, missing attributes: %s",
125
                      message.body)
126
    except socket.error as e:
127
        _logger.error("Cannot connect to SMTP server:%s\n", e)
128
    except Exception as e:
129
        _logger.exception("Unexpected error")
130
        raise
131

    
132

    
133
def update_credits(message):
134
    _logger.debug("Request to update credits")
135
    message.channel.basic_ack(message.delivery_tag)
136

    
137

    
138
def update_build_progress(message):
139
    """Process a create progress message"""
140
    try:
141
        msg = json.loads(message.body)
142

    
143
        if msg['type'] != "ganeti-create-progess":
144
            _logger.error("Message is of unknown type %s", msg["type"])
145
            return
146

    
147
        # XXX: The following assumes names like snf-12
148
        vmid = msg['instance'].split('-')[1]
149
        vm = VirtualMachine.objects.get(id=vmid)
150

    
151
        backend.process_create_progress(vm, msg['rprogress'], msg['wprogress'])
152
        _logger.debug("Done processing ganeti-create-progress msg for vm %s.",
153
                      msg["instance"])
154
        message.channel.basic_ack(message.delivery_tag)
155
    except KeyError:
156
        _logger.error("Malformed incoming JSON, missing attributes: %s",
157
                      message.body)
158
    except Exception as e:
159
        _logger.exception("Unexpected error")
160
        raise
161

    
162

    
163
def trigger_status_update(message):
164
    """Triggers a status update job for a specific VM id"""
165
    _logger.debug("Request to trigger status update: %s", message.body)
166

    
167
    try:
168
        msg = json.loads(message.body)
169

    
170
        if msg["type"] != "reconcile":
171
             _logger.error("Message is of unknown type %s", msg["type"])
172
             return
173

    
174
        if msg["vmid"] == "":
175
            _logger.error("Reconciliation message does not specify a VM id")
176
            return
177

    
178
        vm = VirtualMachine.objects.get(id=msg["vmid"])
179
        backend.request_status_update(vm)
180

    
181
        message.channel.basic_ack(message.delivery_tag)
182
    except KeyError as k:
183
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
184
    except Exception as e:
185
        _logger.exception("Unexpected error")
186

    
187

    
188
def status_job_finished(message):
189
    """Updates VM status based on a previously sent status update request"""
190
    try:
191
        msg = json.loads(message.body)
192

    
193
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
194
            _logger.error("Message is of unknown type %s", msg["operation"])
195
            return
196

    
197
        if msg["status"] != "success":
198
            _logger.warn("Ignoring non-success status update from job %d on VM %s",
199
                          msg['jobId'], msg['instance'])
200
            message.channel.basic_ack(message.delivery_tag)
201
            return
202

    
203
        status = backend.get_job_status(msg['jobId'])
204

    
205
        _logger.debug("Node status job result: %s" % status)
206

    
207
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
208
             _logger.error("Status update is of unknown type %s", status['summary'])
209
             return
210

    
211
        conf_state = status['opresult'][0][msg['instance']]['config_state']
212
        run_state = status['opresult'][0][msg['instance']]['run_state']
213

    
214
        # XXX: The following assumes names like snf-12
215
        instid = msg['instance'].split('-')[1]
216

    
217
        vm = VirtualMachine.objects.get(id = instid)
218

    
219
        if run_state == "up":
220
            opcode = "OP_INSTANCE_REBOOT"
221
        else:
222
            opcode = "OP_INSTANCE_SHUTDOWN"
223

    
224
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
225
                                  status="success",
226
                                  logmsg="Reconciliation: simulated event")
227

    
228
        message.channel.basic_ack(message.delivery_tag)
229
    except KeyError as k:
230
        _logger.error("Malformed incoming JSON, missing attributes: %s", k)
231
    except Exception as e:
232
        _logger.exception("Unexpected error")
233

    
234

    
235
def dummy_proc(message):
236
    try:
237
        _logger.debug("Msg: %s" %(message.body))
238
        message.channel.basic_ack(message.delivery_tag)
239
    except Exception as e:
240
        _logger.exception("Could not receive message")
241
        pass