From d2e03a334659764781902d55fdd6428eb1747148 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Wed, 6 Aug 2008 13:36:23 +0000 Subject: [PATCH] jqueue: Implement {Add,Remove}Node These functions will be used to notify the queue about newly added or removed nodes. Reviewed-by: iustinp --- lib/jqueue.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index dfaf615..a969478 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -325,18 +325,33 @@ class JobQueue(object): finally: self.release() - def _WriteAndReplicateFileUnlocked(self, file_name, data): - """Writes a file locally and then replicates it to all nodes. + @utils.LockedMethod + @_RequireOpenQueue + def AddNode(self, node_name): + assert node_name != self._my_hostname - """ - utils.WriteFile(file_name, data=data) + # TODO: Clean queue directory on added node - nodes = self._nodes[:] + # Upload the whole queue excluding archived jobs + files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()] - # Remove master node + # Upload current serial file + files.append(constants.JOB_QUEUE_SERIAL_FILE) + + for file_name in files: + result = rpc.call_upload_file([node_name], file_name) + if not result[node_name]: + logging.error("Failed to upload %s to %s", file_name, node_name) + + self._nodes.add(node_name) + + @utils.LockedMethod + @_RequireOpenQueue + def RemoveNode(self, node_name): try: - nodes.remove(self._my_hostname) - except ValueError: + # The queue is removed by the "leave node" RPC call. + self._nodes.remove(node_name) + except KeyError: pass def _WriteAndReplicateFileUnlocked(self, file_name, data): -- 1.7.10.4