Include hvparams in ssconf files
[ganeti-local] / lib / jqueue.py
index 069884b..110d386 100644 (file)
@@ -35,6 +35,7 @@ import time
 import weakref
 import threading
 import itertools
+import operator
 
 try:
   # pylint: disable=E0611
@@ -69,6 +70,9 @@ JOBQUEUE_THREADS = 25
 _LOCK = "_lock"
 _QUEUE = "_queue"
 
+#: Retrieves "id" attribute
+_GetIdAttr = operator.attrgetter("id")
+
 
 class CancelJob(Exception):
   """Special exception to cancel a job.
@@ -76,6 +80,12 @@ class CancelJob(Exception):
   """
 
 
+class QueueShutdown(Exception):
+  """Special exception to abort a job when the job queue is shutting down.
+
+  """
+
+
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -210,7 +220,23 @@ class _QueuedJob(object):
   # pylint: disable=W0212
   __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "__weakref__", "processor_lock", "writable"]
+               "__weakref__", "processor_lock", "writable", "archived"]
+
+  def _AddReasons(self):
+    """Extend the reason trail
+
+    Add the reason for all the opcodes of this job to be executed.
+
+    """
+    count = 0
+    for queued_op in self.ops:
+      op = queued_op.input
+      reason_src = opcodes.NameToReasonSrc(op.__class__.__name__)
+      reason_text = "job=%d;index=%d" % (self.id, count)
+      reason = getattr(op, "reason", [])
+      reason.append((reason_src, reason_text, utils.EpochNano()))
+      op.reason = reason
+      count = count + 1
 
   def __init__(self, queue, job_id, ops, writable):
     """Constructor for the _QueuedJob.
@@ -232,13 +258,17 @@ class _QueuedJob(object):
     self.queue = queue
     self.id = int(job_id)
     self.ops = [_QueuedOpCode(op) for op in ops]
+    self._AddReasons()
     self.log_serial = 0
     self.received_timestamp = TimeStampNow()
     self.start_timestamp = None
     self.end_timestamp = None
+    self.archived = False
 
     self._InitInMemory(self, writable)
 
+    assert not self.archived, "New jobs can not be marked as archived"
+
   @staticmethod
   def _InitInMemory(obj, writable):
     """Initializes in-memory variables.
@@ -262,7 +292,7 @@ class _QueuedJob(object):
     return "<%s at %#x>" % (" ".join(status), id(self))
 
   @classmethod
