44 |
44 |
log = logging.getLogger()
|
45 |
45 |
|
46 |
46 |
|
47 |
|
def is_update_required(func):
|
48 |
|
"""
|
49 |
|
Decorator for checking if an incoming message needs to update the db.
|
50 |
|
|
51 |
|
The database will not be updated in the following cases:
|
52 |
|
- The message has been redelivered and the action has already been
|
53 |
|
completed. In this case the event_time will be equal with the one
|
54 |
|
in the database.
|
55 |
|
- The message describes a previous state in the ganeti, from the one that is
|
56 |
|
described in the db. In this case the event_time will be smaller from the
|
57 |
|
one in the database.
|
|
47 |
def handle_message_delivery(func):
|
|
48 |
""" Generic decorator for handling messages.
|
58 |
49 |
|
59 |
|
This decorator is also acknowledging the messages to the AMQP broker.
|
|
50 |
This decorator is responsible for converting the message into json format,
|
|
51 |
handling of common exceptions and acknowledment of message if needed.
|
60 |
52 |
|
61 |
53 |
"""
|
62 |
54 |
@wraps(func)
|
63 |
55 |
def wrapper(client, message, *args, **kwargs):
|
64 |
56 |
try:
|
65 |
57 |
msg = json.loads(message['body'])
|
66 |
|
|
67 |
|
event_time = merge_time(msg['event_time'])
|
68 |
|
|
69 |
|
vm_id = utils.id_from_instance_name(msg["instance"])
|
70 |
|
vm = VirtualMachine.objects.get(id=vm_id)
|
71 |
|
|
72 |
|
db_time = vm.backendtime
|
73 |
|
if event_time <= db_time:
|
74 |
|
format_ = "%d/%m/%y %H:%M:%S:%f"
|
75 |
|
log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
|
76 |
|
message,
|
77 |
|
event_time.strftime(format_),
|
78 |
|
db_time.strftime(format_))
|
79 |
|
client.basic_ack(message)
|
80 |
|
return
|
81 |
|
|
82 |
|
# New message. Update the database!
|
83 |
|
func(vm, msg)
|
84 |
|
|
|
58 |
func(msg)
|
|
59 |
client.basic_ack(message)
|
85 |
60 |
except ValueError:
|
86 |
61 |
log.error("Incoming message not in JSON format: %s", message)
|
87 |
62 |
client.basic_ack(message)
|
... | ... | |
89 |
64 |
log.error("Malformed incoming JSON, missing attributes: %s",
|
90 |
65 |
message)
|
91 |
66 |
client.basic_ack(message)
|
|
67 |
except Exception as e:
|
|
68 |
log.exception("Unexpected error: %s, msg: %s", e, msg)
|
|
69 |
|
|
70 |
return wrapper
|
|
71 |
|
|
72 |
def instance_from_msg(func):
|
|
73 |
""" Decorator for getting the VirtualMachine object of the msg.
|
|
74 |
|
|
75 |
"""
|
|
76 |
@handle_message_delivery
|
|
77 |
@wraps(func)
|
|
78 |
def wrapper(msg):
|
|
79 |
try:
|
|
80 |
vm_id = utils.id_from_instance_name(msg["instance"])
|
|
81 |
vm = VirtualMachine.objects.get(id=vm_id)
|
|
82 |
func(vm, msg)
|
92 |
83 |
except VirtualMachine.InvalidBackendIdError:
|
93 |
84 |
log.debug("Ignoring msg for unknown instance %s.", msg['instance'])
|
94 |
|
client.basic_ack(message)
|
95 |
85 |
except VirtualMachine.DoesNotExist:
|
96 |
86 |
log.error("VM for instance %s with id %d not found in DB.",
|
97 |
87 |
msg['instance'], vm_id)
|
98 |
|
client.basic_ack(message)
|
99 |
|
except Exception as e:
|
100 |
|
log.exception("Unexpected error: %s, msg: %s", e, msg)
|
101 |
|
else:
|
102 |
|
# Acknowledge the message
|
103 |
|
client.basic_ack(message)
|
|
88 |
return wrapper
|
|
89 |
|
|
90 |
def network_from_msg(func):
|
|
91 |
""" Decorator for getting the Network object of the msg.
|
|
92 |
|
|
93 |
"""
|
|
94 |
@handle_message_delivery
|
|
95 |
@wraps(func)
|
|
96 |
def wrapper(msg):
|
|
97 |
try:
|
|
98 |
network_id = utils.id_from_network_name(msg["network"])
|
|
99 |
network = Network.objects.get(id=network_id)
|
|
100 |
func(network, msg)
|
|
101 |
except Network.InvalidBackendIdError:
|
|
102 |
log.debug("Ignoring msg for unknown network %s.", msg['network'])
|
|
103 |
except Network.DoesNotExist:
|
|
104 |
log.error("Network %s with id %d not found in DB.",
|
|
105 |
msg['network'], vm_id)
|
|
106 |
return wrapper
|
|
107 |
|
|
108 |
def if_update_required(func):
|
|
109 |
"""
|
|
110 |
Decorator for checking if an incoming message needs to update the db.
|
|
111 |
|
|
112 |
The database will not be updated in the following cases:
|
|
113 |
- The message has been redelivered and the action has already been
|
|
114 |
completed. In this case the event_time will be equal with the one
|
|
115 |
in the database.
|
|
116 |
- The message describes a previous state in the ganeti, from the one that is
|
|
117 |
described in the db. In this case the event_time will be smaller from the
|
|
118 |
one in the database.
|
|
119 |
|
|
120 |
"""
|
|
121 |
@wraps(func)
|
|
122 |
def wrapper(target, msg):
|
|
123 |
event_time = merge_time(msg['event_time'])
|
|
124 |
db_time = target.backendtime
|
|
125 |
|
|
126 |
if event_time <= db_time:
|
|
127 |
format_ = "%d/%m/%y %H:%M:%S:%f"
|
|
128 |
log.debug("Ignoring message %s.\nevent_timestamp: %s db_timestamp: %s",
|
|
129 |
msg,
|
|
130 |
event_time.strftime(format_),
|
|
131 |
db_time.strftime(format_))
|
|
132 |
return
|
|
133 |
# New message. Update the database!
|
|
134 |
func(target, msg, event_time)
|
104 |
135 |
|
105 |
136 |
return wrapper
|
106 |
137 |
|
107 |
138 |
|
108 |
|
@is_update_required
|
109 |
|
def update_db(vm, msg):
|
|
139 |
@instance_from_msg
|
|
140 |
@if_update_required
|
|
141 |
def update_db(vm, msg, event_time):
|
110 |
142 |
"""Process a notification of type 'ganeti-op-status'"""
|
111 |
143 |
log.debug("Processing ganeti-op-status msg: %s", msg)
|
112 |
144 |
|
... | ... | |
114 |
146 |
log.error("Message is of unknown type %s.", msg['type'])
|
115 |
147 |
return
|
116 |
148 |
|
117 |
|
event_time = merge_time(msg['event_time'])
|
118 |
149 |
backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
|
119 |
150 |
msg['status'], msg['logmsg'])
|
120 |
151 |
|
... | ... | |
122 |
153 |
msg['instance'])
|
123 |
154 |
|
124 |
155 |
|
125 |
|
@is_update_required
|
126 |
|
def update_net(vm, msg):
|
|
156 |
@instance_from_msg
|
|
157 |
@if_update_required
|
|
158 |
def update_net(vm, msg, event_time):
|
127 |
159 |
"""Process a notification of type 'ganeti-net-status'"""
|
128 |
160 |
log.debug("Processing ganeti-net-status msg: %s", msg)
|
129 |
161 |
|
... | ... | |
131 |
163 |
log.error("Message is of unknown type %s", msg['type'])
|
132 |
164 |
return
|
133 |
165 |
|
134 |
|
event_time = merge_time(msg['event_time'])
|
135 |
166 |
backend.process_net_status(vm, event_time, msg['nics'])
|
136 |
167 |
|
137 |
168 |
log.debug("Done processing ganeti-net-status msg for vm %s.",
|
138 |
169 |
msg["instance"])
|
139 |
170 |
|
140 |
171 |
|
141 |
|
@is_update_required
|
142 |
|
def update_build_progress(vm, msg):
|
|
172 |
@network_from_msg
|
|
173 |
@if_update_required
|
|
174 |
def update_network(network, msg, event_time):
|
|
175 |
"""Process a notification of type 'ganeti-network-status'"""
|
|
176 |
log.debug("Processing ganeti-network-status msg: %s", msg)
|
|
177 |
|
|
178 |
if msg['type'] != "ganeti-network-status":
|
|
179 |
log.error("Message is of unknown type %s.", msg['type'])
|
|
180 |
return
|
|
181 |
|
|
182 |
|
|
183 |
log.debug("Done processing ganeti-network-status msg for vm %s.",
|
|
184 |
msg['instance'])
|
|
185 |
|
|
186 |
|
|
187 |
@instance_from_msg
|
|
188 |
@if_update_required
|
|
189 |
def update_build_progress(vm, msg, event_time):
|
143 |
190 |
"""Process a create progress message"""
|
144 |
191 |
log.debug("Processing ganeti-create-progress msg: %s", msg)
|
145 |
192 |
|
... | ... | |
147 |
194 |
log.error("Message is of unknown type %s", msg['type'])
|
148 |
195 |
return
|
149 |
196 |
|
150 |
|
event_time = merge_time(msg['event_time'])
|
151 |
197 |
backend.process_create_progress(vm, event_time, msg['rprogress'], None)
|
152 |
198 |
|
153 |
199 |
log.debug("Done processing ganeti-create-progress msg for vm %s.",
|
154 |
200 |
msg['instance'])
|
155 |
201 |
|
156 |
202 |
|
157 |
|
def dummy_proc(client, message):
|
|
203 |
def dummy_proc(client, message, *args, **kwargs):
|
158 |
204 |
try:
|
159 |
205 |
log.debug("Msg: %s", message['body'])
|
160 |
206 |
client.basic_ack(message)
|