+ logging.info("Job queue inspection finished")
+ finally:
+ self.release()
+ except:
+ self._wpool.TerminateWorkers()
+ raise
+
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def AddNode(self, node):
+ """Register a new node with the queue.
+
+ @type node: L{objects.Node}
+ @param node: the node object to be added
+
+ """
+ node_name = node.name
+ assert node_name != self._my_hostname
+
+ # Clean queue directory on added node
+ rpc.RpcRunner.call_jobqueue_purge(node_name)
+
+ if not node.master_candidate:
+ # remove if existing, ignoring errors
+ self._nodes.pop(node_name, None)
+ # and skip the replication of the job ids
+ return
+
+ # Upload the whole queue excluding archived jobs
+ files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
+
+ # Upload current serial file
+ files.append(constants.JOB_QUEUE_SERIAL_FILE)
+
+ for file_name in files:
+ # Read file content
+ fd = open(file_name, "r")
+ try:
+ content = fd.read()
+ finally:
+ fd.close()
+
+ result = rpc.RpcRunner.call_jobqueue_update([node_name],
+ [node.primary_ip],
+ file_name, content)
+ if not result[node_name]:
+ logging.error("Failed to upload %s to %s", file_name, node_name)
+
+ self._nodes[node_name] = node.primary_ip
+
+ @utils.LockedMethod
+ @_RequireOpenQueue
+ def RemoveNode(self, node_name):
+ """Callback called when removing nodes from the cluster.
+
+ @type node_name: str
+ @param node_name: the name of the node to remove
+
+ """