-  def Restore(cls, queue, state, writable):
+  def Restore(cls, queue, state, writable, archived):
     """Restore a _QueuedJob from serialized state:
 
     @type queue: L{JobQueue}
@@ -271,6 +301,8 @@ class _QueuedJob(object):
     @param state: the serialized state
     @type writable: bool
     @param writable: Whether job can be modified
+    @type archived: bool
+    @param archived: Whether job was already archived
     @rtype: _JobQueue
     @return: the restored _JobQueue instance
 
@@ -281,6 +313,7 @@ class _QueuedJob(object):
     obj.received_timestamp = state.get("received_timestamp", None)
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
+    obj.archived = archived
 
     obj.ops = []
     obj.log_serial = 0
@@ -466,6 +499,50 @@ class _QueuedJob(object):
       logging.debug("Job %s is no longer waiting in the queue", self.id)
       return (False, "Job %s is no longer waiting in the queue" % self.id)
 
+  def ChangePriority(self, priority):
+    """Changes the job priority.
+
+    @type priority: int
+    @param priority: New priority
+    @rtype: tuple; (bool, string)
+    @return: Boolean describing whether job's priority was successfully changed
+      and a text message
+
+    """
+    status = self.CalcStatus()
+
+    if status in constants.JOBS_FINALIZED:
+      return (False, "Job %s is finished" % self.id)
+    elif status == constants.JOB_STATUS_CANCELING:
+      return (False, "Job %s is cancelling" % self.id)
+    else:
+      assert status in (constants.JOB_STATUS_QUEUED,
+                        constants.JOB_STATUS_WAITING,
+                        constants.JOB_STATUS_RUNNING)
+
+      changed = False
+      for op in self.ops:
+        if (op.status == constants.OP_STATUS_RUNNING or
+            op.status in constants.OPS_FINALIZED):
+          assert not changed, \
+            ("Found opcode for which priority should not be changed after"
+             " priority has been changed for previous opcodes")
+          continue
+
+        assert op.status in (constants.OP_STATUS_QUEUED,
+                             constants.OP_STATUS_WAITING)
+
+        changed = True
+
+        # Set new priority (doesn't modify opcode input)
+        op.priority = priority
+
+      if changed:
+        return (True, ("Priorities of pending opcodes for job %s have been"
+                       " changed to %s" % (self.id, priority)))
+      else:
+        return (False, "Job %s had no pending opcodes" % self.id)
+
 
 class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
@@ -496,6 +573,11 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
+    # See if queue is shutting down
+    if not self._queue.AcceptingJobsUnlocked():
+      logging.debug("Queue is shutting down")
+      raise QueueShutdown()
+
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -547,8 +629,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     timestamp = utils.SplitTime(time.time())
     self._AppendFeedback(timestamp, log_type, log_msg)
 
-  def CheckCancel(self):
-    """Check whether job has been cancelled.
+  def CurrentPriority(self):
+    """Returns current priority for opcode.
 
     """
     assert self._op.status in (constants.OP_STATUS_WAITING,
@@ -557,6 +639,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+    return self._op.priority
+
   def SubmitManyJobs(self, jobs):
     """Submits jobs for processing.
 
@@ -622,7 +706,7 @@ class _JobChangesChecker(object):
 
 
 class _JobFileChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
     """Initializes this class.
 
     @type filename: string
@@ -630,7 +714,7 @@ class _JobFileChangesWaiter(object):
     @raises errors.InotifyError: if the notifier cannot be setup
 
     """
-    self._wm = pyinotify.WatchManager()
+    self._wm = _inotify_wm_cls()
     self._inotify_handler = \
       asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
     self._notifier = \
@@ -672,7 +756,7 @@ class _JobFileChangesWaiter(object):
 
 
 class _JobChangesWaiter(object):
-  def __init__(self, filename):
+  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
     """Initializes this class.
 
     @type filename: string
@@ -681,6 +765,7 @@ class _JobChangesWaiter(object):
     """
     self._filewaiter = None
     self._filename = filename
+    self._waiter_cls = _waiter_cls
 
   def Wait(self, timeout):
     """Waits for a job to change.
@@ -697,7 +782,7 @@ class _JobChangesWaiter(object):
     # If this point is reached, return immediately and let caller check the job
     # file again in case there were changes since the last check. This avoids a
     # race condition.
-    self._filewaiter = _JobFileChangesWaiter(self._filename)
+    self._filewaiter = self._waiter_cls(self._filename)
 
     return True
 
@@ -735,7 +820,8 @@ class _WaitForJobChangesHelper(object):
     return result
 
   def __call__(self, filename, job_load_fn,
-               fields, prev_job_info, prev_log_serial, timeout):
+               fields, prev_job_info, prev_log_serial, timeout,
+               _waiter_cls=_JobChangesWaiter):
     """Waits for changes on a job.
 
     @type filename: string
@@ -755,7 +841,7 @@ class _WaitForJobChangesHelper(object):
     counter = itertools.count()
     try:
       check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
-      waiter = _JobChangesWaiter(filename)
+      waiter = _waiter_cls(filename)
       try:
         return utils.Retry(compat.partial(self._CheckForChanges,
                                           counter, job_load_fn, check_fn),
@@ -763,7 +849,7 @@ class _WaitForJobChangesHelper(object):
                            wait_fn=waiter.Wait)
       finally:
         waiter.Close()
-    except (errors.InotifyError, errors.JobLost):
+    except errors.JobLost:
       return None
     except utils.RetryTimeout:
       return constants.JOB_NOTCHANGED
@@ -1026,7 +1112,7 @@ class _JobProcessor(object):
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
                               _OpExecCallbacks(self.queue, self.job, op),
-                              timeout=timeout, priority=op.priority)
+                              timeout=timeout)
     except mcpu.LockAcquireTimeout:
       assert timeout is not None, "Received timeout for blocking acquire"
       logging.debug("Couldn't acquire locks in %0.6fs", timeout)
@@ -1038,12 +1124,25 @@ class _JobProcessor(object):
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
+      # Queue is shutting down, return to queued
+      if not self.queue.AcceptingJobsUnlocked():
+        return (constants.OP_STATUS_QUEUED, None)
+
       # Stay in waitlock while trying to re-acquire lock
       return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
+
+    except QueueShutdown:
+      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
+
+      assert op.status == constants.OP_STATUS_WAITING
+
+      # Job hadn't been started yet, so it should return to the queue
+      return (constants.OP_STATUS_QUEUED, None)
+
     except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
@@ -1141,8 +1240,10 @@ class _JobProcessor(object):
 
           assert not waitjob
 
-        if op.status == constants.OP_STATUS_WAITING:
-          # Couldn't get locks in time
+        if op.status in (constants.OP_STATUS_WAITING,
+                         constants.OP_STATUS_QUEUED):
+          # waiting: Couldn't get locks in time
+          # queued: Queue is shutting down
           assert not op.end_timestamp
         else:
           # Finalize opcode
@@ -1154,7 +1255,19 @@ class _JobProcessor(object):
           else:
             assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_WAITING or waitjob:
+      if op.status == constants.OP_STATUS_QUEUED:
+        # Queue is shutting down
+        assert not waitjob
+
+        finalize = False
+
+        # Reset context
+        job.cur_opctx = None
+
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
+
+      elif op.status == constants.OP_STATUS_WAITING or waitjob:
         finalize = False
 
         if not waitjob and opctx.CheckPriorityIncrease():
@@ -1706,6 +1819,15 @@ class JobQueue(object):
         logging.error("Failed to upload file %s to node %s: %s",
                       file_name, node_name, msg)
 
+    # Set queue drained flag
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name],
+                                                       self._drained)
+    msg = result[node_name].fail_msg
+    if msg:
+      logging.error("Failed to set queue drained flag on node %s: %s",
+                    node_name, msg)
+
     self._nodes[node_name] = node.primary_ip
 
   @locking.ssynchronized(_LOCK)
@@ -1780,7 +1902,8 @@ class JobQueue(object):
     """
     getents = runtime.GetEnts()
     utils.WriteFile(file_name, data=data, uid=getents.masterd_uid,
-                    gid=getents.masterd_gid)
+                    gid=getents.daemons_gid,
+                    mode=constants.JOB_QUEUE_FILES_PERMS)
 
     if replicate:
       names, addrs = self._GetNodeIp()
