jqueue: Implement {Add,Remove}Node
authorMichael Hanselmann <hansmi@google.com>
Wed, 6 Aug 2008 13:36:23 +0000 (13:36 +0000)
committerMichael Hanselmann <hansmi@google.com>
Wed, 6 Aug 2008 13:36:23 +0000 (13:36 +0000)
These functions will be used to notify the queue about newly added
or removed nodes.

Reviewed-by: iustinp

lib/jqueue.py

index dfaf615..a969478 100644 (file)
@@ -325,18 +325,33 @@ class JobQueue(object):
     finally:
       self.release()
 
-  def _WriteAndReplicateFileUnlocked(self, file_name, data):
-    """Writes a file locally and then replicates it to all nodes.
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def AddNode(self, node_name):
+    assert node_name != self._my_hostname
 
-    """
-    utils.WriteFile(file_name, data=data)
+    # TODO: Clean queue directory on added node
 
-    nodes = self._nodes[:]
+    # Upload the whole queue excluding archived jobs
+    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
 
-    # Remove master node
+    # Upload current serial file
+    files.append(constants.JOB_QUEUE_SERIAL_FILE)
+
+    for file_name in files:
+      result = rpc.call_upload_file([node_name], file_name)
+      if not result[node_name]:
+        logging.error("Failed to upload %s to %s", file_name, node_name)
+
+    self._nodes.add(node_name)
+
+  @utils.LockedMethod
+  @_RequireOpenQueue
+  def RemoveNode(self, node_name):
     try:
-      nodes.remove(self._my_hostname)
-    except ValueError:
+      # The queue is removed by the "leave node" RPC call.
+      self._nodes.remove(node_name)
+    except KeyError:
       pass
 
   def _WriteAndReplicateFileUnlocked(self, file_name, data):