Revision 6a373640
b/lib/cmdlib.py | ||
---|---|---|
74 | 74 |
return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM] |
75 | 75 |
|
76 | 76 |
|
77 |
# End types |
|
77 |
class ResultWithJobs: |
|
78 |
"""Data container for LU results with jobs. |
|
79 |
|
|
80 |
Instances of this class returned from L{LogicalUnit.Exec} will be recognized |
|
81 |
by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs |
|
82 |
contained in the C{jobs} attribute and include the job IDs in the opcode |
|
83 |
result. |
|
84 |
|
|
85 |
""" |
|
86 |
def __init__(self, jobs, **kwargs): |
|
87 |
"""Initializes this class. |
|
88 |
|
|
89 |
Additional return values can be specified as keyword arguments. |
|
90 |
|
|
91 |
@type jobs: list of lists of L{opcode.OpCode} |
|
92 |
@param jobs: A list of lists of opcode objects |
|
93 |
|
|
94 |
""" |
|
95 |
self.jobs = jobs |
|
96 |
self.other = kwargs |
|
97 |
|
|
98 |
|
|
78 | 99 |
class LogicalUnit(object): |
79 | 100 |
"""Logical Unit base class. |
80 | 101 |
|
b/lib/constants.py | ||
---|---|---|
529 | 529 |
# Disk index separator |
530 | 530 |
DISK_SEPARATOR = _autoconf.DISK_SEPARATOR |
531 | 531 |
|
532 |
#: Key for job IDs in opcode result |
|
533 |
JOB_IDS_KEY = "jobs" |
|
534 |
|
|
532 | 535 |
# runparts results |
533 | 536 |
(RUNPARTS_SKIP, |
534 | 537 |
RUNPARTS_RUN, |
b/lib/jqueue.py | ||
---|---|---|
540 | 540 |
# Cancel here if we were asked to |
541 | 541 |
self._CheckCancel() |
542 | 542 |
|
543 |
def SubmitManyJobs(self, jobs): |
|
544 |
"""Submits jobs for processing. |
|
545 |
|
|
546 |
See L{JobQueue.SubmitManyJobs}. |
|
547 |
|
|
548 |
""" |
|
549 |
# Locking is done in job queue |
|
550 |
return self._queue.SubmitManyJobs(jobs) |
|
551 |
|
|
543 | 552 |
|
544 | 553 |
class _JobChangesChecker(object): |
545 | 554 |
def __init__(self, fields, prev_job_info, prev_log_serial): |
b/lib/mcpu.py | ||
---|---|---|
144 | 144 |
|
145 | 145 |
""" |
146 | 146 |
|
147 |
def SubmitManyJobs(self, jobs): |
|
148 |
"""Submits jobs for processing. |
|
149 |
|
|
150 |
See L{jqueue.JobQueue.SubmitManyJobs}. |
|
151 |
|
|
152 |
""" |
|
153 |
raise NotImplementedError |
|
154 |
|
|
147 | 155 |
|
148 | 156 |
def _LUNameForOpName(opname): |
149 | 157 |
"""Computes the LU name for a given OpCode name. |
... | ... | |
209 | 217 |
|
210 | 218 |
return acquired |
211 | 219 |
|
220 |
def _ProcessResult(self, result): |
|
221 |
""" |
|
222 |
|
|
223 |
""" |
|
224 |
if isinstance(result, cmdlib.ResultWithJobs): |
|
225 |
# Submit jobs |
|
226 |
job_submission = self._cbs.SubmitManyJobs(result.jobs) |
|
227 |
|
|
228 |
# Build dictionary |
|
229 |
result = result.other |
|
230 |
|
|
231 |
assert constants.JOB_IDS_KEY not in result, \ |
|
232 |
"Key '%s' found in additional return values" % constants.JOB_IDS_KEY |
|
233 |
|
|
234 |
result[constants.JOB_IDS_KEY] = job_submission |
|
235 |
|
|
236 |
return result |
|
237 |
|
|
212 | 238 |
def _ExecLU(self, lu): |
213 | 239 |
"""Logical Unit execution sequence. |
214 | 240 |
|
... | ... | |
229 | 255 |
return lu.dry_run_result |
230 | 256 |
|
231 | 257 |
try: |
232 |
result = lu.Exec(self.Log)
|
|
258 |
result = self._ProcessResult(lu.Exec(self.Log))
|
|
233 | 259 |
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST) |
234 | 260 |
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results, |
235 | 261 |
self.Log, result) |
b/lib/opcodes.py | ||
---|---|---|
1423 | 1423 |
("result", ht.NoDefault, ht.NoType, None), |
1424 | 1424 |
("messages", ht.NoDefault, ht.NoType, None), |
1425 | 1425 |
("fail", ht.NoDefault, ht.NoType, None), |
1426 |
("submit_jobs", None, ht.NoType, None), |
|
1426 | 1427 |
] |
1427 | 1428 |
WITH_LU = False |
1428 | 1429 |
|
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
428 | 428 |
def __init__(self): |
429 | 429 |
self._acquired = False |
430 | 430 |
self._updates = [] |
431 |
self._submitted = [] |
|
432 |
|
|
433 |
self._submit_count = itertools.count(1000) |
|
431 | 434 |
|
432 | 435 |
def IsAcquired(self): |
433 | 436 |
return self._acquired |
... | ... | |
435 | 438 |
def GetNextUpdate(self): |
436 | 439 |
return self._updates.pop(0) |
437 | 440 |
|
441 |
def GetNextSubmittedJob(self): |
|
442 |
return self._submitted.pop(0) |
|
443 |
|
|
438 | 444 |
def acquire(self, shared=0): |
439 | 445 |
assert shared == 1 |
440 | 446 |
self._acquired = True |
... | ... | |
447 | 453 |
assert self._acquired, "Lock not acquired while updating job" |
448 | 454 |
self._updates.append((job, bool(replicate))) |
449 | 455 |
|
456 |
def SubmitManyJobs(self, jobs): |
|
457 |
assert not self._acquired, "Lock acquired while submitting jobs" |
|
458 |
job_ids = [self._submit_count.next() for _ in jobs] |
|
459 |
self._submitted.extend(zip(job_ids, jobs)) |
|
460 |
return job_ids |
|
461 |
|
|
450 | 462 |
|
451 | 463 |
class _FakeExecOpCodeForProc: |
452 | 464 |
def __init__(self, queue, before_start, after_start): |
... | ... | |
473 | 485 |
if op.fail: |
474 | 486 |
raise errors.OpExecError("Error requested (%s)" % op.result) |
475 | 487 |
|
488 |
if hasattr(op, "submit_jobs") and op.submit_jobs is not None: |
|
489 |
return cbs.SubmitManyJobs(op.submit_jobs) |
|
490 |
|
|
476 | 491 |
return op.result |
477 | 492 |
|
478 | 493 |
|
... | ... | |
1065 | 1080 |
self.assertFalse(job.GetLogEntries(count)) |
1066 | 1081 |
self.assertFalse(job.GetLogEntries(count + 3)) |
1067 | 1082 |
|
1083 |
def testSubmitManyJobs(self): |
|
1084 |
queue = _FakeQueueForProc() |
|
1085 |
|
|
1086 |
job_id = 15656 |
|
1087 |
ops = [ |
|
1088 |
opcodes.OpTestDummy(result="Res0", fail=False, |
|
1089 |
submit_jobs=[]), |
|
1090 |
opcodes.OpTestDummy(result="Res1", fail=False, |
|
1091 |
submit_jobs=[ |
|
1092 |
[opcodes.OpTestDummy(result="r1j0", fail=False)], |
|
1093 |
]), |
|
1094 |
opcodes.OpTestDummy(result="Res2", fail=False, |
|
1095 |
submit_jobs=[ |
|
1096 |
[opcodes.OpTestDummy(result="r2j0o0", fail=False), |
|
1097 |
opcodes.OpTestDummy(result="r2j0o1", fail=False), |
|
1098 |
opcodes.OpTestDummy(result="r2j0o2", fail=False), |
|
1099 |
opcodes.OpTestDummy(result="r2j0o3", fail=False)], |
|
1100 |
[opcodes.OpTestDummy(result="r2j1", fail=False)], |
|
1101 |
[opcodes.OpTestDummy(result="r2j3o0", fail=False), |
|
1102 |
opcodes.OpTestDummy(result="r2j3o1", fail=False)], |
|
1103 |
]), |
|
1104 |
] |
|
1105 |
|
|
1106 |
# Create job |
|
1107 |
job = self._CreateJob(queue, job_id, ops) |
|
1108 |
|
|
1109 |
def _BeforeStart(timeout, priority): |
|
1110 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1111 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1112 |
self.assertFalse(queue.IsAcquired()) |
|
1113 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) |
|
1114 |
self.assertFalse(job.cur_opctx) |
|
1115 |
|
|
1116 |
def _AfterStart(op, cbs): |
|
1117 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1118 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1119 |
|
|
1120 |
self.assertFalse(queue.IsAcquired()) |
|
1121 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) |
|
1122 |
self.assertFalse(job.cur_opctx) |
|
1123 |
|
|
1124 |
# Job is running, cancelling shouldn't be possible |
|
1125 |
(success, _) = job.Cancel() |
|
1126 |
self.assertFalse(success) |
|
1127 |
|
|
1128 |
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart) |
|
1129 |
|
|
1130 |
for idx in range(len(ops)): |
|
1131 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1132 |
result = jqueue._JobProcessor(queue, opexec, job)() |
|
1133 |
self.assertEqual(queue.GetNextUpdate(), (job, True)) |
|
1134 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1135 |
if idx == len(ops) - 1: |
|
1136 |
# Last opcode |
|
1137 |
self.assert_(result) |
|
1138 |
else: |
|
1139 |
self.assertFalse(result) |
|
1140 |
|
|
1141 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
1142 |
self.assert_(job.start_timestamp) |
|
1143 |
self.assertFalse(job.end_timestamp) |
|
1144 |
|
|
1145 |
self.assertRaises(IndexError, queue.GetNextUpdate) |
|
1146 |
|
|
1147 |
for idx, submitted_ops in enumerate(job_ops |
|
1148 |
for op in ops |
|
1149 |
for job_ops in op.submit_jobs): |
|
1150 |
self.assertEqual(queue.GetNextSubmittedJob(), |
|
1151 |
(1000 + idx, submitted_ops)) |
|
1152 |
self.assertRaises(IndexError, queue.GetNextSubmittedJob) |
|
1153 |
|
|
1154 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) |
|
1155 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) |
|
1156 |
self.assertEqual(job.GetInfo(["opresult"]), |
|
1157 |
[[[], [1000], [1001, 1002, 1003]]]) |
|
1158 |
self.assertEqual(job.GetInfo(["opstatus"]), |
|
1159 |
[len(job.ops) * [constants.OP_STATUS_SUCCESS]]) |
|
1160 |
|
|
1161 |
self._GenericCheckJob(job) |
|
1162 |
|
|
1163 |
# Finished jobs can't be processed any further |
|
1164 |
self.assertRaises(errors.ProgrammerError, |
|
1165 |
jqueue._JobProcessor(queue, opexec, job)) |
|
1166 |
|
|
1068 | 1167 |
|
1069 | 1168 |
class _FakeTimeoutStrategy: |
1070 | 1169 |
def __init__(self, timeouts): |
Also available in: Unified diff