Revision d2e03a33 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
325 | 325 |
finally: |
326 | 326 |
self.release() |
327 | 327 |
|
328 |
def _WriteAndReplicateFileUnlocked(self, file_name, data): |
|
329 |
"""Writes a file locally and then replicates it to all nodes. |
|
328 |
@utils.LockedMethod |
|
329 |
@_RequireOpenQueue |
|
330 |
def AddNode(self, node_name): |
|
331 |
assert node_name != self._my_hostname |
|
330 | 332 |
|
331 |
""" |
|
332 |
utils.WriteFile(file_name, data=data) |
|
333 |
# TODO: Clean queue directory on added node |
|
333 | 334 |
|
334 |
nodes = self._nodes[:] |
|
335 |
# Upload the whole queue excluding archived jobs |
|
336 |
files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] |
|
335 | 337 |
|
336 |
# Remove master node |
|
338 |
# Upload current serial file |
|
339 |
files.append(constants.JOB_QUEUE_SERIAL_FILE) |
|
340 |
|
|
341 |
for file_name in files: |
|
342 |
result = rpc.call_upload_file([node_name], file_name) |
|
343 |
if not result[node_name]: |
|
344 |
logging.error("Failed to upload %s to %s", file_name, node_name) |
|
345 |
|
|
346 |
self._nodes.add(node_name) |
|
347 |
|
|
348 |
@utils.LockedMethod |
|
349 |
@_RequireOpenQueue |
|
350 |
def RemoveNode(self, node_name): |
|
337 | 351 |
try: |
338 |
nodes.remove(self._my_hostname) |
|
339 |
except ValueError: |
|
352 |
# The queue is removed by the "leave node" RPC call. |
|
353 |
self._nodes.remove(node_name) |
|
354 |
except KeyError: |
|
340 | 355 |
pass |
341 | 356 |
|
342 | 357 |
def _WriteAndReplicateFileUnlocked(self, file_name, data): |
Also available in: Unified diff