Revision 86f046a8 logic/callbacks.py
b/logic/callbacks.py | ||
---|---|---|
30 | 30 |
# Callback functions used by the dispatcher to process incoming notifications |
31 | 31 |
# from AMQP queues. |
32 | 32 |
|
33 |
import logging |
|
33 | 34 |
import socket |
34 | 35 |
import traceback |
35 | 36 |
import json |
36 | 37 |
import sys |
37 | 38 |
|
38 | 39 |
from synnefo.db.models import VirtualMachine |
39 |
from synnefo.logic import utils, backend, email_send, log
|
|
40 |
from synnefo.logic import utils, backend, email_send |
|
40 | 41 |
|
41 |
_logger = log.get_logger("synnefo.dispatcher") |
|
42 |
|
|
43 |
log = logging.getLogger() |
|
42 | 44 |
|
43 | 45 |
|
44 | 46 |
def update_db(message): |
45 | 47 |
"""Process a notification of type 'ganeti-op-status'""" |
46 |
_logger.debug("Processing ganeti-op-status msg: %s", message.body)
|
|
48 |
log.debug("Processing ganeti-op-status msg: %s", message.body)
|
|
47 | 49 |
msg = None |
48 | 50 |
try: |
49 | 51 |
msg = json.loads(message.body) |
50 | 52 |
|
51 | 53 |
if msg["type"] != "ganeti-op-status": |
52 |
_logger.error("Message is of unknown type %s.", msg["type"])
|
|
54 |
log.error("Message is of unknown type %s.", msg["type"])
|
|
53 | 55 |
return |
54 | 56 |
|
55 | 57 |
if msg["operation"] == "OP_INSTANCE_QUERY_DATA": |
... | ... | |
60 | 62 |
|
61 | 63 |
backend.process_op_status(vm, msg["jobId"], msg["operation"], |
62 | 64 |
msg["status"], msg["logmsg"]) |
63 |
_logger.debug("Done processing ganeti-op-status msg for vm %s.",
|
|
65 |
log.debug("Done processing ganeti-op-status msg for vm %s.",
|
|
64 | 66 |
msg["instance"]) |
65 | 67 |
message.channel.basic_ack(message.delivery_tag) |
66 | 68 |
except KeyError: |
67 |
_logger.error("Malformed incoming JSON, missing attributes: %s",
|
|
69 |
log.error("Malformed incoming JSON, missing attributes: %s",
|
|
68 | 70 |
message.body) |
69 | 71 |
except VirtualMachine.InvalidBackendIdError: |
70 |
_logger.debug("Ignoring msg for unknown instance %s.", |
|
71 |
msg["instance"]) |
|
72 |
log.debug("Ignoring msg for unknown instance %s.", msg["instance"]) |
|
72 | 73 |
except VirtualMachine.InvalidBackendMsgError, e: |
73 |
_logger.debug("Ignoring msg of unknown type: %s.", e)
|
|
74 |
log.debug("Ignoring msg of unknown type: %s.", e)
|
|
74 | 75 |
except VirtualMachine.DoesNotExist: |
75 |
_logger.error("VM for instance %s with id %d not found in DB.",
|
|
76 |
log.error("VM for instance %s with id %d not found in DB.",
|
|
76 | 77 |
msg["instance"], vmid) |
77 | 78 |
except Exception as e: |
78 |
_logger.exception("Unexpected error, msg: %s", msg)
|
|
79 |
log.exception("Unexpected error, msg: %s", msg)
|
|
79 | 80 |
|
80 | 81 |
|
81 | 82 |
def update_net(message): |
82 | 83 |
"""Process a notification of type 'ganeti-net-status'""" |
83 |
_logger.debug("Processing ganeti-net-status msg: %s", message.body)
|
|
84 |
log.debug("Processing ganeti-net-status msg: %s", message.body)
|
|
84 | 85 |
msg = None |
85 | 86 |
try: |
86 | 87 |
msg = json.loads(message.body) |
87 | 88 |
|
88 | 89 |
if msg["type"] != "ganeti-net-status": |
89 |
_logger.error("Message is of unknown type %s", msg["type"])
|
|
90 |
log.error("Message is of unknown type %s", msg["type"])
|
|
90 | 91 |
return |
91 | 92 |
|
92 | 93 |
vmid = utils.id_from_instance_name(msg["instance"]) |
93 | 94 |
vm = VirtualMachine.objects.get(id=vmid) |
94 | 95 |
|
95 | 96 |
backend.process_net_status(vm, msg["nics"]) |
96 |
_logger.debug("Done processing ganeti-net-status msg for vm %s.",
|
|
97 |
log.debug("Done processing ganeti-net-status msg for vm %s.",
|
|
97 | 98 |
msg["instance"]) |
98 | 99 |
message.channel.basic_ack(message.delivery_tag) |
99 | 100 |
except KeyError: |
100 |
_logger.error("Malformed incoming JSON, missing attributes: %s",
|
|
101 |
log.error("Malformed incoming JSON, missing attributes: %s",
|
|
101 | 102 |
message.body) |
102 | 103 |
except VirtualMachine.InvalidBackendIdError: |
103 |
_logger.debug("Ignoring msg for unknown instance %s.", |
|
104 |
msg["instance"]) |
|
104 |
log.debug("Ignoring msg for unknown instance %s.", msg["instance"]) |
|
105 | 105 |
except VirtualMachine.DoesNotExist: |
106 |
_logger.error("VM for instance %s with id %d not found in DB.",
|
|
106 |
log.error("VM for instance %s with id %d not found in DB.",
|
|
107 | 107 |
msg["instance"], vmid) |
108 | 108 |
except Exception as e: |
109 |
_logger.exception("Unexpected error, msg: %s", msg)
|
|
109 |
log.exception("Unexpected error, msg: %s", msg)
|
|
110 | 110 |
|
111 | 111 |
|
112 | 112 |
def send_email(message): |
... | ... | |
119 | 119 |
body=msg['body'], subject=msg['subject']) |
120 | 120 |
|
121 | 121 |
if not sent: |
122 |
_logger.warn("Failed to send email to %s", msg['to'])
|
|
122 |
log.warn("Failed to send email to %s", msg['to'])
|
|
123 | 123 |
else: |
124 | 124 |
message.channel.basic_ack(message.delivery_tag) |
125 | 125 |
except KeyError: |
126 |
_logger.error("Malformed incoming JSON, missing attributes: %s",
|
|
126 |
log.error("Malformed incoming JSON, missing attributes: %s",
|
|
127 | 127 |
message.body) |
128 | 128 |
except socket.error as e: |
129 |
_logger.error("Cannot connect to SMTP server:%s\n", e)
|
|
129 |
log.error("Cannot connect to SMTP server:%s\n", e)
|
|
130 | 130 |
except Exception as e: |
131 |
_logger.exception("Unexpected error, msg: %s", msg)
|
|
131 |
log.exception("Unexpected error, msg: %s", msg)
|
|
132 | 132 |
raise |
133 | 133 |
|
134 | 134 |
|
135 | 135 |
def update_credits(message): |
136 |
_logger.debug("Request to update credits")
|
|
136 |
log.debug("Request to update credits")
|
|
137 | 137 |
message.channel.basic_ack(message.delivery_tag) |
138 | 138 |
|
139 | 139 |
|
140 | 140 |
def update_build_progress(message): |
141 | 141 |
"""Process a create progress message""" |
142 |
_logger.debug("Processing ganeti-create-progress msg: %s", message.body)
|
|
142 |
log.debug("Processing ganeti-create-progress msg: %s", message.body)
|
|
143 | 143 |
msg = None |
144 | 144 |
try: |
145 | 145 |
msg = json.loads(message.body) |
146 | 146 |
|
147 | 147 |
if msg['type'] != "ganeti-create-progress": |
148 |
_logger.error("Message is of unknown type %s", msg["type"])
|
|
148 |
log.error("Message is of unknown type %s", msg["type"])
|
|
149 | 149 |
return |
150 | 150 |
|
151 | 151 |
# XXX: The following assumes names like snf-12 |
... | ... | |
153 | 153 |
vm = VirtualMachine.objects.get(id=vmid) |
154 | 154 |
|
155 | 155 |
backend.process_create_progress(vm, msg['rprogress'], None) |
156 |
_logger.debug("Done processing ganeti-create-progress msg for vm %s.",
|
|
156 |
log.debug("Done processing ganeti-create-progress msg for vm %s.",
|
|
157 | 157 |
msg["instance"]) |
158 | 158 |
message.channel.basic_ack(message.delivery_tag) |
159 | 159 |
except KeyError: |
160 |
_logger.error("Malformed incoming JSON, missing attributes: %s",
|
|
160 |
log.error("Malformed incoming JSON, missing attributes: %s",
|
|
161 | 161 |
message.body) |
162 | 162 |
except Exception as e: |
163 |
_logger.exception("Unexpected error, msg: %s", msg)
|
|
163 |
log.exception("Unexpected error, msg: %s", msg)
|
|
164 | 164 |
raise |
165 | 165 |
|
166 | 166 |
|
167 | 167 |
def trigger_status_update(message): |
168 | 168 |
"""Triggers a status update job for a specific VM id""" |
169 |
_logger.debug("Request to trigger status update: %s", message.body)
|
|
169 |
log.debug("Request to trigger status update: %s", message.body)
|
|
170 | 170 |
msg = None |
171 | 171 |
try: |
172 | 172 |
msg = json.loads(message.body) |
173 | 173 |
|
174 | 174 |
if msg["type"] != "reconcile": |
175 |
_logger.error("Message is of unknown type %s", msg["type"])
|
|
175 |
log.error("Message is of unknown type %s", msg["type"])
|
|
176 | 176 |
return |
177 | 177 |
|
178 | 178 |
if msg["vmid"] == "": |
179 |
_logger.error("Reconciliation message does not specify a VM id")
|
|
179 |
log.error("Reconciliation message does not specify a VM id")
|
|
180 | 180 |
return |
181 | 181 |
|
182 | 182 |
vm = VirtualMachine.objects.get(id=msg["vmid"]) |
... | ... | |
184 | 184 |
|
185 | 185 |
message.channel.basic_ack(message.delivery_tag) |
186 | 186 |
except KeyError as k: |
187 |
_logger.error("Malformed incoming JSON, missing attributes: %s", k)
|
|
187 |
log.error("Malformed incoming JSON, missing attributes: %s", k)
|
|
188 | 188 |
except Exception as e: |
189 |
_logger.exception("Unexpected error, msg: %s", msg)
|
|
189 |
log.exception("Unexpected error, msg: %s", msg)
|
|
190 | 190 |
|
191 | 191 |
|
192 | 192 |
def status_job_finished(message): |
... | ... | |
196 | 196 |
msg = json.loads(message.body) |
197 | 197 |
|
198 | 198 |
if msg["operation"] != 'OP_INSTANCE_QUERY_DATA': |
199 |
_logger.error("Message is of unknown type %s", msg["operation"])
|
|
199 |
log.error("Message is of unknown type %s", msg["operation"])
|
|
200 | 200 |
return |
201 | 201 |
|
202 | 202 |
if msg["status"] != "success": |
203 |
_logger.warn("Ignoring non-success status update from job %d on VM %s",
|
|
203 |
log.warn("Ignoring non-success status update from job %d on VM %s",
|
|
204 | 204 |
msg['jobId'], msg['instance']) |
205 | 205 |
message.channel.basic_ack(message.delivery_tag) |
206 | 206 |
return |
207 | 207 |
|
208 | 208 |
status = backend.get_job_status(msg['jobId']) |
209 | 209 |
|
210 |
_logger.debug("Node status job result: %s" % status)
|
|
210 |
log.debug("Node status job result: %s", status)
|
|
211 | 211 |
|
212 | 212 |
if status['summary'][0] != u'INSTANCE_QUERY_DATA': |
213 |
_logger.error("Status update is of unknown type %s", status['summary']) |
|
213 |
log.error("Status update is of unknown type %s", |
|
214 |
status['summary']) |
|
214 | 215 |
return |
215 | 216 |
|
216 | 217 |
conf_state = status['opresult'][0][msg['instance']]['config_state'] |
... | ... | |
232 | 233 |
|
233 | 234 |
message.channel.basic_ack(message.delivery_tag) |
234 | 235 |
except KeyError as k: |
235 |
_logger.error("Malformed incoming JSON, missing attributes: %s", k)
|
|
236 |
log.error("Malformed incoming JSON, missing attributes: %s", k)
|
|
236 | 237 |
except Exception as e: |
237 |
_logger.exception("Unexpected error, msg: %s", msg)
|
|
238 |
log.exception("Unexpected error, msg: %s", msg)
|
|
238 | 239 |
|
239 | 240 |
|
240 | 241 |
def dummy_proc(message): |
241 | 242 |
try: |
242 |
_logger.debug("Msg: %s", message.body)
|
|
243 |
log.debug("Msg: %s", message.body)
|
|
243 | 244 |
message.channel.basic_ack(message.delivery_tag) |
244 | 245 |
except Exception as e: |
245 |
_logger.exception("Could not receive message")
|
|
246 |
log.exception("Could not receive message")
|
|
246 | 247 |
pass |
Also available in: Unified diff