Revision 6a373640

b/lib/cmdlib.py
74 74
  return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
75 75

  
76 76

  
77
# End types
77
class ResultWithJobs:
78
  """Data container for LU results with jobs.
79

  
80
  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
81
  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
82
  contained in the C{jobs} attribute and include the job IDs in the opcode
83
  result.
84

  
85
  """
86
  def __init__(self, jobs, **kwargs):
87
    """Initializes this class.
88

  
89
    Additional return values can be specified as keyword arguments.
90

  
91
    @type jobs: list of lists of L{opcode.OpCode}
92
    @param jobs: A list of lists of opcode objects
93

  
94
    """
95
    self.jobs = jobs
96
    self.other = kwargs
97

  
98

  
78 99
class LogicalUnit(object):
79 100
  """Logical Unit base class.
80 101

  
b/lib/constants.py
529 529
# Disk index separator
530 530
DISK_SEPARATOR = _autoconf.DISK_SEPARATOR
531 531

  
532
#: Key for job IDs in opcode result
533
JOB_IDS_KEY = "jobs"
534

  
532 535
# runparts results
533 536
(RUNPARTS_SKIP,
534 537
 RUNPARTS_RUN,
b/lib/jqueue.py
540 540
    # Cancel here if we were asked to
541 541
    self._CheckCancel()
542 542

  
543
  def SubmitManyJobs(self, jobs):
544
    """Submits jobs for processing.
545

  
546
    See L{JobQueue.SubmitManyJobs}.
547

  
548
    """
549
    # Locking is done in job queue
550
    return self._queue.SubmitManyJobs(jobs)
551

  
543 552

  
544 553
class _JobChangesChecker(object):
545 554
  def __init__(self, fields, prev_job_info, prev_log_serial):
b/lib/mcpu.py
144 144

  
145 145
    """
146 146

  
147
  def SubmitManyJobs(self, jobs):
148
    """Submits jobs for processing.
149

  
150
    See L{jqueue.JobQueue.SubmitManyJobs}.
151

  
152
    """
153
    raise NotImplementedError
154

  
147 155

  
148 156
def _LUNameForOpName(opname):
149 157
  """Computes the LU name for a given OpCode name.
......
209 217

  
210 218
    return acquired
211 219

  
220
  def _ProcessResult(self, result):
221
    """
222

  
223
    """
224
    if isinstance(result, cmdlib.ResultWithJobs):
225
      # Submit jobs
226
      job_submission = self._cbs.SubmitManyJobs(result.jobs)
227

  
228
      # Build dictionary
229
      result = result.other
230

  
231
      assert constants.JOB_IDS_KEY not in result, \
232
        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
233

  
234
      result[constants.JOB_IDS_KEY] = job_submission
235

  
236
    return result
237

  
212 238
  def _ExecLU(self, lu):
213 239
    """Logical Unit execution sequence.
214 240

  
......
229 255
      return lu.dry_run_result
230 256

  
231 257
    try:
232
      result = lu.Exec(self.Log)
258
      result = self._ProcessResult(lu.Exec(self.Log))
233 259
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
234 260
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
235 261
                                self.Log, result)
b/lib/opcodes.py
1423 1423
    ("result", ht.NoDefault, ht.NoType, None),
1424 1424
    ("messages", ht.NoDefault, ht.NoType, None),
1425 1425
    ("fail", ht.NoDefault, ht.NoType, None),
1426
    ("submit_jobs", None, ht.NoType, None),
1426 1427
    ]
1427 1428
  WITH_LU = False
1428 1429

  
b/test/ganeti.jqueue_unittest.py
428 428
  def __init__(self):
429 429
    self._acquired = False
430 430
    self._updates = []
431
    self._submitted = []
432

  
433
    self._submit_count = itertools.count(1000)
431 434

  
432 435
  def IsAcquired(self):
433 436
    return self._acquired
......
435 438
  def GetNextUpdate(self):
436 439
    return self._updates.pop(0)
437 440

  
441
  def GetNextSubmittedJob(self):
442
    return self._submitted.pop(0)
443

  
438 444
  def acquire(self, shared=0):
439 445
    assert shared == 1
440 446
    self._acquired = True
