Revision c4e55622 snf-cyclades-app/synnefo/logic/callbacks.py

b/snf-cyclades-app/synnefo/logic/callbacks.py
31 31
# from AMQP queues.
32 32

  
33 33
import logging
34
import socket
35
import traceback
36 34
import json
37
import sys
35
from functools import wraps
36
from datetime import datetime
38 37

  
39 38
from synnefo.db.models import VirtualMachine
40 39
from synnefo.logic import utils, backend
41 40

  
41
from synnefo.lib.utils import merge_time
42

  
42 43

  
43 44
log = logging.getLogger()
44 45

  
45 46

  
46
def update_db(message):
47
def is_update_required(func):
48
    """
49
    Decorator for checking if an incoming message needs to update the db.
50

  
51
    The database will not be updated in the following cases:
52
    - The message has been redelivered and the action has already been
53
      completed. In this case the event_time will be equal with the one
54
      in the database.
55
    - The message describes a previous state in the ganeti, from the one that is
56
      described in the db. In this case the event_time will be smaller from the
57
      one in the database.
58

  
59
    This decorator is also acknowledging the messages to the AMQP broker.
60

  
61
    """
62
    @wraps(func)
63
    def wrapper(client, message, *args, **kwargs):
64
        log.debug("Checking if action is required for msg %s",  message)
65

  
66
        try:
67
            msg = json.loads(message['body'])
68

  
69
            event_time = merge_time(msg['event_time'])
70

  
71
            vm_id = utils.id_from_instance_name(msg["instance"])
72
            vm = VirtualMachine.objects.get(id=vm_id)
73

  
74
            db_time = vm.backendtime
75
            if event_time <= db_time:
76
                format_ = "%d/%m/%y %H:%M:%S:%f"
77
                log.debug("Ignoring message. event_timestamp: %s db_timestamp: %s",
78
                          event_time.strftime(format_),
79
                          db_time.strftime(format_))
80
                client.basic_ack(message)
81
                return
82

  
83
            # New message. Update the database!
84
            func(client, message)
85

  
86
        except ValueError:
87
            log.error("Incoming message not in JSON format: %s", message)
88
            client.basic_ack(message)
89
        except KeyError:
90
            log.error("Malformed incoming JSON, missing attributes: %s",
91
                      message)
92
            client.basic_ack(message)
93
        except VirtualMachine.InvalidBackendIdError:
94
            log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
95
            client.basic_ack(message)
96
        except VirtualMachine.DoesNotExist:
97
            log.error("VM for instance %s with id %d not found in DB.",
98
                      msg['instance'], vm_id)
99
            client.basic_ack(message)
100
        except Exception as e:
101
            log.exception("Unexpected error: %s, msg: %s", e, msg)
102
        else:
103
            # Acknowledge the message
104
            client.basic_ack(message)
105

  
106
    return wrapper
107

  
108

  
109
@is_update_required
110
def update_db(client, message):
47 111
    """Process a notification of type 'ganeti-op-status'"""
48
    log.debug("Processing ganeti-op-status msg: %s", message.body)
49
    msg = None
50
    try:
51
        msg = json.loads(message.body)
52

  
53
        if msg["type"] != "ganeti-op-status":
54
            log.error("Message is of unknown type %s.", msg["type"])
55
            return
56

  
57
        if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
58
            return status_job_finished(message)
59

  
60
        vmid = utils.id_from_instance_name(msg["instance"])
61
        vm = VirtualMachine.objects.get(id=vmid)
62

  
63
        backend.process_op_status(vm, msg["jobId"], msg["operation"],
64
                                  msg["status"], msg["logmsg"])
65
        log.debug("Done processing ganeti-op-status msg for vm %s.",
66
                      msg["instance"])
67
        message.channel.basic_ack(message.delivery_tag)
68
    except KeyError:
69
        log.error("Malformed incoming JSON, missing attributes: %s",
70
                      message.body)
71
    except VirtualMachine.InvalidBackendIdError:
72
        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
73
    except VirtualMachine.InvalidBackendMsgError, e:
74
        log.debug("Ignoring msg of unknown type: %s.", e)
75
    except VirtualMachine.DoesNotExist:
76
        log.error("VM for instance %s with id %d not found in DB.",
77
                      msg["instance"], vmid)
78
    except Exception as e:
79
        log.exception("Unexpected error, msg: %s", msg)
112
    log.debug("Processing ganeti-op-status msg: %s", message['body'])
113

  
114
    msg = json.loads(message['body'])
80 115

  
116
    if msg['type'] != "ganeti-op-status":
