Revision 8f5c488d

b/lib/jqueue.py
95 95
  @ivar stop_timestamp: timestamp for the end of the execution
96 96

  
97 97
  """
98
  __slots__ = ["input", "status", "result", "log",
98
  __slots__ = ["input", "status", "result", "log", "priority",
99 99
               "start_timestamp", "exec_timestamp", "end_timestamp",
100 100
               "__weakref__"]
101 101

  
......
114 114
    self.exec_timestamp = None
115 115
    self.end_timestamp = None
116 116

  
117
    # Get initial priority (it might change during the lifetime of this opcode)
118
    self.priority = getattr(op, "priority", constants.OP_PRIO_DEFAULT)
119

  
117 120
  @classmethod
118 121
  def Restore(cls, state):
119 122
    """Restore the _QueuedOpCode from the serialized form.
......
132 135
    obj.start_timestamp = state.get("start_timestamp", None)
133 136
    obj.exec_timestamp = state.get("exec_timestamp", None)
134 137
    obj.end_timestamp = state.get("end_timestamp", None)
138
    obj.priority = state.get("priority", constants.OP_PRIO_DEFAULT)
135 139
    return obj
136 140

  
137 141
  def Serialize(self):
......
149 153
      "start_timestamp": self.start_timestamp,
150 154
      "exec_timestamp": self.exec_timestamp,
151 155
      "end_timestamp": self.end_timestamp,
156
      "priority": self.priority,
152 157
      }
153 158

  
154 159

  
......
302 307

  
303 308
    return status
304 309

  
310
  def CalcPriority(self):
311
    """Gets the current priority for this job.
312

  
313
    Only unfinished opcodes are considered. When all are done, the default
314
    priority is used.
315

  
316
    @rtype: int
317

  
318
    """
319
    priorities = [op.priority for op in self.ops
320
                  if op.status not in constants.OPS_FINALIZED]
321

  
322
    if not priorities:
323
      # All opcodes are done, assume default priority
324
      return constants.OP_PRIO_DEFAULT
325

  
326
    return min(priorities)
327

  
305 328
  def GetLogEntries(self, newer_than):
306 329
    """Selectively returns the log entries.
307 330

  
b/lib/opcodes.py
117 117
               children of this class.
118 118
  @ivar dry_run: Whether the LU should be run in dry-run mode, i.e. just
119 119
                 the check steps
120
  @ivar priority: Opcode priority for queue
120 121

  
121 122
  """
122 123
  OP_ID = "OP_ABSTRACT"
123
  __slots__ = ["dry_run", "debug_level"]
124
  __slots__ = ["dry_run", "debug_level", "priority"]
124 125

  
125 126
  def __getstate__(self):
126 127
    """Specialized getstate for opcodes.
b/test/ganeti.jqueue_unittest.py
32 32
from ganeti import utils
33 33
from ganeti import errors
34 34
from ganeti import jqueue
35
from ganeti import opcodes
36
from ganeti import compat
35 37

  
36 38
import testutils
37 39

  
......
239 241
    self.assertRaises(errors.OpExecError, errors.MaybeRaise, encerr)
240 242

  
241 243

  
244
class TestQueuedOpCode(unittest.TestCase):
245
  def testDefaults(self):
246
    def _Check(op):
247
      self.assertFalse(hasattr(op.input, "dry_run"))
248
      self.assertEqual(op.priority, constants.OP_PRIO_DEFAULT)
249
      self.assertFalse(op.log)
250
      self.assert_(op.start_timestamp is None)
251
      self.assert_(op.exec_timestamp is None)
252
      self.assert_(op.end_timestamp is None)
253
      self.assert_(op.result is None)
254
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
255

  
256
    op1 = jqueue._QueuedOpCode(opcodes.OpTestDelay())
257
    _Check(op1)
258
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
259
    _Check(op2)
260
    self.assertEqual(op1.Serialize(), op2.Serialize())
261

  
262
  def testPriority(self):
263
    def _Check(op):
264
      assert constants.OP_PRIO_DEFAULT != constants.OP_PRIO_HIGH, \
265
             "Default priority equals high priority; test can't work"
266
      self.assertEqual(op.priority, constants.OP_PRIO_HIGH)
267
      self.assertEqual(op.status, constants.OP_STATUS_QUEUED)
268

  
269
    inpop = opcodes.OpGetTags(priority=constants.OP_PRIO_HIGH)
270
    op1 = jqueue._QueuedOpCode(inpop)
271
    _Check(op1)
272
    op2 = jqueue._QueuedOpCode.Restore(op1.Serialize())
273
    _Check(op2)
274
    self.assertEqual(op1.Serialize(), op2.Serialize())
275

  
276

  
277
class TestQueuedJob(unittest.TestCase):
278
  def testDefaults(self):
279
    job_id = 4260
280
    ops = [
281
      opcodes.OpGetTags(),
282
      opcodes.OpTestDelay(),
283
      ]
284

  
285
    def _Check(job):
286
      self.assertEqual(job.id, job_id)
287
      self.assertEqual(job.log_serial, 0)
288
      self.assert_(job.received_timestamp)
289
      self.assert_(job.start_timestamp is None)
290
      self.assert_(job.end_timestamp is None)
291
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
292
      self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
293
      self.assert_(repr(job).startswith("<"))
294
      self.assertEqual(len(job.ops), len(ops))
295
      self.assert_(compat.all(inp.__getstate__() == op.input.__getstate__()
296
                              for (inp, op) in zip(ops, job.ops)))
297

  
298
    job1 = jqueue._QueuedJob(None, job_id, ops)
299
    _Check(job1)
300
    job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
301
    _Check(job2)
302
    self.assertEqual(job1.Serialize(), job2.Serialize())
303

  
304
  def testPriority(self):
305
    job_id = 4283
306
    ops = [
307
      opcodes.OpGetTags(priority=constants.OP_PRIO_DEFAULT),
308
      opcodes.OpTestDelay(),
309
      ]
310

  
311
    def _Check(job):
312
      self.assertEqual(job.id, job_id)
313
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
314
      self.assert_(repr(job).startswith("<"))
315

  
316
    job = jqueue._QueuedJob(None, job_id, ops)
317
    _Check(job)
318
    self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
319
                            for op in job.ops))
320
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
321

  
322
    # Increase first
323
    job.ops[0].priority -= 1
324
    _Check(job)
325
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 1)
326

  
327
    # Mark opcode as finished
328
    job.ops[0].status = constants.OP_STATUS_SUCCESS
329
    _Check(job)
330
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT)
331

  
332
    # Increase second
333
    job.ops[1].priority -= 10
334
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 10)
335

  
336
    # Test increasing first
337
    job.ops[0].status = constants.OP_STATUS_RUNNING
338
    job.ops[0].priority -= 19
339
    self.assertEqual(job.CalcPriority(), constants.OP_PRIO_DEFAULT - 20)
340

  
341

  
242 342
if __name__ == "__main__":
243 343
  testutils.GanetiTestProgram()

Also available in: Unified diff