Revision e92376d7 lib/jqueue.py

b/lib/jqueue.py
159 159

  
160 160
      if op.status == constants.OP_STATUS_QUEUED:
161 161
        pass
162
      elif op.status == constants.OP_STATUS_WAITLOCK:
163
        status = constants.JOB_STATUS_WAITLOCK
162 164
      elif op.status == constants.OP_STATUS_RUNNING:
163 165
        status = constants.JOB_STATUS_RUNNING
164 166
      elif op.status == constants.OP_STATUS_ERROR:
......
188 190

  
189 191

  
190 192
class _JobQueueWorker(workerpool.BaseWorker):
193
  def _NotifyStart(self):
194
    """Mark the opcode as running, not lock-waiting.
195

  
196
    This is called from the mcpu code as a notifier function, when the
197
    LU is finally about to start the Exec() method. Of course, to have
198
    end-user visible results, the opcode must be initially (before
199
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
200

  
201
    """
202
    assert self.queue, "Queue attribute is missing"
203
    assert self.opcode, "Opcode attribute is missing"
204

  
205
    self.queue.acquire()
206
    try:
207
      self.opcode.status = constants.OP_STATUS_RUNNING
208
    finally:
209
      self.queue.release()
210

  
191 211
  def RunTask(self, job):
192 212
    """Job executor.
193 213

  
......
198 218
    logging.debug("Worker %s processing job %s",
199 219
                  self.worker_id, job.id)
200 220
    proc = mcpu.Processor(self.pool.queue.context)
201
    queue = job.queue
221
    self.queue = queue = job.queue
202 222
    try:
203 223
      try:
204 224
        count = len(job.ops)
......
209 229
            queue.acquire()
210 230
            try:
211 231
              job.run_op_index = idx
212
              op.status = constants.OP_STATUS_RUNNING
232
              op.status = constants.OP_STATUS_WAITLOCK
213 233
              op.result = None
214 234
              op.start_timestamp = TimeStampNow()
215 235
              if idx == 0: # first opcode
......
246 266
                queue.release()
247 267

  
248 268
            # Make sure not to hold lock while _Log is called
249
            result = proc.ExecOpCode(input_opcode, _Log)
269
            self.opcode = op
270
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
250 271

  
251 272
            queue.acquire()
252 273
            try:
......
365 386
        if status in (constants.JOB_STATUS_QUEUED, ):
366 387
          self._wpool.AddTask(job)
367 388

  
368
        elif status in (constants.JOB_STATUS_RUNNING, ):
389
        elif status in (constants.JOB_STATUS_RUNNING,
390
                        constants.JOB_STATUS_WAITLOCK):
369 391
          logging.warning("Unfinished job %s found: %s", job.id, job)
370 392
          try:
371 393
            for op in job.ops:
......
621 643
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
622 644

  
623 645
      if status not in (constants.JOB_STATUS_QUEUED,
624
                        constants.JOB_STATUS_RUNNING):
646
                        constants.JOB_STATUS_RUNNING,
647
                        constants.JOB_STATUS_WAITLOCK):
625 648
        # Don't even try to wait if the job is no longer running, there will be
626 649
        # no changes.
627 650
        break

Also available in: Unified diff