117
        log.error("Message is of unknown type %s.", msg['type'])
118
        return
81 119

  
82
def update_net(message):
120
    if msg['operation'] == "OP_INSTANCE_QUERY_DATA":
121
        return status_job_finished(client, message)
122

  
123
    vm_id = utils.id_from_instance_name(msg['instance'])
124
    vm = VirtualMachine.objects.get(id=vm_id)
125

  
126
    event_time = merge_time(msg['event_time'])
127
    backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
128
                              msg['status'], msg['logmsg'])
129

  
130
    log.debug("Done processing ganeti-op-status msg for vm %s.",
131
              msg['instance'])
132

  
133

  
134
@is_update_required
135
def update_net(client, message):
83 136
    """Process a notification of type 'ganeti-net-status'"""
84
    log.debug("Processing ganeti-net-status msg: %s", message.body)
85
    msg = None
86
    try:
87
        msg = json.loads(message.body)
88

  
89
        if msg["type"] != "ganeti-net-status":
90
            log.error("Message is of unknown type %s", msg["type"])
91
            return
92

  
93
        vmid = utils.id_from_instance_name(msg["instance"])
94
        vm = VirtualMachine.objects.get(id=vmid)
95

  
96
        backend.process_net_status(vm, msg["nics"])
97
        log.debug("Done processing ganeti-net-status msg for vm %s.",
98
                      msg["instance"])
99
        message.channel.basic_ack(message.delivery_tag)
100
    except KeyError:
101
        log.error("Malformed incoming JSON, missing attributes: %s",
102
                      message.body)
103
    except VirtualMachine.InvalidBackendIdError:
104
        log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
105
    except VirtualMachine.DoesNotExist:
106
        log.error("VM for instance %s with id %d not found in DB.",
107
                      msg["instance"], vmid)
108
    except Exception as e:
109
        log.exception("Unexpected error, msg: %s", msg)
137
    log.debug("Processing ganeti-net-status msg: %s", message['body'])
110 138

  
139
    msg = json.loads(message['body'])
111 140

  
112
def update_build_progress(message):
113
    """Process a create progress message"""
114
    log.debug("Processing ganeti-create-progress msg: %s", message.body)
115
    msg = None
116
    try:
117
        msg = json.loads(message.body)
118

  
119
        if msg['type'] != "ganeti-create-progress":
120
            log.error("Message is of unknown type %s", msg["type"])
121
            return
122

  
123
        # XXX: The following assumes names like snf-12
124
        vmid = msg['instance'].split('-')[1]
125
        vm = VirtualMachine.objects.get(id=vmid)
126

  
127
        backend.process_create_progress(vm, msg['rprogress'], None)
128
        log.debug("Done processing ganeti-create-progress msg for vm %s.",
129
                      msg["instance"])
130
        message.channel.basic_ack(message.delivery_tag)
131
    except KeyError:
132
        log.error("Malformed incoming JSON, missing attributes: %s",
133
                      message.body)
134
    except Exception as e:
135
        log.exception("Unexpected error, msg: %s", msg)
136
        raise
141
    if msg['type'] != "ganeti-net-status":
142
        log.error("Message is of unknown type %s", msg['type'])
143
        return
137 144

  
145
    vm_id = utils.id_from_instance_name(msg['instance'])
146
    vm = VirtualMachine.objects.get(id=vm_id)
138 147

  
139
def trigger_status_update(message):
140
    """Triggers a status update job for a specific VM id"""
141
    log.debug("Request to trigger status update: %s", message.body)
142
    msg = None
143
    try:
144
        msg = json.loads(message.body)
148
    event_time = merge_time(msg['event_time'])
149
    backend.process_net_status(vm, event_time, msg['nics'])
145 150

  
146
        if msg["type"] != "reconcile":
147
             log.error("Message is of unknown type %s", msg["type"])
148
             return
151
    log.debug("Done processing ganeti-net-status msg for vm %s.",
152
              msg["instance"])
149 153

  
150
        if msg["vmid"] == "":
151
            log.error("Reconciliation message does not specify a VM id")
152
            return
153 154

  
154
        vm = VirtualMachine.objects.get(id=msg["vmid"])
155
        backend.request_status_update(vm)
155
@is_update_required
156
def update_build_progress(client, message):
157
    """Process a create progress message"""
158
    log.debug("Processing ganeti-create-progress msg: %s", message['body'])
156 159

  
157
        message.channel.basic_ack(message.delivery_tag)
158
    except KeyError as k:
159
        log.error("Malformed incoming JSON, missing attributes: %s", k)
