Revision b247c6fc
b/lib/ht.py | ||
---|---|---|
22 | 22 |
"""Module implementing the parameter types code.""" |
23 | 23 |
|
24 | 24 |
import re |
25 |
import operator |
|
25 | 26 |
|
26 | 27 |
from ganeti import compat |
27 | 28 |
from ganeti import utils |
... | ... | |
297 | 298 |
TStrictPositiveInt = \ |
298 | 299 |
TAnd(TInt, WithDesc("GreaterThanZero")(lambda v: v > 0)) |
299 | 300 |
|
301 |
#: a strictly negative integer (0 > value) |
|
302 |
TStrictNegativeInt = \ |
|
303 |
TAnd(TInt, WithDesc("LessThanZero")(compat.partial(operator.gt, 0))) |
|
304 |
|
|
300 | 305 |
#: a positive float |
301 | 306 |
TPositiveFloat = \ |
302 | 307 |
TAnd(TFloat, WithDesc("EqualGreaterZero")(lambda v: v >= 0.0)) |
... | ... | |
308 | 313 |
#: Number |
309 | 314 |
TNumber = TOr(TInt, TFloat) |
310 | 315 |
|
316 |
#: Relative job ID |
|
317 |
TRelativeJobId = WithDesc("RelativeJobId")(TStrictNegativeInt) |
|
318 |
|
|
311 | 319 |
|
312 | 320 |
def TListOf(my_type): |
313 | 321 |
"""Checks if a given value is a list with all elements of the same type. |
b/lib/jqueue.py | ||
---|---|---|
1982 | 1982 |
raise errors.GenericError("Opcode %s has invalid priority %s, allowed" |
1983 | 1983 |
" are %s" % (idx, op.priority, allowed)) |
1984 | 1984 |
|
1985 |
dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) |
|
1986 |
if not opcodes.TNoRelativeJobDependencies(dependencies): |
|
1987 |
raise errors.GenericError("Opcode %s has invalid dependencies, must" |
|
1988 |
" match %s: %s" % |
|
1989 |
(idx, opcodes.TNoRelativeJobDependencies, |
|
1990 |
dependencies)) |
|
1991 |
|
|
1985 | 1992 |
# Write to disk |
1986 | 1993 |
self.UpdateJobUnlocked(job) |
1987 | 1994 |
|
... | ... | |
2000 | 2007 |
@see: L{_SubmitJobUnlocked} |
2001 | 2008 |
|
2002 | 2009 |
""" |
2003 |
job_id = self._NewSerialsUnlocked(1)[0]
|
|
2010 |
(job_id, ) = self._NewSerialsUnlocked(1)
|
|
2004 | 2011 |
self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) |
2005 | 2012 |
return job_id |
2006 | 2013 |
|
... | ... | |
2012 | 2019 |
@see: L{_SubmitJobUnlocked} |
2013 | 2020 |
|
2014 | 2021 |
""" |
2015 |
results = [] |
|
2016 |
added_jobs = [] |
|
2017 | 2022 |
all_job_ids = self._NewSerialsUnlocked(len(jobs)) |
2018 |
for job_id, ops in zip(all_job_ids, jobs): |
|
2019 |
try: |
|
2020 |
added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) |
|
2021 |
status = True |
|
2022 |
data = job_id |
|
2023 |
except errors.GenericError, err: |
|
2024 |
data = ("%s; opcodes %s" % |
|
2025 |
(err, utils.CommaJoin(op.Summary() for op in ops))) |
|
2026 |
status = False |
|
2027 |
results.append((status, data)) |
|
2023 |
|
|
2024 |
(results, added_jobs) = \ |
|
2025 |
self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) |
|
2028 | 2026 |
|
2029 | 2027 |
self._EnqueueJobs(added_jobs) |
2030 | 2028 |
|
2031 | 2029 |
return results |
2032 | 2030 |
|
2031 |
@staticmethod |
|
2032 |
def _FormatSubmitError(msg, ops): |
|
2033 |
"""Formats errors which occurred while submitting a job. |
|
2034 |
|
|
2035 |
""" |
|
2036 |
return ("%s; opcodes %s" % |
|
2037 |
(msg, utils.CommaJoin(op.Summary() for op in ops))) |
|
2038 |
|
|
2039 |
@staticmethod |
|
2040 |
def _ResolveJobDependencies(resolve_fn, deps): |
|
2041 |
"""Resolves relative job IDs in dependencies. |
|
2042 |
|
|
2043 |
@type resolve_fn: callable |
|
2044 |
@param resolve_fn: Function to resolve a relative job ID |
|
2045 |
@type deps: list |
|
2046 |
@param deps: Dependencies |
|
2047 |
@rtype: list |
|
2048 |
@return: Resolved dependencies |
|
2049 |
|
|
2050 |
""" |
|
2051 |
result = [] |
|
2052 |
|
|
2053 |
for (dep_job_id, dep_status) in deps: |
|
2054 |
if ht.TRelativeJobId(dep_job_id): |
|
2055 |
assert ht.TInt(dep_job_id) and dep_job_id < 0 |
|
2056 |
try: |
|
2057 |
job_id = resolve_fn(dep_job_id) |
|
2058 |
except IndexError: |
|
2059 |
# Abort |
|
2060 |
return (False, "Unable to resolve relative job ID %s" % dep_job_id) |
|
2061 |
else: |
|
2062 |
job_id = dep_job_id |
|
2063 |
|
|
2064 |
result.append((job_id, dep_status)) |
|
2065 |
|
|
2066 |
return (True, result) |
|
2067 |
|
|
2068 |
def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids): |
|
2069 |
"""Create and store multiple jobs. |
|
2070 |
|
|
2071 |
@see: L{_SubmitJobUnlocked} |
|
2072 |
|
|
2073 |
""" |
|
2074 |
results = [] |
|
2075 |
added_jobs = [] |
|
2076 |
|
|
2077 |
def resolve_fn(job_idx, reljobid): |
|
2078 |
assert reljobid < 0 |
|
2079 |
return (previous_job_ids + job_ids[:job_idx])[reljobid] |
|
2080 |
|
|
2081 |
for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): |
|
2082 |
for op in ops: |
|
2083 |
if getattr(op, opcodes.DEPEND_ATTR, None): |
|
2084 |
(status, data) = \ |
|
2085 |
self._ResolveJobDependencies(compat.partial(resolve_fn, idx), |
|
2086 |
op.depends) |
|
2087 |
if not status: |
|
2088 |
# Abort resolving dependencies |
|
2089 |
assert ht.TNonEmptyString(data), "No error message" |
|
2090 |
break |
|
2091 |
# Use resolved dependencies |
|
2092 |
op.depends = data |
|
2093 |
else: |
|
2094 |
try: |
|
2095 |
job = self._SubmitJobUnlocked(job_id, ops) |
|
2096 |
except errors.GenericError, err: |
|
2097 |
status = False |
|
2098 |
data = self._FormatSubmitError(str(err), ops) |
|
2099 |
else: |
|
2100 |
status = True |
|
2101 |
data = job_id |
|
2102 |
added_jobs.append(job) |
|
2103 |
|
|
2104 |
results.append((status, data)) |
|
2105 |
|
|
2106 |
return (results, added_jobs) |
|
2107 |
|
|
2033 | 2108 |
def _EnqueueJobs(self, jobs): |
2034 | 2109 |
"""Helper function to add jobs to worker pool's queue. |
2035 | 2110 |
|
b/lib/opcodes.py | ||
---|---|---|
393 | 393 |
errors.ECODE_INVAL) |
394 | 394 |
|
395 | 395 |
|
396 |
def _BuildJobDepCheck(relative): |
|
397 |
"""Builds check for job dependencies (L{DEPEND_ATTR}). |
|
398 |
|
|
399 |
@type relative: bool |
|
400 |
@param relative: Whether to accept relative job IDs (negative) |
|
401 |
@rtype: callable |
|
402 |
|
|
403 |
""" |
|
404 |
if relative: |
|
405 |
job_id = ht.TOr(ht.TJobId, ht.TRelativeJobId) |
|
406 |
else: |
|
407 |
job_id = ht.TJobId |
|
408 |
|
|
409 |
job_dep = \ |
|
410 |
ht.TAnd(ht.TIsLength(2), |
|
411 |
ht.TItems([job_id, |
|
412 |
ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))])) |
|
413 |
|
|
414 |
return ht.TOr(ht.TNone, ht.TListOf(job_dep)) |
|
415 |
|
|
416 |
|
|
417 |
TNoRelativeJobDependencies = _BuildJobDepCheck(False) |
|
418 |
|
|
419 |
|
|
396 | 420 |
class OpCode(BaseOpCode): |
397 | 421 |
"""Abstract OpCode. |
398 | 422 |
|
... | ... | |
416 | 440 |
# pylint: disable-msg=E1101 |
417 | 441 |
# as OP_ID is dynamically defined |
418 | 442 |
WITH_LU = True |
419 |
_T_JOB_DEP = \ |
|
420 |
ht.TAnd(ht.TIsLength(2), |
|
421 |
ht.TItems([ht.TJobId, |
|
422 |
ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))])) |
|
423 | 443 |
OP_PARAMS = [ |
424 | 444 |
("dry_run", None, ht.TMaybeBool, "Run checks only, don't execute"), |
425 | 445 |
("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"), |
426 | 446 |
("priority", constants.OP_PRIO_DEFAULT, |
427 | 447 |
ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"), |
428 |
(DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)), |
|
429 |
"Job dependencies"), |
|
448 |
(DEPEND_ATTR, None, _BuildJobDepCheck(True), |
|
449 |
"Job dependencies; if used through ``SubmitManyJobs`` relative (negative)" |
|
450 |
" job IDs can be used"), |
|
430 | 451 |
] |
431 | 452 |
|
432 | 453 |
def __getstate__(self): |
b/test/ganeti.ht_unittest.py | ||
---|---|---|
250 | 250 |
None, [], {}, object()]: |
251 | 251 |
self.assertFalse(ht.TJobId(i)) |
252 | 252 |
|
253 |
def testRelativeJobId(self): |
|
254 |
for i in [-1, -93, -4395]: |
|
255 |
self.assertTrue(ht.TRelativeJobId(i)) |
|
256 |
self.assertFalse(ht.TRelativeJobId(str(i))) |
|
257 |
|
|
258 |
for i in [0, 1, 2, 10, 9289, "", "0", "-1", "-999"]: |
|
259 |
self.assertFalse(ht.TRelativeJobId(i)) |
|
260 |
self.assertFalse(ht.TRelativeJobId(str(i))) |
|
261 |
|
|
253 | 262 |
def testItems(self): |
254 | 263 |
self.assertRaises(AssertionError, ht.TItems, []) |
255 | 264 |
|
b/test/ganeti.opcodes_unittest.py | ||
---|---|---|
273 | 273 |
self.assertEqual(op.debug_level, 123) |
274 | 274 |
|
275 | 275 |
|
276 |
class TestOpcodeDepends(unittest.TestCase): |
|
277 |
def test(self): |
|
278 |
check_relative = opcodes._BuildJobDepCheck(True) |
|
279 |
check_norelative = opcodes.TNoRelativeJobDependencies |
|
280 |
|
|
281 |
for fn in [check_relative, check_norelative]: |
|
282 |
self.assertTrue(fn(None)) |
|
283 |
self.assertTrue(fn([])) |
|
284 |
self.assertTrue(fn([(1, [])])) |
|
285 |
self.assertTrue(fn([(719833, [])])) |
|
286 |
self.assertTrue(fn([("24879", [])])) |
|
287 |
self.assertTrue(fn([(2028, [constants.JOB_STATUS_ERROR])])) |
|
288 |
self.assertTrue(fn([ |
|
289 |
(2028, [constants.JOB_STATUS_ERROR]), |
|
290 |
(18750, []), |
|
291 |
(5063, [constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR]), |
|
292 |
])) |
|
293 |
|
|
294 |
self.assertFalse(fn(1)) |
|
295 |
self.assertFalse(fn([(9, )])) |
|
296 |
self.assertFalse(fn([(15194, constants.JOB_STATUS_ERROR)])) |
|
297 |
|
|
298 |
for i in [ |
|
299 |
[(-1, [])], |
|
300 |
[(-27740, [constants.JOB_STATUS_CANCELED, constants.JOB_STATUS_ERROR]), |
|
301 |
(-1, [constants.JOB_STATUS_ERROR]), |
|
302 |
(9921, [])], |
|
303 |
]: |
|
304 |
self.assertTrue(check_relative(i)) |
|
305 |
self.assertFalse(check_norelative(i)) |
|
306 |
|
|
307 |
|
|
276 | 308 |
if __name__ == "__main__": |
277 | 309 |
testutils.GanetiTestProgram() |
Also available in: Unified diff