Revision e74798c1
b/lib/jqueue.py | ||
---|---|---|
401 | 401 |
except KeyError: |
402 | 402 |
pass |
403 | 403 |
|
404 |
def _CheckRpcResult(self, result, nodes, failmsg): |
|
405 |
failed = [] |
|
406 |
success = [] |
|
407 |
|
|
408 |
for node in nodes: |
|
409 |
if result[node]: |
|
410 |
success.append(node) |
|
411 |
else: |
|
412 |
failed.append(node) |
|
413 |
|
|
414 |
if failed: |
|
415 |
logging.error("%s failed on %s", failmsg, ", ".join(failed)) |
|
416 |
|
|
417 |
# +1 for the master node |
|
418 |
if (len(success) + 1) < len(failed): |
|
419 |
# TODO: Handle failing nodes |
|
420 |
logging.error("More than half of the nodes failed") |
|
421 |
|
|
404 | 422 |
def _WriteAndReplicateFileUnlocked(self, file_name, data): |
405 | 423 |
"""Writes a file locally and then replicates it to all nodes. |
406 | 424 |
|
407 | 425 |
""" |
408 | 426 |
utils.WriteFile(file_name, data=data) |
409 | 427 |
|
410 |
failed_nodes = 0 |
|
411 | 428 |
result = rpc.call_jobqueue_update(self._nodes, file_name, data) |
412 |
for node in self._nodes: |
|
413 |
if not result[node]: |
|
414 |
failed_nodes += 1 |
|
415 |
logging.error("Copy of job queue file to node %s failed", node) |
|
416 |
|
|
417 |
# TODO: check failed_nodes |
|
429 |
self._CheckRpcResult(result, self._nodes, |
|
430 |
"Updating %s" % file_name) |
|
418 | 431 |
|
419 | 432 |
def _RenameFileUnlocked(self, old, new): |
420 | 433 |
os.rename(old, new) |
421 | 434 |
|
422 | 435 |
result = rpc.call_jobqueue_rename(self._nodes, old, new) |
423 |
for node in self._nodes: |
|
424 |
if not result[node]: |
|
425 |
logging.error("Moving %s to %s failed on %s", old, new, node) |
|
426 |
|
|
427 |
# TODO: check failed nodes |
|
436 |
self._CheckRpcResult(result, self._nodes, |
|
437 |
"Moving %s to %s" % (old, new)) |
|
428 | 438 |
|
429 | 439 |
def _FormatJobID(self, job_id): |
430 | 440 |
if not isinstance(job_id, (int, long)): |
Also available in: Unified diff