Revision 8f5c488d
b/lib/jqueue.py | ||
---|---|---|
95 | 95 |
@ivar stop_timestamp: timestamp for the end of the execution |
96 | 96 |
|
97 | 97 |
""" |
98 |
__slots__ = ["input", "status", "result", "log", |
|
98 |
__slots__ = ["input", "status", "result", "log", "priority",
|
|
99 | 99 |
"start_timestamp", "exec_timestamp", "end_timestamp", |
100 | 100 |
"__weakref__"] |
101 | 101 |
|
... | ... | |
114 | 114 |
self.exec_timestamp = None |
115 | 115 |
self.end_timestamp = None |
116 | 116 |
|
117 |
# Get initial priority (it might change during the lifetime of this opcode) |
|
118 |
self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT) |
|
119 |
|
|
117 | 120 |
@classmethod |
118 | 121 |
def Restore(cls, state): |
119 | 122 |
"""Restore the _QueuedOpCode from the serialized form. |
... | ... | |
132 | 135 |
obj.start_timestamp = state.get("start_timestamp", None) |
133 | 136 |
obj.exec_timestamp = state.get("exec_timestamp", None) |
134 | 137 |
obj.end_timestamp = state.get("end_timestamp", None) |
138 |
obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT) |
|
135 | 139 |
return obj |
136 | 140 |
|
137 | 141 |
def Serialize(self): |
... | ... | |
149 | 153 |
"start_timestamp": self.start_timestamp, |
150 | 154 |
"exec_timestamp": self.exec_timestamp, |
151 | 155 |
"end_timestamp": self.end_timestamp, |
156 |
"priority": self.priority, |
|
152 | 157 |
} |
153 | 158 |
|
154 | 159 |
|
... | ... | |
302 | 307 |
|
303 | 308 |
return status |
304 | 309 |
|
310 |
def CalcPriority(self): |
|
311 |
"""Gets the current priority for this job. |
|
312 |
|
|
313 |
Only unfinished opcodes are considered. When all are done, the default |
|
314 |
priority is used. |
|
315 |
|
|
316 |
@rtype: int |
|
317 |
|
|
318 |
""" |
|
319 |
priorities = [op.priority for op in self.ops |
|
320 |
if op.status not in constants.OPS_FINALIZED] |
|
321 |
|
|
322 |
if not priorities: |
|
323 |
# All opcodes are done, assume default priority |
|
324 |
return constants.OP_PRIO_DEFAULT |
|
325 |
|
|
326 |
return min(priorities) |
|
327 |
|
|
305 | 328 |
def GetLogEntries(self, newer_than): |
306 | 329 |
"""Selectively returns the log entries. |
307 | 330 |
|
b/lib/opcodes.py | ||
---|---|---|
117 | 117 |
children of this class. |
118 | 118 |
@ivar dry_run: Whether the LU should be run in dry-run mode, i.e. just |
119 | 119 |
the check steps |
120 |
@ivar priority: Opcode priority for queue |
|
120 | 121 |
|
121 | 122 |
""" |
122 | 123 |
OP_ID = "OP_ABSTRACT" |
123 |
__slots__ = ["dry_run", "debug_level"] |
|
124 |
__slots__ = ["dry_run", "debug_level", "priority"]
|
|
124 | 125 |
|
125 | 126 |
def __getstate__(self): |
126 | 127 |
"""Specialized getstate for opcodes. |
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
32 | 32 |
from ganeti import utils |
33 | 33 |
from ganeti import errors |
34 | 34 |
from ganeti import jqueue |
35 |
from ganeti import opcodes |
|
36 |
from ganeti import compat |
|
35 | 37 |
|
36 | 38 |
import testutils |
37 | 39 |
|
... | ... | |
239 | 241 |
self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr) |
240 | 242 |
|
241 | 243 |
|
244 |
class TestQueuedOpCode(unittest.TestCase): |
|
245 |
def testDefaults(self): |
|
246 |
def _Check(op): |
|
247 |
self.assertFalse(hasattr(op.input, "dry_run")) |
|
248 |
self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT) |
|
249 |
self.assertFalse(op.log) |
|
250 |
self.assert_(op.start_timestamp is None) |
|
251 |
self.assert_(op.exec_timestamp is None) |
|
252 |
self.assert_(op.end_timestamp is None) |
|
253 |
self.assert_(op.result is None) |
|
254 |
self.assertEqual(op.status, constants.OP_STATUS_QUEUED) |
|
255 |
|
|
256 |
op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay()) |
|
257 |
_Check(op1) |
|
258 |
op2 = jqueue._QueuedOpCode.Restore(op1.Serialize()) |
|
259 |
_Check(op2) |
|
260 |
self.assertEqual(op1.Serialize(), op2.Serialize()) |
|
261 |
|
|
262 |
def testPriority(self): |
|
263 |
def _Check(op): |
|
264 |
assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \ |
|
265 |
"Default priority equals high priority; test can't work" |
|
266 |
self.assertEqual(op.priority, constants.OP_PRIO_HIGH) |
|
267 |
self.assertEqual(op.status, constants.OP_STATUS_QUEUED) |
|
268 |
|
|
269 |
inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH) |
|
270 |
op1 = jqueue._QueuedOpCode(inpop) |
|
271 |
_Check(op1) |
|
272 |
op2 = jqueue._QueuedOpCode.Restore(op1.Serialize()) |
|
273 |
_Check(op2) |
|
274 |
self.assertEqual(op1.Serialize(), op2.Serialize()) |
|
275 |
|
|
276 |
|
|
277 |
class TestQueuedJob(unittest.TestCase): |
|
278 |
def testDefaults(self): |
|
279 |
job_id = 4260 |
|
280 |
ops = [ |
|
281 |
opcodes.OpGetTags(), |
|
282 |
opcodes.OpTestDelay(), |
|
283 |
] |
|
284 |
|
|
285 |
def _Check(job): |
|
286 |
self.assertEqual(job.id, job_id) |
|
287 |
self.assertEqual(job.log_serial, 0) |
|
288 |
self.assert_(job.received_timestamp) |
|
289 |
self.assert_(job.start_timestamp is None) |
|
290 |
self.assert_(job.end_timestamp is None) |
|
291 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
292 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT) |
|
293 |
self.assert_(repr(job).startswith("<")) |
|
294 |
self.assertEqual(len(job.ops), len(ops)) |
|
295 |
self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__() |
|
296 |
for (inp, op) in zip(ops, job.ops))) |
|
297 |
|
|
298 |
job1 = jqueue._QueuedJob(None, job_id, ops) |
|
299 |
_Check(job1) |
|
300 |
job2 = jqueue._QueuedJob.Restore(None, job1.Serialize()) |
|
301 |
_Check(job2) |
|
302 |
self.assertEqual(job1.Serialize(), job2.Serialize()) |
|
303 |
|
|
304 |
def testPriority(self): |
|
305 |
job_id = 4283 |
|
306 |
ops = [ |
|
307 |
opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT), |
|
308 |
opcodes.OpTestDelay(), |
|
309 |
] |
|
310 |
|
|
311 |
def _Check(job): |
|
312 |
self.assertEqual(job.id, job_id) |
|
313 |
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) |
|
314 |
self.assert_(repr(job).startswith("<")) |
|
315 |
|
|
316 |
job = jqueue._QueuedJob(None, job_id, ops) |
|
317 |
_Check(job) |
|
318 |
self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT |
|
319 |
for op in job.ops)) |
|
320 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT) |
|
321 |
|
|
322 |
# Increase first |
|
323 |
job.ops[0].priority -= 1 |
|
324 |
_Check(job) |
|
325 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1) |
|
326 |
|
|
327 |
# Mark opcode as finished |
|
328 |
job.ops[0].status = constants.OP_STATUS_SUCCESS |
|
329 |
_Check(job) |
|
330 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT) |
|
331 |
|
|
332 |
# Increase second |
|
333 |
job.ops[1].priority -= 10 |
|
334 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10) |
|
335 |
|
|
336 |
# Test increasing first |
|
337 |
job.ops[0].status = constants.OP_STATUS_RUNNING |
|
338 |
job.ops[0].priority -= 19 |
|
339 |
self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20) |
|
340 |
|
|
341 |
|
|
242 | 342 |
if __name__ == "__main__": |
243 | 343 |
testutils.GanetiTestProgram() |
Also available in: Unified diff