Revision a06c6ae8

b/lib/jqueue.py
57 57
from ganeti import netutils
58 58
from ganeti import compat
59 59
from ganeti import ht
60
from ganeti import query
61
from ganeti import qlang
60 62

  
61 63

  
62 64
JOBQUEUE_THREADS = 25
......
83 85
  return utils.SplitTime(time.time())
84 86

  
85 87

  
88
class _SimpleJobQuery:
89
  """Wrapper for job queries.
90

  
91
  Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}.
92

  
93
  """
94
  def __init__(self, fields):
95
    """Initializes this class.
96

  
97
    """
98
    self._query = query.Query(query.JOB_FIELDS, fields)
99

  
100
  def __call__(self, job):
101
    """Executes a job query using cached field list.
102

  
103
    """
104
    return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0]
105

  
106

  
86 107
class _QueuedOpCode(object):
87 108
  """Encapsulates an opcode object.
88 109

  
......
383 404
        has been passed
384 405

  
385 406
    """
386
    row = []
387
    for fname in fields:
388
      if fname == "id":
389
        row.append(self.id)
390
      elif fname == "status":
391
        row.append(self.CalcStatus())
392
      elif fname == "priority":
393
        row.append(self.CalcPriority())
394
      elif fname == "ops":
395
        row.append([op.input.__getstate__() for op in self.ops])
396
      elif fname == "opresult":
397
        row.append([op.result for op in self.ops])
398
      elif fname == "opstatus":
399
        row.append([op.status for op in self.ops])
400
      elif fname == "oplog":
401
        row.append([op.log for op in self.ops])
402
      elif fname == "opstart":
403
        row.append([op.start_timestamp for op in self.ops])
404
      elif fname == "opexec":
405
        row.append([op.exec_timestamp for op in self.ops])
406
      elif fname == "opend":
407
        row.append([op.end_timestamp for op in self.ops])
408
      elif fname == "oppriority":
409
        row.append([op.priority for op in self.ops])
410
      elif fname == "received_ts":
411
        row.append(self.received_timestamp)
412
      elif fname == "start_ts":
413
        row.append(self.start_timestamp)
414
      elif fname == "end_ts":
415
        row.append(self.end_timestamp)
416
      elif fname == "summary":
417
        row.append([op.input.Summary() for op in self.ops])
418
      else:
419
        raise errors.OpExecError("Invalid self query field '%s'" % fname)
420
    return row
407
    return _SimpleJobQuery(fields)(self)
421 408

  
422 409
  def MarkUnfinishedOps(self, status, result):
423 410
    """Mark unfinished opcodes with a given status and result.
b/test/ganeti.jqueue_unittest.py
305 305
      self.assertEqual(len(job.ops), len(ops))
306 306
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
307 307
                              for (inp, op) in zip(ops, job.ops)))
308
      self.assertRaises(errors.OpExecError, job.GetInfo,
308
      self.assertRaises(errors.OpPrereqError, job.GetInfo,
309 309
                        ["unknown-field"])
310 310
      self.assertEqual(job.GetInfo(["summary"]),
311 311
                       [[op.input.Summary() for op in job.ops]])
......
674 674
             for i in range(opcount)]
675 675

  
676 676
      # Create job
677
      job = self._CreateJob(queue, job_id, ops)
677
      job = self._CreateJob(queue, str(job_id), ops)
678 678

  
679 679
      opexec = _FakeExecOpCodeForProc(queue, None, None)
680 680

  
......
702 702

  
703 703
      # Check job status
704 704
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
705
      self.assertEqual(job.GetInfo(["id"]), [job_id])
705
      self.assertEqual(job.GetInfo(["id"]), [str(job_id)])
706 706
      self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR])
707 707

  
708 708
      # Check opcode status
......
926 926
           for i in range(3)]
927 927

  
928 928
    # Create job
929
    job_id = 28492
929
    job_id = str(28492)
930 930
    job = self._CreateJob(queue, job_id, ops)
931 931

  
932 932
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

Also available in: Unified diff