Revision 23752136
b/lib/jqueue.py | ||
---|---|---|
290 | 290 |
assert self._last_serial is not None, ("Serial file was modified between" |
291 | 291 |
" check in jstore and here") |
292 | 292 |
|
293 |
# Get initial list of nodes |
|
294 |
self._nodes = self.context.cfg.GetNodeList() |
|
295 |
|
|
296 |
# TODO: Check consistency across nodes |
|
297 |
|
|
293 | 298 |
# Setup worker pool |
294 | 299 |
self._wpool = _JobQueueWorkerPool(self) |
295 | 300 |
|
... | ... | |
314 | 319 |
finally: |
315 | 320 |
self.release() |
316 | 321 |
|
322 |
def _WriteAndReplicateFileUnlocked(self, file_name, data): |
|
323 |
"""Writes a file locally and then replicates it to all nodes. |
|
324 |
|
|
325 |
""" |
|
326 |
utils.WriteFile(file_name, data=data) |
|
327 |
|
|
328 |
nodes = self._nodes[:] |
|
329 |
|
|
330 |
# Remove master node |
|
331 |
try: |
|
332 |
nodes.remove(self._my_hostname) |
|
333 |
except ValueError: |
|
334 |
pass |
|
335 |
|
|
336 |
failed_nodes = 0 |
|
337 |
result = rpc.call_upload_file(nodes, file_name) |
|
338 |
for node in nodes: |
|
339 |
if not result[node]: |
|
340 |
failed_nodes += 1 |
|
341 |
logging.error("Copy of job queue file to node %s failed", node) |
|
342 |
|
|
343 |
# TODO: check failed_nodes |
|
344 |
|
|
317 | 345 |
def _FormatJobID(self, job_id): |
318 | 346 |
if not isinstance(job_id, (int, long)): |
319 | 347 |
raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id) |
... | ... | |
334 | 362 |
serial = self._last_serial + 1 |
335 | 363 |
|
336 | 364 |
# Write to file |
337 |
utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
|
|
338 |
data="%s\n" % serial)
|
|
365 |
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
|
|
366 |
"%s\n" % serial)
|
|
339 | 367 |
|
340 | 368 |
# Keep it only if we were able to write the file |
341 | 369 |
self._last_serial = serial |
342 | 370 |
|
343 |
# Distribute the serial to the other nodes |
|
344 |
try: |
|
345 |
nodes.remove(self._my_hostname) |
|
346 |
except ValueError: |
|
347 |
pass |
|
348 |
|
|
349 |
result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE) |
|
350 |
for node in nodes: |
|
351 |
if not result[node]: |
|
352 |
logging.error("copy of job queue file to node %s failed", node) |
|
353 |
|
|
354 | 371 |
return self._FormatJobID(serial) |
355 | 372 |
|
356 | 373 |
@staticmethod |
... | ... | |
450 | 467 |
@_RequireOpenQueue |
451 | 468 |
def UpdateJobUnlocked(self, job): |
452 | 469 |
filename = self._GetJobPath(job.id) |
470 |
data = serializer.DumpJson(job.Serialize(), indent=False) |
|
453 | 471 |
logging.debug("Writing job %s to %s", job.id, filename) |
454 |
utils.WriteFile(filename, |
|
455 |
data=serializer.DumpJson(job.Serialize(), indent=False)) |
|
472 |
self._WriteAndReplicateFileUnlocked(filename, data) |
|
456 | 473 |
self._CleanCacheUnlocked([job.id]) |
457 | 474 |
|
458 | 475 |
def _CleanCacheUnlocked(self, exclude): |
Also available in: Unified diff