Revision a06c6ae8
b/lib/jqueue.py | ||
---|---|---|
57 | 57 |
from ganeti import netutils |
58 | 58 |
from ganeti import compat |
59 | 59 |
from ganeti import ht |
60 |
from ganeti import query |
|
61 |
from ganeti import qlang |
|
60 | 62 |
|
61 | 63 |
|
62 | 64 |
JOBQUEUE_THREADS = 25 |
... | ... | |
83 | 85 |
return utils.SplitTime(time.time()) |
84 | 86 |
|
85 | 87 |
|
88 |
class _SimpleJobQuery: |
|
89 |
"""Wrapper for job queries. |
|
90 |
|
|
91 |
Instance keeps list of fields cached, useful e.g. in L{_JobChangesChecker}. |
|
92 |
|
|
93 |
""" |
|
94 |
def __init__(self, fields): |
|
95 |
"""Initializes this class. |
|
96 |
|
|
97 |
""" |
|
98 |
self._query = query.Query(query.JOB_FIELDS, fields) |
|
99 |
|
|
100 |
def __call__(self, job): |
|
101 |
"""Executes a job query using cached field list. |
|
102 |
|
|
103 |
""" |
|
104 |
return self._query.OldStyleQuery([(job.id, job)], sort_by_name=False)[0] |
|
105 |
|
|
106 |
|
|
86 | 107 |
class _QueuedOpCode(object): |
87 | 108 |
"""Encapsulates an opcode object. |
88 | 109 |
|
... | ... | |
383 | 404 |
has been passed |
384 | 405 |
|
385 | 406 |
""" |
386 |
row = [] |
|
387 |
for fname in fields: |
|
388 |
if fname == "id": |
|
389 |
row.append(self.id) |
|
390 |
elif fname == "status": |
|
391 |
row.append(self.CalcStatus()) |
|
392 |
elif fname == "priority": |
|
393 |
row.append(self.CalcPriority()) |
|
394 |
elif fname == "ops": |
|
395 |
row.append([op.input.__getstate__() for op in self.ops]) |
|
396 |
elif fname == "opresult": |
|
397 |
row.append([op.result for op in self.ops]) |
|
398 |
elif fname == "opstatus": |
|
399 |
row.append([op.status for op in self.ops]) |
|
400 |
elif fname == "oplog": |
|
401 |
row.append([op.log for op in self.ops]) |
|
402 |
elif fname == "opstart": |
|
403 |
row.append([op.start_timestamp for op in self.ops]) |
|
404 |
elif fname == "opexec": |
|
405 |
row.append([op.exec_timestamp for op in self.ops]) |
|
406 |
elif fname == "opend": |
|
407 |
row.append([op.end_timestamp for op in self.ops]) |
|
408 |
elif fname == "oppriority": |
|
409 |
row.append([op.priority for op in self.ops]) |
|
410 |
elif fname == "received_ts": |
|
411 |
row.append(self.received_timestamp) |
|
412 |
elif fname == "start_ts": |
|
413 |
row.append(self.start_timestamp) |
|
414 |
elif fname == "end_ts": |
|
415 |
row.append(self.end_timestamp) |
|
416 |
elif fname == "summary": |
|
417 |
row.append([op.input.Summary() for op in self.ops]) |
|
418 |
else: |
|
419 |
raise errors.OpExecError("Invalid self query field '%s'" % fname) |
|
420 |
return row |
|
407 |
return _SimpleJobQuery(fields)(self) |
|
421 | 408 |
|
422 | 409 |
def MarkUnfinishedOps(self, status, result): |
423 | 410 |
"""Mark unfinished opcodes with a given status and result. |
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
305 | 305 |
self.assertEqual(len(job.ops), len(ops)) |
306 | 306 |
self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__() |
307 | 307 |
for (inp, op) in zip(ops, job.ops))) |
308 |
self.assertRaises(errors.OpExecError, job.GetInfo,
|
|
308 |
self.assertRaises(errors.OpPrereqError, job.GetInfo,
|
|
309 | 309 |
["unknown-field"]) |
310 | 310 |
self.assertEqual(job.GetInfo(["summary"]), |
311 | 311 |
[[op.input.Summary() for op in job.ops]]) |
... | ... | |
674 | 674 |
for i in range(opcount)] |
675 | 675 |
|
676 | 676 |
# Create job |
677 |
job = self._CreateJob(queue, job_id, ops)
|
|
677 |
job = self._CreateJob(queue, str(job_id), ops)
|
|
678 | 678 |
|
679 | 679 |
opexec = _FakeExecOpCodeForProc(queue, None, None) |
680 | 680 |
|
... | ... | |
702 | 702 |
|
703 | 703 |
# Check job status |
704 | 704 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR) |
705 |
self.assertEqual(job.GetInfo(["id"]), [job_id])
|
|
705 |
self.assertEqual(job.GetInfo(["id"]), [str(job_id)])
|
|
706 | 706 |
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_ERROR]) |
707 | 707 |
|
708 | 708 |
# Check opcode status |
... | ... | |
926 | 926 |
for i in range(3)] |
927 | 927 |
|
928 | 928 |
# Create job |
929 |
job_id = 28492
|
|
929 |
job_id = str(28492)
|
|
930 | 930 |
job = self._CreateJob(queue, job_id, ops) |
931 | 931 |
|
932 | 932 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
Also available in: Unified diff