Revision 031a3e57 lib/jqueue.py

b/lib/jqueue.py
334 334
      not_marked = False
335 335

  
336 336

  
337
class _JobQueueWorker(workerpool.BaseWorker):
338
  """The actual job workers.
337
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
338
  def __init__(self, queue, job, op):
339
    """Initializes this class.
339 340

  
340
  """
341
  def _NotifyStart(self):
341
    @type queue: L{JobQueue}
342
    @param queue: Job queue
343
    @type job: L{_QueuedJob}
344
    @param job: Job object
345
    @type op: L{_QueuedOpCode}
346
    @param op: OpCode
347

  
348
    """
349
    assert queue, "Queue is missing"
350
    assert job, "Job is missing"
351
    assert op, "Opcode is missing"
352

  
353
    self._queue = queue
354
    self._job = job
355
    self._op = op
356

  
357
  def NotifyStart(self):
342 358
    """Mark the opcode as running, not lock-waiting.
343 359

  
344
    This is called from the mcpu code as a notifier function, when the
345
    LU is finally about to start the Exec() method. Of course, to have
346
    end-user visible results, the opcode must be initially (before
347
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
360
    This is called from the mcpu code as a notifier function, when the LU is
361
    finally about to start the Exec() method. Of course, to have end-user
362
    visible results, the opcode must be initially (before calling into
363
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348 364

  
349 365
    """
350
    assert self.queue, "Queue attribute is missing"
351
    assert self.opcode, "Opcode attribute is missing"
352

  
353
    self.queue.acquire()
366
    self._queue.acquire()
354 367
    try:
355
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356
                                    constants.OP_STATUS_CANCELING)
368
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
369
                                 constants.OP_STATUS_CANCELING)
357 370

  
358 371
      # Cancel here if we were asked to
359
      if self.opcode.status == constants.OP_STATUS_CANCELING:
372
      if self._op.status == constants.OP_STATUS_CANCELING:
360 373
        raise CancelJob()
361 374

  
362
      self.opcode.status = constants.OP_STATUS_RUNNING
375
      self._op.status = constants.OP_STATUS_RUNNING
363 376
    finally:
364
      self.queue.release()
377
      self._queue.release()
378

  
379
  def Feedback(self, *args):
380
    """Append a log entry.
381

  
382
    """
383
    assert len(args) < 3
365 384

  
385
    if len(args) == 1:
386
      log_type = constants.ELOG_MESSAGE
387
      log_msg = args[0]
388
    else:
389
      (log_type, log_msg) = args
390

  
391
    # The time is split to make serialization easier and not lose
392
    # precision.
393
    timestamp = utils.SplitTime(time.time())
394

  
395
    self._queue.acquire()
396
    try:
397
      self._job.log_serial += 1
398
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
399

  
400
      self._job.change.notifyAll()
401
    finally:
402
      self._queue.release()
403

  
404

  
405
class _JobQueueWorker(workerpool.BaseWorker):
406
  """The actual job workers.
407

  
408
  """
366 409
  def RunTask(self, job):
367 410
    """Job executor.
368 411

  
......
376 419
    logging.info("Worker %s processing job %s",
377 420
                  self.worker_id, job.id)
378 421
    proc = mcpu.Processor(self.pool.queue.context)
379
    self.queue = queue = job.queue
422
    queue = job.queue
380 423
    try:
381 424
      try:
382 425
        count = len(job.ops)
......
412 455
            finally:
413 456
              queue.release()
414 457

  
415
            def _Log(*args):
416
              """Append a log entry.
417

  
418
              """
419
              assert len(args) < 3
420

  
421
              if len(args) == 1:
422
                log_type = constants.ELOG_MESSAGE
423
                log_msg = args[0]
424
              else:
425
                log_type, log_msg = args
426

  
427
              # The time is split to make serialization easier and not lose
428
              # precision.
429
              timestamp = utils.SplitTime(time.time())
430

  
431
              queue.acquire()
432
              try:
433
                job.log_serial += 1
434
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
435

  
436
                job.change.notifyAll()
437
              finally:
438
                queue.release()
439

  
440
            # Make sure not to hold lock while _Log is called
441
            self.opcode = op
442
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
458
            # Make sure not to hold queue lock while calling ExecOpCode
459
            result = proc.ExecOpCode(input_opcode,
460
                                     _OpCodeExecCallbacks(queue, job, op))
443 461

  
444 462
            queue.acquire()
445 463
            try:

Also available in: Unified diff