Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 9c0ac5af

History | View | Annotate | Download (8.9 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import socket
35
import traceback
36
import json
37
import sys
38

    
39
from synnefo.db.models import VirtualMachine
40
from synnefo.logic import utils, backend, email_send
41

    
42

    
43
log = logging.getLogger()
44

    
45

    
46
def update_db(message):
47
    """Process a notification of type 'ganeti-op-status'"""
48
    log.debug("Processing ganeti-op-status msg: %s", message.body)
49
    msg = None
50
    try:
51
        msg = json.loads(message.body)
52

    
53
        if msg["type"] != "ganeti-op-status":
54
            log.error("Message is of unknown type %s.", msg["type"])
55
            return
56

    
57
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
58
            return status_job_finished(message)
59

    
60
        vmid = utils.id_from_instance_name(msg["instance"])
61
        vm = VirtualMachine.objects.get(id=vmid)
62

    
63
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
64
                                  msg["status"], msg["logmsg"])
65
        log.debug("Done processing ganeti-op-status msg for vm %s.",
66
                      msg["instance"])
67
        message.channel.basic_ack(message.delivery_tag)
68
    except KeyError:
69
        log.error("Malformed incoming JSON, missing attributes: %s",
70
                      message.body)
71
    except VirtualMachine.InvalidBackendIdError:
72
        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
73
    except VirtualMachine.InvalidBackendMsgError, e:
74
        log.debug("Ignoring msg of unknown type: %s.", e)
75
    except VirtualMachine.DoesNotExist:
76
        log.error("VM for instance %s with id %d not found in DB.",
77
                      msg["instance"], vmid)
78
    except Exception as e:
79
        log.exception("Unexpected error, msg: %s", msg)
80

    
81

    
82
def update_net(message):
83
    """Process a notification of type 'ganeti-net-status'"""
84
    log.debug("Processing ganeti-net-status msg: %s", message.body)
85
    msg = None
86
    try:
87
        msg = json.loads(message.body)
88

    
89
        if msg["type"] != "ganeti-net-status":
90
            log.error("Message is of unknown type %s", msg["type"])
91
            return
92

    
93
        vmid = utils.id_from_instance_name(msg["instance"])
94
        vm = VirtualMachine.objects.get(id=vmid)
95

    
96
        backend.process_net_status(vm, msg["nics"])
97
        log.debug("Done processing ganeti-net-status msg for vm %s.",
98
                      msg["instance"])
99
        message.channel.basic_ack(message.delivery_tag)
100
    except KeyError:
101
        log.error("Malformed incoming JSON, missing attributes: %s",
102
                      message.body)
103
    except VirtualMachine.InvalidBackendIdError:
104
        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
105
    except VirtualMachine.DoesNotExist:
106
        log.error("VM for instance %s with id %d not found in DB.",
107
                      msg["instance"], vmid)
108
    except Exception as e:
109
        log.exception("Unexpected error, msg: %s", msg)
110

    
111

    
112
def send_email(message):
113
    """Process an email submission request"""
114
    msg = None
115
    try:
116
        msg = json.loads(message.body)
117

    
118
        sent = email_send.send(sender=msg['frm'], recipient = msg['to'],
119
                        body=msg['body'], subject=msg['subject'])
120

    
121
        if not sent:
122
            log.warn("Failed to send email to %s", msg['to'])
123
        else:
124
            message.channel.basic_ack(message.delivery_tag)
125
    except KeyError:
126
        log.error("Malformed incoming JSON, missing attributes: %s",
127
                      message.body)
128
    except socket.error as e:
129
        log.error("Cannot connect to SMTP server:%s\n", e)
130
    except Exception as e:
131
        log.exception("Unexpected error, msg: %s", msg)
132
        raise
133

    
134

    
135
def update_credits(message):
136
    log.debug("Request to update credits")
137
    message.channel.basic_ack(message.delivery_tag)
