Revision 39ccdb18
b/snf-cyclades-gtools/synnefo/ganeti/eventd.py | ||
---|---|---|
73 | 73 |
status = op.status |
74 | 74 |
if status == constants.JOB_STATUS_QUEUED: |
75 | 75 |
return job.received_timestamp |
76 |
if status == constants.JOB_STATUS_WAITLOCK: |
|
77 |
#if status == constants.JOB_STATUS_WAITING: |
|
78 |
return op.start_timestamp |
|
76 |
try: # Compatibility with Ganeti version |
|
77 |
if status == constants.JOB_STATUS_WAITLOCK: |
|
78 |
return op.start_timestamp |
|
79 |
except AttributeError: |
|
80 |
if status == constants.JOB_STATUS_WAITING: |
|
81 |
return op.start_timestamp |
|
79 | 82 |
if status == constants.JOB_STATUS_CANCELING: |
80 | 83 |
return op.start_timestamp |
81 | 84 |
if status == constants.JOB_STATUS_RUNNING: |
... | ... | |
96 | 99 |
return repr("Invalid backend status: %s in job %s" |
97 | 100 |
% (self.status, self.job)) |
98 | 101 |
|
102 |
def prefix_from_name(name): |
|
103 |
return name.split('-')[0] |
|
104 |
|
|
105 |
|
|
106 |
def get_field(from_, field): |
|
107 |
try: |
|
108 |
return getattr(from_, field) |
|
109 |
except AttributeError: |
|
110 |
None |
|
111 |
|
|
99 | 112 |
|
100 | 113 |
class JobFileHandler(pyinotify.ProcessEvent): |
101 | 114 |
def __init__(self, logger): |
... | ... | |
106 | 119 |
self.client.connect() |
107 | 120 |
handler_logger.info("Connected succesfully") |
108 | 121 |
|
122 |
self.op_handlers = {"INSTANCE": self.process_instance_op, |
|
123 |
"NETWORK": self.process_network_op} |
|
124 |
# "GROUP": self.process_group_op} |
|
125 |
|
|
109 | 126 |
def process_IN_CLOSE_WRITE(self, event): |
110 | 127 |
self.process_IN_MOVED_TO(event) |
111 | 128 |
|
... | ... | |
121 | 138 |
return |
122 | 139 |
|
123 | 140 |
data = serializer.LoadJson(data) |
124 |
try: # Version compatibility issue with Ganeti
|
|
141 |
try: # Compatibility with Ganeti version
|
|
125 | 142 |
job = jqueue._QueuedJob.Restore(None, data, False) |
126 | 143 |
except TypeError: |
127 | 144 |
job = jqueue._QueuedJob.Restore(None, data) |
128 | 145 |
|
146 |
job_id = int(job.id) |
|
129 | 147 |
|
130 | 148 |
for op in job.ops: |
131 |
instances = None |
|
132 |
try: |
|
133 |
instances = " ".join(op.input.instances) |
|
134 |
except AttributeError: |
|
135 |
pass |
|
149 |
op_id = op.input.OP_ID |
|
136 | 150 |
|
151 |
msg = None |
|
137 | 152 |
try: |
138 |
instances = op.input.instance_name |
|
139 |
except AttributeError: |
|
153 |
handler_fn = self.op_handlers[op_id.split('_')[1]] |
|
154 |
msg, routekey = handler_fn(op, job_id) |
|
155 |
except KeyError: |
|
140 | 156 |
pass |
141 | 157 |
|
142 |
if not instances or len(instances.split(" ")) != 1: |
|
143 |
# Do not publish messages for jobs with no or multiple |
|
144 |
# instances. |
|
145 |
# Currently snf-dispatcher can not normally handle these messages |
|
146 |
self.logger.debug("Ignoring Job: %d: %s(%s)", int(job.id), |
|
147 |
op.input.OP_ID, instances) |
|
158 |
if not msg: |
|
159 |
self.logger.debug("Ignoring job: %s: %s", job_id, op_id) |
|
148 | 160 |
continue |
149 | 161 |
|
162 |
# Generate a unique message identifier |
|
163 |
event_time = get_time_from_status(op, job) |
|
164 |
|
|
150 | 165 |
# Get the last line of the op log as message |
151 | 166 |
try: |
152 | 167 |
logmsg = op.log[-1][-1] |
153 | 168 |
except IndexError: |
154 | 169 |
logmsg = None |
155 | 170 |
|
156 |
# Generate a unique message identifier |
|
157 |
event_time = get_time_from_status(op, job) |
|
171 |
# Add shared attributes for all operations |
|
172 |
msg.update({"event_time": event_time, |
|
173 |
"operation": op_id, |
|
174 |
"status": op.status, |
|
175 |
"logmsg": logmsg, |
|
176 |
"jobId": job_id}) |
|
158 | 177 |
|
159 |
self.logger.debug("Job: %d: %s(%s) %s %s", |
|
160 |
int(job.id), op.input.OP_ID, instances, op.status, logmsg) |
|
161 |
|
|
162 |
# Construct message |
|
163 |
msg = { |
|
164 |
"event_time" : event_time, |
|
165 |
"type": "ganeti-op-status", |
|
166 |
"instance": instances, |
|
167 |
"operation": op.input.OP_ID, |
|
168 |
"jobId": int(job.id), |
|
169 |
"status": op.status, |
|
170 |
"logmsg": logmsg |
|
171 |
} |
|
172 |
|
|
173 |
instance_prefix = instances.split('-')[0] |
|
174 |
routekey = "ganeti.%s.event.op" % instance_prefix |
|
175 |
|
|
176 |
self.logger.debug("Delivering msg: %s (key=%s)", json.dumps(msg), |
|
177 |
routekey) |
|
178 | 178 |
msg = json.dumps(msg) |
179 |
self.logger.debug("Delivering msg: %s (key=%s)", msg, routekey) |
|
179 | 180 |
|
180 | 181 |
# Send the message to RabbitMQ |
181 | 182 |
self.client.basic_publish(exchange=settings.EXCHANGE_GANETI, |
182 | 183 |
routing_key=routekey, |
183 | 184 |
body=msg) |
184 | 185 |
|
186 |
|
|
187 |
def process_instance_op(self, op, job_id): |
|
188 |
""" Process OP_INSTANCE_* opcodes. |
|
189 |
|
|
190 |
""" |
|
191 |
input = op.input |
|
192 |
op_id = input.OP_ID |
|
193 |
|
|
194 |
instances = None |
|
195 |
instances = get_field(input, 'instance_name') |
|
196 |
if not instances: |
|
197 |
instances = get_field(input, 'instances') |
|
198 |
if not instances or len(instances) > 1: |
|
199 |
# Do not publish messages for jobs with no or multiple |
|
200 |
# instances. |
|
201 |
# Currently snf-dispatcher can not normally handle these messages |
|
202 |
return None, None |
|
203 |
else: |
|
204 |
instances = instances[0] |
|
205 |
|
|
206 |
self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id, |
|
207 |
instances, op.status) |
|
208 |
|
|
209 |
msg = {"type": "ganeti-op-status", |
|
210 |
"instance": instances, |
|
211 |
"operation": op_id} |
|
212 |
|
|
213 |
routekey = "ganeti.%s.event.op" % prefix_from_name(instances) |
|
214 |
|
|
215 |
return msg, routekey |
|
216 |
|
|
217 |
def process_network_op(self, op, job_id): |
|
218 |
""" Process OP_NETWORK_* opcodes. |
|
219 |
|
|
220 |
""" |
|
221 |
|
|
222 |
input = op.input |
|
223 |
op_id = input.OP_ID |
|
224 |
network_name = get_field(input, 'network_name') |
|
225 |
|
|
226 |
if not network_name: |
|
227 |
return None, None |
|
228 |
|
|
229 |
self.logger.debug("Job: %d: %s(%s) %s", job_id, op_id, |
|
230 |
network_name, op.status) |
|
231 |
|
|
232 |
msg = {'operation': op_id, |
|
233 |
'type': "ganeti-network-status", |
|
234 |
'network': network_name, |
|
235 |
'subnet': get_field(input, 'network'), |
|
236 |
# 'network_mode': get_field(input, 'network_mode'), |
|
237 |
# 'network_link': get_field(input, 'network_link'), |
|
238 |
'gateway': get_field(input, 'gateway'), |
|
239 |
# 'reserved_ips': get_field(input, 'reserved_ips'), |
|
240 |
'group_name': get_field(input, 'group_name')} |
|
241 |
|
|
242 |
routekey = "ganeti.%s.event.network" % prefix_from_name(network_name) |
|
243 |
|
|
244 |
return msg, routekey |
|
245 |
|
|
246 |
|
|
247 |
# def process_group_op(self, op, job_id): |
|
248 |
# """ Process OP_GROUP_* opcodes. |
|
249 |
|
|
250 |
# """ |
|
251 |
# return None, None |
|
252 |
|
|
253 |
|
|
254 |
|
|
255 |
|
|
256 |
|
|
185 | 257 |
handler_logger = None |
186 | 258 |
def fatal_signal_handler(signum, frame): |
187 | 259 |
global handler_logger |
Also available in: Unified diff