Revision b247c6fc

b/lib/ht.py
22 22
"""Module implementing the parameter types code."""
23 23

  
24 24
import re
25
import operator
25 26

  
26 27
from ganeti import compat
27 28
from ganeti import utils
......
297 298
TStrictPositiveInt = \
298 299
  TAnd(TInt, WithDesc("GreaterThanZero")(lambda v: v > 0))
299 300

  
301
#: a strictly negative integer (0 > value)
302
TStrictNegativeInt = \
303
  TAnd(TInt, WithDesc("LessThanZero")(compat.partial(operator.gt, 0)))
304

  
300 305
#: a positive float
301 306
TPositiveFloat = \
302 307
  TAnd(TFloat, WithDesc("EqualGreaterZero")(lambda v: v >= 0.0))
......
308 313
#: Number
309 314
TNumber = TOr(TInt, TFloat)
310 315

  
316
#: Relative job ID
317
TRelativeJobId = WithDesc("RelativeJobId")(TStrictNegativeInt)
318

  
311 319

  
312 320
def TListOf(my_type):
313 321
  """Checks if a given value is a list with all elements of the same type.
b/lib/jqueue.py
1982 1982
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
1983 1983
                                  " are %s" % (idx, op.priority, allowed))
1984 1984

  
1985
      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
1986
      if not opcodes.TNoRelativeJobDependencies(dependencies):
1987
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
1988
                                  " match %s: %s" %
1989
                                  (idx, opcodes.TNoRelativeJobDependencies,
1990
                                   dependencies))
1991

  
1985 1992
    # Write to disk
1986 1993
    self.UpdateJobUnlocked(job)
1987 1994

  
......
2000 2007
    @see: L{_SubmitJobUnlocked}
2001 2008

  
2002 2009
    """
2003
    job_id = self._NewSerialsUnlocked(1)[0]
2010
    (job_id, ) = self._NewSerialsUnlocked(1)
2004 2011
    self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
2005 2012
    return job_id
2006 2013

  
......
2012 2019
    @see: L{_SubmitJobUnlocked}
2013 2020

  
2014 2021
    """
2015
    results = []
2016
    added_jobs = []
2017 2022
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2018
    for job_id, ops in zip(all_job_ids, jobs):
2019
      try:
2020
        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
2021
        status = True
2022
        data = job_id
2023
      except errors.GenericError, err:
2024
        data = ("%s; opcodes %s" %
2025
                (err, utils.CommaJoin(op.Summary() for op in ops)))
2026
        status = False
2027
      results.append((status, data))
2023

  
2024
    (results, added_jobs) = \
2025
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2028 2026

  
2029 2027
    self._EnqueueJobs(added_jobs)
2030 2028

  
2031 2029
    return results
2032 2030

  
2031
  @staticmethod
2032
  def _FormatSubmitError(msg, ops):
2033
    """Formats errors which occurred while submitting a job.
2034

  
2035
    """
2036
    return ("%s; opcodes %s" %
2037
            (msg, utils.CommaJoin(op.Summary() for op in ops)))
2038

  
2039
  @staticmethod
2040
  def _ResolveJobDependencies(resolve_fn, deps):
2041
    """Resolves relative job IDs in dependencies.
2042

  
2043
    @type resolve_fn: callable
2044
    @param resolve_fn: Function to resolve a relative job ID
2045
    @type deps: list
2046
    @param deps: Dependencies
2047
    @rtype: list
2048
    @return: Resolved dependencies
2049

  
2050
    """
2051
    result = []
2052

  
2053
    for (dep_job_id, dep_status) in deps:
2054
      if ht.TRelativeJobId(dep_job_id):
2055
        assert ht.TInt(dep_job_id) and dep_job_id < 0
2056
        try:
2057
          job_id = resolve_fn(dep_job_id)
2058
        except IndexError:
2059
          # Abort
2060
          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
2061
      else:
2062
        job_id = dep_job_id
2063

  
2064
      result.append((job_id, dep_status))
2065

  
2066
    return (True, result)
2067

  
2068
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2069
    """Create and store multiple jobs.
2070

  
2071
    @see: L{_SubmitJobUnlocked}
2072

  
2073
    """
2074
    results = []
2075
    added_jobs = []
2076

  
2077
    def resolve_fn(job_idx, reljobid):
2078
      assert reljobid < 0
2079
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2080

  
2081
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2082
      for op in ops:
2083
        if getattr(op, opcodes.DEPEND_ATTR, None):
2084
          (status, data) = \
2085
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2086
                                         op.depends)
2087
          if not status:
2088
            # Abort resolving dependencies
2089
            assert ht.TNonEmptyString(data), "No error message"
2090
            break
2091
          # Use resolved dependencies
2092
          op.depends = data
2093
      else:
2094
        try:
2095
          job = self._SubmitJobUnlocked(job_id, ops)
2096
        except errors.GenericError, err:
2097
          status = False
2098
          data = self._FormatSubmitError(str(err), ops)
2099
        else:
2100
          status = True
2101
          data = job_id
2102
          added_jobs.append(job)
2103

  
2104
      results.append((status, data))
2105

  
2106
    return (results, added_jobs)
2107

  
2033 2108
  def _EnqueueJobs(self, jobs):
2034 2109
    """Helper function to add jobs to worker pool's queue.
