Revision 22ee6892 snf-cyclades-app/synnefo/logic/callbacks.py
b/snf-cyclades-app/synnefo/logic/callbacks.py | ||
---|---|---|
33 | 33 |
import logging |
34 | 34 |
import json |
35 | 35 |
from functools import wraps |
36 |
from datetime import datetime |
|
37 | 36 |
|
38 |
from synnefo.db.models import VirtualMachine
|
|
37 |
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
|
|
39 | 38 |
from synnefo.logic import utils, backend |
40 | 39 |
|
41 | 40 |
from synnefo.lib.utils import merge_time |
... | ... | |
57 | 56 |
msg = json.loads(message['body']) |
58 | 57 |
func(msg) |
59 | 58 |
client.basic_ack(message) |
60 |
except ValueError: |
|
61 |
log.error("Incoming message not in JSON format: %s", message)
|
|
59 |
except ValueError as e:
|
|
60 |
log.error("Incoming message not in JSON format %s: %s", e, message)
|
|
62 | 61 |
client.basic_ack(message) |
63 |
except KeyError: |
|
64 |
log.error("Malformed incoming JSON, missing attributes: %s", |
|
65 |
message) |
|
62 |
except KeyError as e:
|
|
63 |
log.error("Malformed incoming JSON, missing attribute %s: %s",
|
|
64 |
e, message)
|
|
66 | 65 |
client.basic_ack(message) |
67 | 66 |
except Exception as e: |
68 | 67 |
log.exception("Unexpected error: %s, msg: %s", e, msg) |
69 | 68 |
|
70 | 69 |
return wrapper |
71 | 70 |
|
71 |
|
|
72 | 72 |
def instance_from_msg(func): |
73 | 73 |
""" Decorator for getting the VirtualMachine object of the msg. |
74 | 74 |
|
... | ... | |
87 | 87 |
msg['instance'], vm_id) |
88 | 88 |
return wrapper |
89 | 89 |
|
90 |
|
|
90 | 91 |
def network_from_msg(func): |
91 |
""" Decorator for getting the Network object of the msg. |
|
92 |
""" Decorator for getting the BackendNetwork object of the msg.
|
|
92 | 93 |
|
93 | 94 |
""" |
94 | 95 |
@handle_message_delivery |
... | ... | |
97 | 98 |
try: |
98 | 99 |
network_id = utils.id_from_network_name(msg["network"]) |
99 | 100 |
network = Network.objects.get(id=network_id) |
100 |
func(network, msg) |
|
101 |
backend = Backend.objects.get(clustername=msg['cluster']) |
|
102 |
backend_network = BackendNetwork.objects.get(network=network, |
|
103 |
backend=backend) |
|
104 |
func(backend_network, msg) |
|
101 | 105 |
except Network.InvalidBackendIdError: |
102 | 106 |
log.debug("Ignoring msg for unknown network %s.", msg['network']) |
103 | 107 |
except Network.DoesNotExist: |
104 |
log.error("Network %s with id %d not found in DB.", |
|
105 |
msg['network'], vm_id) |
|
108 |
log.error("Network %s not found in DB.", msg['network']) |
|
109 |
except Backend.DoesNotExist: |
|
110 |
log.error("Backend %s not found in DB.", msg['cluster']) |
|
111 |
except BackendNetwork.DoesNotExist: |
|
112 |
log.error("Network %s on backend %s not found in DB.", |
|
113 |
msg['network'], msg['cluster']) |
|
106 | 114 |
return wrapper |
107 | 115 |
|
116 |
|
|
108 | 117 |
def if_update_required(func): |
109 | 118 |
""" |
110 | 119 |
Decorator for checking if an incoming message needs to update the db. |
... | ... | |
113 | 122 |
- The message has been redelivered and the action has already been |
114 | 123 |
completed. In this case the event_time will be equal with the one |
115 | 124 |
in the database. |
116 |
- The message describes a previous state in the ganeti, from the one that is
|
|
117 |
described in the db. In this case the event_time will be smaller from the
|
|
118 |
one in the database. |
|
125 |
- The message describes a previous state in the ganeti, from the one that |
|
126 |
is described in the db. In this case the event_time will be smaller from
|
|
127 |
the one in the database.
|
|
119 | 128 |
|
120 | 129 |
""" |
121 | 130 |
@wraps(func) |
... | ... | |
123 | 132 |
event_time = merge_time(msg['event_time']) |
124 | 133 |
db_time = target.backendtime |
125 | 134 |
|
126 |
if event_time <= db_time: |
|
135 |
if db_time and event_time <= db_time:
|
|
127 | 136 |
format_ = "%d/%m/%y %H:%M:%S:%f" |
128 | 137 |
log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s", |
129 | 138 |
msg, |
... | ... | |
179 | 188 |
log.error("Message is of unknown type %s.", msg['type']) |
180 | 189 |
return |
181 | 190 |
|
191 |
backend.process_network_status(network, event_time, |
|
192 |
msg['jobId'], msg['operation'], |
|
193 |
msg['status'], msg['logmsg']) |
|
182 | 194 |
|
183 |
log.debug("Done processing ganeti-network-status msg for vm %s.",
|
|
184 |
msg['instance'])
|
|
195 |
log.debug("Done processing ganeti-network-status msg for network %s.",
|
|
196 |
msg['network'])
|
|
185 | 197 |
|
186 | 198 |
|
187 | 199 |
@instance_from_msg |
... | ... | |
205 | 217 |
log.debug("Msg: %s", message['body']) |
206 | 218 |
client.basic_ack(message) |
207 | 219 |
except Exception as e: |
208 |
log.exception("Could not receive message") |
|
220 |
log.exception("Could not receive message %s" % e) |
Also available in: Unified diff