Statistics
| Branch: | Tag: | Revision:

root / logic / dispatcher_callbacks.py @ 8007ba7b

History | View | Annotate | Download (5.5 kB)

1
#
2
# Callback functions used by the dispatcher
3
# to process incoming notifications from AMQP queues.
4
#
5
# Copyright 2010 Greek Research and Technology Network
6
#
7
import traceback
8
import json
9
import logging
10
import sys
11

    
12
from synnefo.db.models import VirtualMachine
13
from synnefo.logic import utils, backend
14

    
15
_logger = logging.getLogger("synnefo.dispatcher")
16

    
17
def update_db(message):
18
    """Process the status of a VM based on a ganeti status message"""
19
    _logger.debug("Processing ganeti-op-status msg: %s", message.body)
20
    try:
21
        msg = json.loads(message.body)
22

    
23
        if msg["type"] != "ganeti-op-status":
24
            _logger.error("Message is of unknown type %s.", msg["type"])
25
            return
26

    
27
        vmid = utils.id_from_instance_name(msg["instance"])
28
        vm = VirtualMachine.objects.get(id=vmid)
29

    
30
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
31
                                  msg["status"], msg["logmsg"])
32
        _logger.debug("Done processing ganeti-op-status msg for vm %s.",
33
                      msg["instance"])
34
        message.channel.basic_ack(message.delivery_tag)
35
    except KeyError:
36
        _logger.error("Malformed incoming JSON, missing attributes: %s",
37
                      message.body)
38
    except VirtualMachine.InvalidBackendIdError:
39
        _logger.debug("Ignoring msg for unknown instance %s.",
40
                      msg["instance"])
41
    except VirtualMachine.DoesNotExist:
42
        _logger.error("VM for instance %s with id %d not found in DB.",
43
                      msg["instance"], vmid)
44
    except Exception as e:
45
        _logger.error("Unexpected error:\n%s" %
46
            "".join(traceback.format_exception(*sys.exc_info())))
47

    
48

    
49
def update_net(message):
50
    """Process a network status update notification from Ganeti"""
51
    _logger.debug("Processing ganeti-net-status msg: %s", message.body)
52
    try:
53
        msg = json.loads(message.body)
54

    
55
        if msg["type"] != "ganeti-net-status":
56
            _logger.error("Message is of unknown type %s", msg["type"])
57
            return
58

    
59
        vmid = utils.id_from_instance_name(msg["instance"])
60
        vm = VirtualMachine.objects.get(id=vmid)
61

    
62
        backend.process_net_status(vm, msg["nics"])
63
        _logger.debug("Done processing ganeti-net-status msg for vm %s.",
64
                      msg["instance"])
65
        message.channel.basic_ack(message.delivery_tag)
66
    except KeyError:
67
        _logger.error("Malformed incoming JSON, missing attributes: %s",
68
                      message.body)
69
    except VirtualMachine.InvalidBackendIdError:
70
        _logger.debug("Ignoring msg for unknown instance %s.",
71
                      msg["instance"])
72
    except VirtualMachine.DoesNotExist:
73
        _logger.error("VM for instance %s with id %d not found in DB.",
74
                      msg["instance"], vmid)
75
    except Exception as e:
76
        _logger.error("Unexpected error:\n%s" %
77
            "".join(traceback.format_exception(*sys.exc_info())))
78

    
79

    
80
def send_email(message):
81
    _logger.debug("Request to send email message")
82
    message.channel.basic_ack(message.delivery_tag)
83

    
84

    
85
def update_credits(message):
86
    _logger.debug("Request to update credits")
87
    message.channel.basic_ack(message.delivery_tag)
88

    
89
def trigger_status_update(message):
90
    _logger.debug("Request to trigger status update:", message.body)
91

    
92
    try:
93
        msg = json.loads(message.body)
94

    
95
        if msg["type"] != "reconciliate" :
96
             _logger.error("Message is of unknown type %s", msg["type"])
97
             return
98

    
99
        if msg["vm-id"] == "" :
100
            _logger.error("Message does not specify a VM id")
101
            return
102

    
103
        vm = VirtualMachine.objects.get(id=msg["vm-id"])
104
        backend.request_status_update(vm)
105

    
106
        message.channel.basic_ack(message.delivery_tag)
107
    except KeyError:
108
        _logger.error("Malformed incoming JSON, missing attributes: %s",
109
                      message.body)
110
    except Exception as e:
111
        _logger.error("Unexpected error:\n%s" %
112
                      "".join(traceback.format_exception(*sys.exc_info())))
113

    
114
def status_job_finished (message) :
115
    _logger.debug("Job status message received:", message.body)
116
    try:
117

    
118
        msg = message.body;
119

    
120
        if msg['operation'] != u'OP_INSTANCE_QUERY_DATA':
121
            _logger.error("Message is of unknown type %s", msg["operation"])
122
            return
123

    
124
        if msg["status"] != "success" :
125
            _logger.error("Status job %d for %s did not finish properly",
126
                          msg['jobId'], msg['instance'])
127
            return
128

    
129
        status = backend.get_job_status(msg['jobid'])
130

    
131
        if status["summary"] != "INSTANCE_QUERY_DATA" or type(status["opresult"]) is not list:
132
             _logger.error("Status is of unknown type %s", msg["summary"])
133
             return
134

    
135
        req_state = status['opresult'][msg['instance']]['config_state']
136
        run_state = status['opresult'][msg['instance']]['run_state']
137
        vm = VirtualMachine.objects.get(name=msg['instance'])
138
        backend.update_status(vm, run_state)
139
        
140
        message.channel.basic_ack(message.delivery_tag)
141
    except KeyError:
142
        _logger.error("Malformed incoming JSON, missing attributes: %s",
143
                      message.body)
144
    except Exception as e:
145
        _logger.error("Unexpected error:\n%s" %
146
                      "".join(traceback.format_exception(*sys.exc_info())))
147

    
148
def dummy_proc(message):
149
    try:
150
        msg = json.loads(message.body)
151
        _logger.debug("Msg (exchange:%s) ", msg)
152
        message.channel.basic_ack(message.delivery_tag)
153
    except Exception as e:
154
        _logger.error("Could not receive message %s" % e.message)
155
        pass