138

    
139

    
140
def update_build_progress(message):
141
    """Process a create progress message"""
142
    log.debug("Processing ganeti-create-progress msg: %s", message.body)
143
    msg = None
144
    try:
145
        msg = json.loads(message.body)
146

    
147
        if msg['type'] != "ganeti-create-progress":
148
            log.error("Message is of unknown type %s", msg["type"])
149
            return
150

    
151
        # XXX: The following assumes names like snf-12
152
        vmid = msg['instance'].split('-')[1]
153
        vm = VirtualMachine.objects.get(id=vmid)
154

    
155
        backend.process_create_progress(vm, msg['rprogress'], None)
156
        log.debug("Done processing ganeti-create-progress msg for vm %s.",
157
                      msg["instance"])
158
        message.channel.basic_ack(message.delivery_tag)
159
    except KeyError:
160
        log.error("Malformed incoming JSON, missing attributes: %s",
161
                      message.body)
162
    except Exception as e:
163
        log.exception("Unexpected error, msg: %s", msg)
164
        raise
165

    
166

    
167
def trigger_status_update(message):
168
    """Triggers a status update job for a specific VM id"""
169
    log.debug("Request to trigger status update: %s", message.body)
170
    msg = None
171
    try:
172
        msg = json.loads(message.body)
173

    
174
        if msg["type"] != "reconcile":
175
             log.error("Message is of unknown type %s", msg["type"])
176
             return
177

    
178
        if msg["vmid"] == "":
179
            log.error("Reconciliation message does not specify a VM id")
180
            return
181

    
182
        vm = VirtualMachine.objects.get(id=msg["vmid"])
183
        backend.request_status_update(vm)
184

    
185
        message.channel.basic_ack(message.delivery_tag)
186
    except KeyError as k:
187
        log.error("Malformed incoming JSON, missing attributes: %s", k)
188
    except Exception as e:
189
        log.exception("Unexpected error, msg: %s", msg)
190

    
191

    
192
def status_job_finished(message):
193
    """Updates VM status based on a previously sent status update request"""
194
    msg = None
195
    try:
196
        msg = json.loads(message.body)
197

    
198
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
199
            log.error("Message is of unknown type %s", msg["operation"])
200
            return
201

    
202
        if msg["status"] != "success":
203
            log.warn("Ignoring non-success status update from job %d on VM %s",
204
                          msg['jobId'], msg['instance'])
205
            message.channel.basic_ack(message.delivery_tag)
206
            return
207

    
208
        status = backend.get_job_status(msg['jobId'])
209

    
210
        log.debug("Node status job result: %s", status)
211

    
212
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
213
             log.error("Status update is of unknown type %s",
214
                        status['summary'])
215
             return
216

    
217
        conf_state = status['opresult'][0][msg['instance']]['config_state']
218
        run_state = status['opresult'][0][msg['instance']]['run_state']
219

    
220
        # XXX: The following assumes names like snf-12
221
        instid = msg['instance'].split('-')[1]
222

    
223
        vm = VirtualMachine.objects.get(id = instid)
224

    
225
        if run_state == "up":
226
            opcode = "OP_INSTANCE_REBOOT"
227
        else:
228
            opcode = "OP_INSTANCE_SHUTDOWN"
229

    
230
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
231
                                  status="success",
232
                                  logmsg="Reconciliation: simulated event")
233

    
234
        message.channel.basic_ack(message.delivery_tag)
235
    except KeyError as k:
236
        log.error("Malformed incoming JSON, missing attributes: %s", k)
237
    except Exception as e:
238
        log.exception("Unexpected error, msg: %s", msg)
239

    
240

    
241
def dummy_proc(message):
242
    try:
243
        log.debug("Msg: %s", message.body)
244
        message.channel.basic_ack(message.delivery_tag)
245
    except Exception as e:
246
        log.exception("Could not receive message")
247
        pass