@@ -1817,7 +1940,7 @@ class JobQueue(object):
     @return: a list of job identifiers.
 
     """
-    assert ht.TPositiveInt(count)
+    assert ht.TNonNegativeInt(count)
 
     # New number
     serial = self._last_serial + count
@@ -1863,7 +1986,25 @@ class JobQueue(object):
                           "job-%s" % job_id)
 
   @staticmethod
-  def _GetJobIDsUnlocked(sort=True):
+  def _DetermineJobDirectories(archived):
+    """Build list of directories containing job files.
+
+    @type archived: bool
+    @param archived: Whether to include directories for archived jobs
+    @rtype: list
+
+    """
+    result = [pathutils.QUEUE_DIR]
+
+    if archived:
+      archive_path = pathutils.JOB_QUEUE_ARCHIVE_DIR
+      result.extend(map(compat.partial(utils.PathJoin, archive_path),
+                        utils.ListVisibleFiles(archive_path)))
+
+    return result
+
+  @classmethod
+  def _GetJobIDsUnlocked(cls, sort=True, archived=False):
     """Return all known job IDs.
 
     The method only looks at disk because it's a requirement that all
@@ -1877,10 +2018,13 @@ class JobQueue(object):
 
     """
     jlist = []
-    for filename in utils.ListVisibleFiles(pathutils.QUEUE_DIR):
-      m = constants.JOB_FILE_RE.match(filename)
-      if m:
-        jlist.append(int(m.group(1)))
+
+    for path in cls._DetermineJobDirectories(archived):
+      for filename in utils.ListVisibleFiles(path):
+        m = constants.JOB_FILE_RE.match(filename)
+        if m:
+          jlist.append(int(m.group(1)))
+
     if sort:
       jlist.sort()
     return jlist
@@ -1939,15 +2083,15 @@ class JobQueue(object):
     @return: either None or the job object
 
     """
-    path_functions = [(self._GetJobPath, True)]
+    path_functions = [(self._GetJobPath, False)]
 
     if try_archived:
-      path_functions.append((self._GetArchivedJobPath, False))
+      path_functions.append((self._GetArchivedJobPath, True))
 
     raw_data = None
-    writable_default = None
+    archived = None
 
-    for (fn, writable_default) in path_functions:
+    for (fn, archived) in path_functions:
       filepath = fn(job_id)
       logging.debug("Loading job from %s", filepath)
       try:
@@ -1962,11 +2106,11 @@ class JobQueue(object):
       return None
 
     if writable is None:
-      writable = writable_default
+      writable = not archived
 
     try:
       data = serializer.LoadJson(raw_data)
-      job = _QueuedJob.Restore(self, data, writable)
+      job = _QueuedJob.Restore(self, data, writable, archived)
     except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
@@ -2008,10 +2152,18 @@ class JobQueue(object):
     @param drain_flag: Whether to set or unset the drain flag
 
     """
+    # Change flag locally
     jstore.SetDrainFlag(drain_flag)
 
     self._drained = drain_flag
 
+    # ... and on all nodes
+    (names, addrs) = self._GetNodeIp()
+    result = \
+      self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag)
+    self._CheckRpcResult(result, self._nodes,
+                         "Setting queue drain flag to %s" % drain_flag)
+
     return True
 
   @_RequireOpenQueue
@@ -2036,13 +2188,14 @@ class JobQueue(object):
 
     job = _QueuedJob(self, job_id, ops, True)
 
-    # Check priority
     for idx, op in enumerate(job.ops):
+      # Check priority
       if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
         allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
+      # Check job dependencies
       dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
       if not opcodes.TNoRelativeJobDependencies(dependencies):
         raise errors.GenericError("Opcode %s has invalid dependencies, must"
@@ -2107,8 +2260,10 @@ class JobQueue(object):
     @param resolve_fn: Function to resolve a relative job ID
     @type deps: list
     @param deps: Dependencies
-    @rtype: list
-    @return: Resolved dependencies
+    @rtype: tuple; (boolean, string or list)
+    @return: If successful (first tuple item), the returned list contains
+      resolved job IDs along with the requested status; if not successful,
+      the second element is an error message
 
     """
     result = []