......
447 453
    assert self._acquired, "Lock not acquired while updating job"
448 454
    self._updates.append((job, bool(replicate)))
449 455

  
456
  def SubmitManyJobs(self, jobs):
457
    assert not self._acquired, "Lock acquired while submitting jobs"
458
    job_ids = [self._submit_count.next() for _ in jobs]
459
    self._submitted.extend(zip(job_ids, jobs))
460
    return job_ids
461

  
450 462

  
451 463
class _FakeExecOpCodeForProc:
452 464
  def __init__(self, queue, before_start, after_start):
......
473 485
    if op.fail:
474 486
      raise errors.OpExecError("Error requested (%s)" % op.result)
475 487

  
488
    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
489
      return cbs.SubmitManyJobs(op.submit_jobs)
490

  
476 491
    return op.result
477 492

  
478 493

  
......
1065 1080
    self.assertFalse(job.GetLogEntries(count))
1066 1081
    self.assertFalse(job.GetLogEntries(count + 3))
1067 1082

  
1083
  def testSubmitManyJobs(self):
1084
    queue = _FakeQueueForProc()
1085

  
1086
    job_id = 15656
1087
    ops = [
1088
      opcodes.OpTestDummy(result="Res0", fail=False,
1089
                          submit_jobs=[]),
1090
      opcodes.OpTestDummy(result="Res1", fail=False,
1091
                          submit_jobs=[
1092
                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
1093
                            ]),
1094
      opcodes.OpTestDummy(result="Res2", fail=False,
1095
                          submit_jobs=[
1096
                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
1097
                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
1098
                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
1099
                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
1100
                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
1101
                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
1102
                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
1103
                            ]),
1104
      ]
1105

  
1106
    # Create job
1107
    job = self._CreateJob(queue, job_id, ops)
1108

  
1109
    def _BeforeStart(timeout, priority):
1110
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1111
      self.assertRaises(IndexError, queue.GetNextUpdate)
1112
      self.assertFalse(queue.IsAcquired())
1113
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
1114
      self.assertFalse(job.cur_opctx)
1115

  
1116
    def _AfterStart(op, cbs):
1117
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1118
      self.assertRaises(IndexError, queue.GetNextUpdate)
1119

  
1120
      self.assertFalse(queue.IsAcquired())
1121
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
1122
      self.assertFalse(job.cur_opctx)
1123

  
1124
      # Job is running, cancelling shouldn't be possible
1125
      (success, _) = job.Cancel()
1126
      self.assertFalse(success)
1127

  
1128
    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
1129

  
1130
    for idx in range(len(ops)):
1131
      self.assertRaises(IndexError, queue.GetNextUpdate)
1132
      result = jqueue._JobProcessor(queue, opexec, job)()
1133
      self.assertEqual(queue.GetNextUpdate(), (job, True))
1134
      self.assertRaises(IndexError, queue.GetNextUpdate)
1135
      if idx == len(ops) - 1:
1136
        # Last opcode
1137
        self.assert_(result)
1138
      else:
1139
        self.assertFalse(result)
1140

  
1141
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
1142
        self.assert_(job.start_timestamp)
1143
        self.assertFalse(job.end_timestamp)
1144

  
1145
    self.assertRaises(IndexError, queue.GetNextUpdate)
1146

  
1147
    for idx, submitted_ops in enumerate(job_ops
1148
                                        for op in ops
1149
                                        for job_ops in op.submit_jobs):
1150
      self.assertEqual(queue.GetNextSubmittedJob(),
1151
                       (1000 + idx, submitted_ops))
1152
    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
1153

  
1154
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
1155
    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
1156
    self.assertEqual(job.GetInfo(["opresult"]),
1157
                     [[[], [1000], [1001, 1002, 1003]]])
1158
    self.assertEqual(job.GetInfo(["opstatus"]),
1159
                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
1160

  
1161
    self._GenericCheckJob(job)
1162

  
1163
    # Finished jobs can't be processed any further
1164
    self.assertRaises(errors.ProgrammerError,
1165
                      jqueue._JobProcessor(queue, opexec, job))
1166

  
1068 1167

  
1069 1168
class _FakeTimeoutStrategy:
1070 1169
  def __init__(self, timeouts):

Also available in: Unified diff