Revision b247c6fc lib/jqueue.py

b/lib/jqueue.py
1982 1982
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1983 1983
                                  " are %s" % (idx, op.priority, allowed))
1984 1984

  
1985
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
1986
      if not opcodes.TNoRelativeJobDependencies(dependencies):
1987
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
1988
                                  " match %s: %s" %
1989
                                  (idx, opcodes.TNoRelativeJobDependencies,
1990
                                   dependencies))
1991

  
1985 1992
    # Write to disk
1986 1993
    self.UpdateJobUnlocked(job)
1987 1994

  
......
2000 2007
    @see: L{_SubmitJobUnlocked}
2001 2008

  
2002 2009
    """
2003
    job_id = self._NewSerialsUnlocked(1)[0]
2010
    (job_id, ) = self._NewSerialsUnlocked(1)
2004 2011
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
2005 2012
    return job_id
2006 2013

  
......
2012 2019
    @see: L{_SubmitJobUnlocked}
2013 2020

  
2014 2021
    """
2015
    results = []
2016
    added_jobs = []
2017 2022
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2018
    for job_id, ops in zip(all_job_ids, jobs):
2019
      try:
2020
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
2021
        status = True
2022
        data = job_id
2023
      except errors.GenericError, err:
2024
        data = ("%s; opcodes %s" %
2025
                (err, utils.CommaJoin(op.Summary() for op in ops)))
2026
        status = False
2027
      results.append((status, data))
2023

  
2024
    (results, added_jobs) = \
2025
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2028 2026

  
2029 2027
    self._EnqueueJobs(added_jobs)
2030 2028

  
2031 2029
    return results
2032 2030

  
2031
  @staticmethod
2032
  def _FormatSubmitError(msg, ops):
2033
    """Formats errors which occurred while submitting a job.
2034

  
2035
    """
2036
    return ("%s; opcodes %s" %
2037
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2038

  
2039
  @staticmethod
2040
  def _ResolveJobDependencies(resolve_fn, deps):
2041
    """Resolves relative job IDs in dependencies.
2042

  
2043
    @type resolve_fn: callable
2044
    @param resolve_fn: Function to resolve a relative job ID
2045
    @type deps: list
2046
    @param deps: Dependencies
2047
    @rtype: list
2048
    @return: Resolved dependencies
2049

  
2050
    """
2051
    result = []
2052

  
2053
    for (dep_job_id, dep_status) in deps:
2054
      if ht.TRelativeJobId(dep_job_id):
2055
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2056
        try:
2057
          job_id = resolve_fn(dep_job_id)
2058
        except IndexError:
2059
          # Abort
2060
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2061
      else:
2062
        job_id = dep_job_id
2063

  
2064
      result.append((job_id, dep_status))
2065

  
2066
    return (True, result)
2067

  
2068
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2069
    """Create and store multiple jobs.
2070

  
2071
    @see: L{_SubmitJobUnlocked}
2072

  
2073
    """
2074
    results = []
2075
    added_jobs = []
2076

  
2077
    def resolve_fn(job_idx, reljobid):
2078
      assert reljobid < 0
2079
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2080

  
2081
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2082
      for op in ops:
2083
        if getattr(op, opcodes.DEPEND_ATTR, None):
2084
          (status, data) = \
2085
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2086
                                         op.depends)
2087
          if not status:
2088
            # Abort resolving dependencies
2089
            assert ht.TNonEmptyString(data), "No error message"
2090
            break
2091
          # Use resolved dependencies
2092
          op.depends = data
2093
      else:
2094
        try:
2095
          job = self._SubmitJobUnlocked(job_id, ops)
2096
        except errors.GenericError, err:
2097
          status = False
2098
          data = self._FormatSubmitError(str(err), ops)
2099
        else:
2100
          status = True
2101
          data = job_id
2102
          added_jobs.append(job)
2103

  
2104
      results.append((status, data))
2105

  
2106
    return (results, added_jobs)
2107

  
2033 2108
  def _EnqueueJobs(self, jobs):
2034 2109
    """Helper function to add jobs to worker pool's queue.
2035 2110

  

Also available in: Unified diff