Revision ef2df7d3 lib/jqueue.py

b/lib/jqueue.py
153 153
  @ivar received_timestamp: the timestamp for when the job was received
154 154
  @ivar start_timestmap: the timestamp for start of execution
155 155
  @ivar end_timestamp: the timestamp for end of execution
156
  @ivar lock_status: In-memory locking information for debugging
156 157
  @ivar change: a Condition variable we use for waiting for job changes
157 158

  
158 159
  """
159 160
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160 161
               "received_timestamp", "start_timestamp", "end_timestamp",
161
               "change",
162
               "lock_status", "change",
162 163
               "__weakref__"]
163 164

  
164 165
  def __init__(self, queue, job_id, ops):
......
186 187
    self.start_timestamp = None
187 188
    self.end_timestamp = None
188 189

  
190
    # In-memory attributes
191
    self.lock_status = None
192

  
189 193
    # Condition to wait for changes
190 194
    self.change = threading.Condition(self.queue._lock)
191 195

  
......
209 213
    obj.start_timestamp = state.get("start_timestamp", None)
210 214
    obj.end_timestamp = state.get("end_timestamp", None)
211 215

  
216
    # In-memory attributes
217
    obj.lock_status = None
218

  
212 219
    obj.ops = []
213 220
    obj.log_serial = 0
214 221
    for op_state in state["ops"]:
......
334 341
      not_marked = False
335 342

  
336 343

  
337
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
344
class _OpExecCallbacks(mcpu.OpExecCbBase):
338 345
  def __init__(self, queue, job, op):
339 346
    """Initializes this class.
340 347

  
......
368 375
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
369 376
                                 constants.OP_STATUS_CANCELING)
370 377

  
378
      # All locks are acquired by now
379
      self._job.lock_status = None
380

  
371 381
      # Cancel here if we were asked to
372 382
      if self._op.status == constants.OP_STATUS_CANCELING:
373 383
        raise CancelJob()
......
401 411
    finally:
402 412
      self._queue.release()
403 413

  
414
  def ReportLocks(self, msg):
415
    """Write locking information to the job.
416

  
417
    Called whenever the LU processor is waiting for a lock or has acquired one.
418

  
419
    """
420
    # Not getting the queue lock because this is a single assignment
421
    self._job.lock_status = msg
422

  
404 423

  
405 424
class _JobQueueWorker(workerpool.BaseWorker):
406 425
  """The actual job workers.
......
457 476

  
458 477
            # Make sure not to hold queue lock while calling ExecOpCode
459 478
            result = proc.ExecOpCode(input_opcode,
460
                                     _OpCodeExecCallbacks(queue, job, op))
479
                                     _OpExecCallbacks(queue, job, op))
461 480

  
462 481
            queue.acquire()
463 482
            try:
......
505 524
      queue.acquire()
506 525
      try:
507 526
        try:
527
          job.lock_status = None
508 528
          job.run_op_index = -1
509 529
          job.end_timestamp = TimeStampNow()
510 530
          queue.UpdateJobUnlocked(job)
......
513 533
          status = job.CalcStatus()
514 534
      finally:
515 535
        queue.release()
536

  
516 537
      logging.info("Worker %s finished job %s, status = %s",
517 538
                   self.worker_id, job_id, status)
518 539

  
......
1081 1102

  
1082 1103
    return results
1083 1104

  
1084

  
1085 1105
  @_RequireOpenQueue
1086 1106
  def UpdateJobUnlocked(self, job):
1087 1107
    """Update a job's on disk storage.

Also available in: Unified diff