2035 2110

  
b/lib/opcodes.py
393 393
                                     errors.ECODE_INVAL)
394 394

  
395 395

  
396
def _BuildJobDepCheck(relative):
397
  """Builds check for job dependencies (L{DEPEND_ATTR}).
398

  
399
  @type relative: bool
400
  @param relative: Whether to accept relative job IDs (negative)
401
  @rtype: callable
402

  
403
  """
404
  if relative:
405
    job_id = ht.TOr(ht.TJobId, ht.TRelativeJobId)
406
  else:
407
    job_id = ht.TJobId
408

  
409
  job_dep = \
410
    ht.TAnd(ht.TIsLength(2),
411
            ht.TItems([job_id,
412
                       ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))]))
413

  
414
  return ht.TOr(ht.TNone, ht.TListOf(job_dep))
415

  
416

  
417
TNoRelativeJobDependencies = _BuildJobDepCheck(False)
418

  
419

  
396 420
class OpCode(BaseOpCode):
397 421
  """Abstract OpCode.
398 422

  
......
416 440
  # pylint: disable-msg=E1101
417 441
  # as OP_ID is dynamically defined
418 442
  WITH_LU = True
419
  _T_JOB_DEP = \
420
    ht.TAnd(ht.TIsLength(2),
421
            ht.TItems([ht.TJobId,
422
                       ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))]))
423 443
  OP_PARAMS = [
424 444
    ("dry_run", None, ht.TMaybeBool, "Run checks only, don't execute"),
425 445
    ("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"),
426 446
    ("priority", constants.OP_PRIO_DEFAULT,
427 447
     ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"),
428
    (DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
429
     "Job dependencies"),
448
    (DEPEND_ATTR, None, _BuildJobDepCheck(True),
449
     "Job dependencies; if used through ``SubmitManyJobs`` relative (negative)"
450
     " job IDs can be used"),
430 451
    ]
431 452

  
432 453
  def __getstate__(self):
b/test/ganeti.ht_unittest.py
250 250
              None, [], {}, object()]:
251 251
      self.assertFalse(ht.TJobId(i))
252 252

  
253
  def testRelativeJobId(self):
254
    for i in [-1, -93, -4395]:
255
      self.assertTrue(ht.TRelativeJobId(i))
256
      self.assertFalse(ht.TRelativeJobId(str(i)))
257

  
258
    for i in [0, 1, 2, 10, 9289, "", "0", "-1", "-999"]:
259
      self.assertFalse(ht.TRelativeJobId(i))
260
      self.assertFalse(ht.TRelativeJobId(str(i)))
261

  
253 262
  def testItems(self):
254 263
    self.assertRaises(AssertionError, ht.TItems, [])
255 264

  
b/test/ganeti.opcodes_unittest.py
273 273
    self.assertEqual(op.debug_level, 123)
274 274

  
275 275

  
276
class TestOpcodeDepends(unittest.TestCase):
277
  def test(self):
278
    check_relative = opcodes._BuildJobDepCheck(True)
279
    check_norelative = opcodes.TNoRelativeJobDependencies
280

  
281
    for fn in [check_relative, check_norelative]:
282
      self.assertTrue(fn(None))
283
      self.assertTrue(fn([]))
284
      self.assertTrue(fn([(1, [])]))
285
      self.assertTrue(fn([(719833, [])]))
286
      self.assertTrue(fn([("24879", [])]))
287
      self.assertTrue(fn([(2028, [constants.JOB_STATUS_ERROR])]))
288
      self.assertTrue(fn([
289
        (2028, [constants.JOB_STATUS_ERROR]),
290
        (18750, []),
291
        (5063, [constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR]),
292
        ]))
293

  
294
      self.assertFalse(fn(1))
295
      self.assertFalse(fn([(9, )]))
296
      self.assertFalse(fn([(15194, constants.JOB_STATUS_ERROR)]))
297

  
298
    for i in [
299
      [(-1, [])],
300
      [(-27740, [constants.JOB_STATUS_CANCELED, constants.JOB_STATUS_ERROR]),
301
       (-1, [constants.JOB_STATUS_ERROR]),
302
       (9921, [])],
303
      ]:
304
      self.assertTrue(check_relative(i))
305
      self.assertFalse(check_norelative(i))
306

  
307

  
276 308
if __name__ == "__main__":
277 309
  testutils.GanetiTestProgram()

Also available in: Unified diff