Statistics
| Branch: | Tag: | Revision:

root / snf-cyclades-app / synnefo / logic / callbacks.py @ 3c755209

History | View | Annotate | Download (8 kB)

1
# Copyright 2011 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or without
4
# modification, are permitted provided that the following conditions
5
# are met:
6
#
7
#   1. Redistributions of source code must retain the above copyright
8
#      notice, this list of conditions and the following disclaimer.
9
#
10
#  2. Redistributions in binary form must reproduce the above copyright
11
#     notice, this list of conditions and the following disclaimer in the
12
#     documentation and/or other materials provided with the distribution.
13
#
14
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
15
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17
# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
18
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24
# SUCH DAMAGE.
25
#
26
# The views and conclusions contained in the software and documentation are
27
# those of the authors and should not be interpreted as representing official
28
# policies, either expressed or implied, of GRNET S.A.
29

    
30
# Callback functions used by the dispatcher to process incoming notifications
31
# from AMQP queues.
32

    
33
import logging
34
import socket
35
import traceback
36
import json
37
import sys
38

    
39
from synnefo.db.models import VirtualMachine
40
from synnefo.logic import utils, backend
41

    
42

    
43
log = logging.getLogger()
44

    
45

    
46
def update_db(message):
47
    """Process a notification of type 'ganeti-op-status'"""
48
    log.debug("Processing ganeti-op-status msg: %s", message.body)
49
    msg = None
50
    try:
51
        msg = json.loads(message.body)
52

    
53
        if msg["type"] != "ganeti-op-status":
54
            log.error("Message is of unknown type %s.", msg["type"])
55
            return
56

    
57
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
58
            return status_job_finished(message)
59

    
60
        vmid = utils.id_from_instance_name(msg["instance"])
61
        vm = VirtualMachine.objects.get(id=vmid)
62

    
63
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
64
                                  msg["status"], msg["logmsg"])
65
        log.debug("Done processing ganeti-op-status msg for vm %s.",
66
                      msg["instance"])
67
        message.channel.basic_ack(message.delivery_tag)
68
    except KeyError:
69
        log.error("Malformed incoming JSON, missing attributes: %s",
70
                      message.body)
71
    except VirtualMachine.InvalidBackendIdError:
72
        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
73
    except VirtualMachine.InvalidBackendMsgError, e:
74
        log.debug("Ignoring msg of unknown type: %s.", e)
75
    except VirtualMachine.DoesNotExist:
76
        log.error("VM for instance %s with id %d not found in DB.",
77
                      msg["instance"], vmid)
78
    except Exception as e:
79
        log.exception("Unexpected error, msg: %s", msg)
80

    
81

    
82
def update_net(message):
83
    """Process a notification of type 'ganeti-net-status'"""
84
    log.debug("Processing ganeti-net-status msg: %s", message.body)
85
    msg = None
86
    try:
87
        msg = json.loads(message.body)
88

    
89
        if msg["type"] != "ganeti-net-status":
90
            log.error("Message is of unknown type %s", msg["type"])
91
            return
92

    
93
        vmid = utils.id_from_instance_name(msg["instance"])
94
        vm = VirtualMachine.objects.get(id=vmid)
95

    
96
        backend.process_net_status(vm, msg["nics"])
97
        log.debug("Done processing ganeti-net-status msg for vm %s.",
98
                      msg["instance"])
99
        message.channel.basic_ack(message.delivery_tag)
100
    except KeyError:
101
        log.error("Malformed incoming JSON, missing attributes: %s",
102
                      message.body)
103
    except VirtualMachine.InvalidBackendIdError:
104
        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
105
    except VirtualMachine.DoesNotExist:
106
        log.error("VM for instance %s with id %d not found in DB.",
107
                      msg["instance"], vmid)
108
    except Exception as e:
109
        log.exception("Unexpected error, msg: %s", msg)
110

    
111

    
112
def update_build_progress(message):
113
    """Process a create progress message"""
114
    log.debug("Processing ganeti-create-progress msg: %s", message.body)
115
    msg = None
116
    try:
117
        msg = json.loads(message.body)
118

    
119
        if msg['type'] != "ganeti-create-progress":
120
            log.error("Message is of unknown type %s", msg["type"])
121
            return
122

    
123
        # XXX: The following assumes names like snf-12
124
        vmid = msg['instance'].split('-')[1]
125
        vm = VirtualMachine.objects.get(id=vmid)
126

    
127
        backend.process_create_progress(vm, msg['rprogress'], None)
128
        log.debug("Done processing ganeti-create-progress msg for vm %s.",
129
                      msg["instance"])
130
        message.channel.basic_ack(message.delivery_tag)
131
    except KeyError:
132
        log.error("Malformed incoming JSON, missing attributes: %s",
133
                      message.body)
134
    except Exception as e:
135
        log.exception("Unexpected error, msg: %s", msg)
136
        raise
137

    
138

    
139
def trigger_status_update(message):
140
    """Triggers a status update job for a specific VM id"""
141
    log.debug("Request to trigger status update: %s", message.body)
142
    msg = None
143
    try:
144
        msg = json.loads(message.body)
145

    
146
        if msg["type"] != "reconcile":
147
             log.error("Message is of unknown type %s", msg["type"])
148
             return
149

    
150
        if msg["vmid"] == "":
151
            log.error("Reconciliation message does not specify a VM id")
152
            return
153

    
154
        vm = VirtualMachine.objects.get(id=msg["vmid"])
155
        backend.request_status_update(vm)
156

    
157
        message.channel.basic_ack(message.delivery_tag)
158
    except KeyError as k:
159
        log.error("Malformed incoming JSON, missing attributes: %s", k)
160
    except Exception as e:
161
        log.exception("Unexpected error, msg: %s", msg)
162

    
163

    
164
def status_job_finished(message):
165
    """Updates VM status based on a previously sent status update request"""
166
    msg = None
167
    try:
168
        msg = json.loads(message.body)
169

    
170
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
171
            log.error("Message is of unknown type %s", msg["operation"])
172
            return
173

    
174
        if msg["status"] != "success":
175
            log.warn("Ignoring non-success status update from job %d on VM %s",
176
                          msg['jobId'], msg['instance'])
177
            message.channel.basic_ack(message.delivery_tag)
178
            return
179

    
180
        status = backend.get_job_status(msg['jobId'])
181

    
182
        log.debug("Node status job result: %s", status)
183

    
184
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
185
             log.error("Status update is of unknown type %s",
186
                        status['summary'])
187
             return
188

    
189
        conf_state = status['opresult'][0][msg['instance']]['config_state']
190
        run_state = status['opresult'][0][msg['instance']]['run_state']
191

    
192
        # XXX: The following assumes names like snf-12
193
        instid = msg['instance'].split('-')[1]
194

    
195
        vm = VirtualMachine.objects.get(id = instid)
196

    
197
        if run_state == "up":
198
            opcode = "OP_INSTANCE_REBOOT"
199
        else:
200
            opcode = "OP_INSTANCE_SHUTDOWN"
201

    
202
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
203
                                  status="success",
204
                                  logmsg="Reconciliation: simulated event")
205

    
206
        message.channel.basic_ack(message.delivery_tag)
207
    except KeyError as k:
208
        log.error("Malformed incoming JSON, missing attributes: %s", k)
209
    except Exception as e:
210
        log.exception("Unexpected error, msg: %s", msg)
211

    
212

    
213
def dummy_proc(message):
214
    try:
215
        log.debug("Msg: %s", message.body)
216
        message.channel.basic_ack(message.delivery_tag)
217
    except Exception as e:
218
        log.exception("Could not receive message")
219
        pass