@@ -2187,7 +2342,8 @@ class JobQueue(object):
     """
     assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
     self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs])
+                             priority=[job.CalcPriority() for job in jobs],
+                             task_id=map(_GetIdAttr, jobs))
 
   def _GetJobStatusForDependencies(self, job_id):
     """Gets the status of a job for dependencies.
@@ -2227,6 +2383,7 @@ class JobQueue(object):
       finalized = job.CalcStatus() in constants.JOBS_FINALIZED
       assert (finalized ^ (job.end_timestamp is None))
       assert job.writable, "Can't update read-only job"
+      assert not job.archived, "Can't update archived job"
 
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize())
@@ -2278,14 +2435,58 @@ class JobQueue(object):
     """
     logging.info("Cancelling job %s", job_id)
 
+    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
+
+  @locking.ssynchronized(_LOCK)
+  @_RequireOpenQueue
+  def ChangeJobPriority(self, job_id, priority):
+    """Changes a job's priority.
+
+    @type job_id: int
+    @param job_id: ID of the job whose priority should be changed
+    @type priority: int
+    @param priority: New priority
+
+    """
+    logging.info("Changing priority of job %s to %s", job_id, priority)
+
+    if priority not in constants.OP_PRIO_SUBMIT_VALID:
+      allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
+      raise errors.GenericError("Invalid priority %s, allowed are %s" %
+                                (priority, allowed))
+
+    def fn(job):
+      (success, msg) = job.ChangePriority(priority)
+
+      if success:
+        try:
+          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
+        except workerpool.NoSuchTask:
+          logging.debug("Job %s is not in workerpool at this time", job.id)
+
+      return (success, msg)
+
+    return self._ModifyJobUnlocked(job_id, fn)
+
+  def _ModifyJobUnlocked(self, job_id, mod_fn):
+    """Modifies a job.
+
+    @type job_id: int
+    @param job_id: Job ID
+    @type mod_fn: callable
+    @param mod_fn: Modifying function, receiving job object as parameter,
+      returning tuple of (status boolean, message string)
+
+    """
     job = self._LoadJobUnlocked(job_id)
     if not job:
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
-    assert job.writable, "Can't cancel read-only job"
+    assert job.writable, "Can't modify read-only job"
+    assert not job.archived, "Can't modify archived job"
 
-    (success, msg) = job.Cancel()
+    (success, msg) = mod_fn(job)
 
     if success:
       # If the job was finalized (e.g. cancelled), this is the final write
@@ -2308,6 +2509,7 @@ class JobQueue(object):
     rename_files = []
     for job in jobs:
       assert job.writable, "Can't archive read-only job"
+      assert not job.archived, "Can't cancel archived job"
 
       if job.CalcStatus() not in constants.JOBS_FINALIZED:
         logging.debug("Job %s is not yet done", job.id)
@@ -2414,6 +2616,11 @@ class JobQueue(object):
     qobj = query.Query(query.JOB_FIELDS, fields, qfilter=qfilter,
                        namefield="id")
 
+    # Archived jobs are only looked at if the "archived" field is referenced
+    # either as a requested field or in the filter. By default archived jobs
+    # are ignored.
+    include_archived = (query.JQ_ARCHIVED in qobj.RequestedData())
+
     job_ids = qobj.RequestedNames()
 
     list_all = (job_ids is None)
@@ -2421,7 +2628,7 @@ class JobQueue(object):
     if list_all:
       # Since files are added to/removed from the queue atomically, there's no
       # risk of getting the job ids in an inconsistent state.
-      job_ids = self._GetJobIDsUnlocked()
+      job_ids = self._GetJobIDsUnlocked(archived=include_archived)
 
     jobs = []
 
@@ -2490,6 +2697,17 @@ class JobQueue(object):
 
     return self._wpool.HasRunningTasks()
 
+  def AcceptingJobsUnlocked(self):
+    """Returns whether jobs are accepted.
+
+    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
+    queue is shutting down.
+
+    @rtype: bool
+
+    """
+    return self._accepting_jobs
+
   @locking.ssynchronized(_LOCK)
   @_RequireOpenQueue
   def Shutdown(self):