finally:
self.release()
- def _WriteAndReplicateFileUnlocked(self, file_name, data):
- """Writes a file locally and then replicates it to all nodes.
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def AddNode(self, node_name):
+ assert node_name != self._my_hostname
- """
- utils.WriteFile(file_name, data=data)
+ # TODO: Clean queue directory on added node
- nodes = self._nodes[:]
+ # Upload the whole queue excluding archived jobs
+ files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
- # Remove master node
+ # Upload current serial file
+ files.append(constants.JOB_QUEUE_SERIAL_FILE)
+
+ for file_name in files:
+ result = rpc.call_upload_file([node_name], file_name)
+ if not result[node_name]:
+ logging.error("Failed to upload %s to %s", file_name, node_name)
+
+ self._nodes.add(node_name)
+
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def RemoveNode(self, node_name):
try:
- nodes.remove(self._my_hostname)
- except ValueError:
+ # The queue is removed by the "leave node" RPC call.
+ self._nodes.remove(node_name)
+ except KeyError:
pass
def _WriteAndReplicateFileUnlocked(self, file_name, data):