Revision 27a8e4ae
b/snf-cyclades-gtools/synnefo/ganeti/eventd.py | ||
---|---|---|
63 | 63 |
from synnefo import settings |
64 | 64 |
from synnefo.lib.amqp import AMQPClient |
65 | 65 |
|
66 |
|
|
66 | 67 |
def get_time_from_status(op, job): |
67 | 68 |
"""Generate a unique message identifier for a ganeti job. |
68 | 69 |
|
... | ... | |
74 | 75 |
status = op.status |
75 | 76 |
if status == constants.JOB_STATUS_QUEUED: |
76 | 77 |
return job.received_timestamp |
77 |
try: # Compatibility with Ganeti version |
|
78 |
try: # Compatibility with Ganeti version
|
|
78 | 79 |
if status == constants.JOB_STATUS_WAITLOCK: |
79 | 80 |
return op.start_timestamp |
80 | 81 |
except AttributeError: |
... | ... | |
88 | 89 |
# success, canceled, error |
89 | 90 |
return op.end_timestamp |
90 | 91 |
|
91 |
raise InvalidBackendState(status, job)
|
|
92 |
raise InvalidBackendStatus(status, job)
|
|
92 | 93 |
|
93 | 94 |
|
94 | 95 |
class InvalidBackendStatus(Exception): |
... | ... | |
100 | 101 |
return repr("Invalid backend status: %s in job %s" |
101 | 102 |
% (self.status, self.job)) |
102 | 103 |
|
104 |
|
|
103 | 105 |
def prefix_from_name(name): |
104 | 106 |
return name.split('-')[0] |
105 | 107 |
|
... | ... | |
112 | 114 |
|
113 | 115 |
|
114 | 116 |
class JobFileHandler(pyinotify.ProcessEvent): |
115 |
def __init__(self, logger): |
|
117 |
def __init__(self, logger, cluster_name):
|
|
116 | 118 |
pyinotify.ProcessEvent.__init__(self) |
117 | 119 |
self.logger = logger |
120 |
self.cluster_name = cluster_name |
|
121 |
|
|
118 | 122 |
self.client = AMQPClient(confirm_buffer=25) |
119 | 123 |
handler_logger.info("Attempting to connect to RabbitMQ hosts") |
124 |
|
|
120 | 125 |
self.client.connect() |
121 |
self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic') |
|
122 | 126 |
handler_logger.info("Connected succesfully") |
123 | 127 |
|
128 |
self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic') |
|
129 |
|
|
124 | 130 |
self.op_handlers = {"INSTANCE": self.process_instance_op, |
125 | 131 |
"NETWORK": self.process_network_op} |
126 | 132 |
# "GROUP": self.process_group_op} |
... | ... | |
140 | 146 |
return |
141 | 147 |
|
142 | 148 |
data = serializer.LoadJson(data) |
143 |
try: # Compatibility with Ganeti version |
|
149 |
try: # Compatibility with Ganeti version
|
|
144 | 150 |
job = jqueue._QueuedJob.Restore(None, data, False) |
145 | 151 |
except TypeError: |
146 | 152 |
job = jqueue._QueuedJob.Restore(None, data) |
... | ... | |
174 | 180 |
msg.update({"event_time": event_time, |
175 | 181 |
"operation": op_id, |
176 | 182 |
"status": op.status, |
183 |
"cluster": self.cluster_name, |
|
177 | 184 |
"logmsg": logmsg, |
178 | 185 |
"jobId": job_id}) |
179 | 186 |
|
... | ... | |
254 | 261 |
|
255 | 262 |
|
256 | 263 |
|
264 |
def find_cluster_name(): |
|
265 |
path = constants.DATA_DIR + "/ssconf_" + constants.SS_CLUSTER_NAME |
|
266 |
f = open(path) |
|
267 |
name = f.readline().rstrip() |
|
268 |
f.close() |
|
269 |
return name |
|
257 | 270 |
|
258 | 271 |
handler_logger = None |
259 | 272 |
def fatal_signal_handler(signum, frame): |
... | ... | |
335 | 348 |
wm = pyinotify.WatchManager() |
336 | 349 |
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_MOVED_TO"] | \ |
337 | 350 |
pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"] |
338 |
handler = JobFileHandler(logger) |
|
351 |
cluster_name = find_cluster_name() |
|
352 |
handler = JobFileHandler(logger, cluster_name) |
|
339 | 353 |
notifier = pyinotify.Notifier(wm, handler) |
340 | 354 |
|
341 | 355 |
try: |
... | ... | |
344 | 358 |
if res[constants.QUEUE_DIR] < 0: |
345 | 359 |
raise Exception("pyinotify add_watch returned negative descriptor") |
346 | 360 |
|
347 |
logger.info("Now watching %s" % constants.QUEUE_DIR) |
|
361 |
logger.info("Now watching %s of %s" % (constants.QUEUE_DIR, |
|
362 |
cluster_name)) |
|
348 | 363 |
|
349 | 364 |
while True: # loop forever |
350 | 365 |
# process the queue of events as explained above |
Also available in: Unified diff