Distribute the queue serial file after each update
authorIustin Pop <iustin@google.com>
Wed, 23 Jul 2008 10:06:19 +0000 (10:06 +0000)
committerIustin Pop <iustin@google.com>
Wed, 23 Jul 2008 10:06:19 +0000 (10:06 +0000)
This patch adds distribution of the queue serial file after each write
to it (but before a new job is created and written with that ID, and
before a response is returned, so we should be safe from crashes in
between).

Currently it only logs if a node cannot be contacted, it should abort if
> 50% errors are seen.

Reviewed-by: imsnah

daemons/ganeti-masterd
lib/backend.py
lib/jqueue.py

index 0f7f67c..fd1f023 100755 (executable)
@@ -208,7 +208,11 @@ class ClientOps:
 
     if method == luxi.REQ_SUBMIT_JOB:
       ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
-      return queue.SubmitJob(ops)
+      # we need to compute the node list here, since from now on all
+      # operations require locks on the queue or the storage, and we
+      # shouldn't get another lock
+      node_list = self.server.context.cfg.GetNodeList()
+      return queue.SubmitJob(ops, node_list)
 
     elif method == luxi.REQ_CANCEL_JOB:
       (job_id, ) = args
index 1054108..ca9ac7c 100644 (file)
@@ -986,6 +986,7 @@ def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
     constants.ETC_HOSTS,
     constants.SSH_KNOWN_HOSTS_FILE,
     constants.VNC_PASSWORD_FILE,
+    constants.JOB_QUEUE_SERIAL_FILE,
     ]
   allowed_files.extend(ssconf.SimpleStore().GetFileList())
   if file_name not in allowed_files:
index 01ba10e..c7cd7f2 100644 (file)
@@ -35,6 +35,7 @@ from ganeti import opcodes
 from ganeti import errors
 from ganeti import mcpu
 from ganeti import utils
+from ganeti import rpc
 
 
 JOBQUEUE_THREADS = 5
@@ -269,6 +270,7 @@ class JobStorage(object):
   def __init__(self):
     self._lock = threading.Lock()
     self._memcache = {}
+    self._my_hostname = utils.HostInfo().name
 
     # Make sure our directory exists
     try:
@@ -350,7 +352,7 @@ class JobStorage(object):
       utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                       data="%s\n" % 0)
 
-  def _NewSerialUnlocked(self):
+  def _NewSerialUnlocked(self, nodes):
     """Generates a new job identifier.
 
     Job identifiers are unique during the lifetime of a cluster.
@@ -370,6 +372,17 @@ class JobStorage(object):
     # Keep it only if we were able to write the file
     self._last_serial = serial
 
+    # Distribute the serial to the other nodes
+    try:
+      nodes.remove(self._my_hostname)
+    except ValueError:
+      pass
+
+    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
+    for node in nodes:
+      if not result[node]:
+        logging.error("copy of job queue file to node %s failed", node)
+
     return serial
 
   def _GetJobPath(self, job_id):
@@ -434,11 +447,20 @@ class JobStorage(object):
     return self._GetJobsUnlocked(job_ids)
 
   @utils.LockedMethod
-  def AddJob(self, ops):
+  def AddJob(self, ops, nodes):
+    """Create and store on disk a new job.
+
+    @type ops: list
+    @param ops: The list of OpCodes that will becom the new job.
+    @type nodes: list
+    @param nodes: The list of nodes to which the new job serial will be
+                  distributed.
+
+    """
     assert self.lock_fd, "Queue should be open"
 
     # Get job identifier
-    job_id = self._NewSerialUnlocked()
+    job_id = self._NewSerialUnlocked(nodes)
     job = _QueuedJob(self, job_id, ops)
 
     # Write to disk
@@ -504,17 +526,20 @@ class JobQueue:
         job.SetUnclean("Unclean master daemon shutdown")
 
   @utils.LockedMethod
-  def SubmitJob(self, ops):
+  def SubmitJob(self, ops, nodes):
     """Add a new job to the queue.
 
     This enters the job into our job queue and also puts it on the new
     queue, in order for it to be picked up by the queue processors.
 
-    Args:
-    - ops: Sequence of opcodes
+    @type ops: list
+    @param ops: the sequence of opcodes that will become the new job
+    @type nodes: list
+    @param nodes: the list of nodes to which the queue should be
+                  distributed
 
     """
-    job = self._jobs.AddJob(ops)
+    job = self._jobs.AddJob(ops, nodes)
 
     # Add to worker pool
     self._wpool.AddTask(job)