160
    except Exception as e:
161
        log.exception("Unexpected error, msg: %s", msg)
160
    msg = json.loads(message['body'])
162 161

  
162
    if msg['type'] != "ganeti-create-progress":
163
        log.error("Message is of unknown type %s", msg['type'])
164
        return
163 165

  
164
def status_job_finished(message):
165
    """Updates VM status based on a previously sent status update request"""
166
    msg = None
167
    try:
168
        msg = json.loads(message.body)
166
    vm_id = utils.id_from_instance_name(msg['instance'])
167
    vm = VirtualMachine.objects.get(id=vm_id)
169 168

  
170
        if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
171
            log.error("Message is of unknown type %s", msg["operation"])
172
            return
169
    event_time = merge_time(msg['event_time'])
170
    backend.process_create_progress(vm, event_time, msg['rprogress'], None)
173 171

  
174
        if msg["status"] != "success":
175
            log.warn("Ignoring non-success status update from job %d on VM %s",
176
                          msg['jobId'], msg['instance'])
177
            message.channel.basic_ack(message.delivery_tag)
178
            return
172
    log.debug("Done processing ganeti-create-progress msg for vm %s.",
173
              msg['instance'])
179 174

  
180
        status = backend.get_job_status(msg['jobId'])
181 175

  
182
        log.debug("Node status job result: %s", status)
176
def status_job_finished(client, message):
177
    """Updates VM status based on a previously sent status update request"""
183 178

  
184
        if status['summary'][0] != u'INSTANCE_QUERY_DATA':
185
             log.error("Status update is of unknown type %s",
186
                        status['summary'])
187
             return
179
    msg = json.loads(message['body'])
188 180

  
189
        conf_state = status['opresult'][0][msg['instance']]['config_state']
190
        run_state = status['opresult'][0][msg['instance']]['run_state']
181
    if msg['operation'] != 'OP_INSTANCE_QUERY_DATA':
182
        log.error("Message is of unknown type %s", msg['operation'])
183
        return
191 184

  
192
        # XXX: The following assumes names like snf-12
193
        instid = msg['instance'].split('-')[1]
185
    if msg['status'] != 'success':
186
        log.warn("Ignoring non-success status update from job %d on VM %s",
187
                 msg['jobId'], msg['instance'])
188
        client.basic_ack(message.delivery_tag)
189
        return
194 190

  
195
        vm = VirtualMachine.objects.get(id = instid)
191
    status = backend.get_job_status(msg['jobId'])
196 192

  
197
        if run_state == "up":
198
            opcode = "OP_INSTANCE_REBOOT"
199
        else:
200
            opcode = "OP_INSTANCE_SHUTDOWN"
193
    log.debug("Node status job result: %s", status)
201 194

  
202
        backend.process_op_status(vm=vm, jobid=msg['jobId'],opcode=opcode,
203
                                  status="success",
204
                                  logmsg="Reconciliation: simulated event")
195
    if status['summary'][0] != u'INSTANCE_QUERY_DATA':
196
         log.error("Status update is of unknown type %s",
197
                    status['summary'])
198
         return
205 199

  
206
        message.channel.basic_ack(message.delivery_tag)
207
    except KeyError as k:
208
        log.error("Malformed incoming JSON, missing attributes: %s", k)
209
    except Exception as e:
210
        log.exception("Unexpected error, msg: %s", msg)
200
    conf_state = status['opresult'][0][msg['instance']]['config_state']
201
    run_state = status['opresult'][0][msg['instance']]['run_state']
202

  
203
    vm_id = utils.id_from_instance_name(msg['instance'])
204
    vm = VirtualMachine.objects.get(id = vm_id)
211 205

  
206
    if run_state == "up":
207
        opcode = "OP_INSTANCE_REBOOT"
208
    else:
209
        opcode = "OP_INSTANCE_SHUTDOWN"
212 210

  
213
def dummy_proc(message):
211
    event_time = merge_time(msg['event_time'])
212
    backend.process_op_status(vm=vm, etime=event_time,
213
                              jobid=msg['jobId'], opcode=opcode,
214
                              status='success',
215
                              logmsg="Reconciliation: simulated event")
216

  
217
@is_update_required
218
def dummy_proc(client, message):
214 219
    try:
215
        log.debug("Msg: %s", message.body)
216
        message.channel.basic_ack(message.delivery_tag)
220
        log.debug("Msg: %s", message['body'])
221
        pass
217 222
    except Exception as e:
218 223
        log.exception("Could not receive message")
219
        pass

Also available in: Unified diff