Revision e92376d7

b/daemons/ganeti-masterd
261 261
    """
262 262
    proc = mcpu.Processor(self.server.context)
263 263
    # TODO: Where should log messages go?
264
    return proc.ExecOpCode(op, self._DummyLog)
264
    return proc.ExecOpCode(op, self._DummyLog, None)
265 265

  
266 266

  
267 267
class GanetiContext(object):
b/lib/constants.py
304 304

  
305 305
# Job status
306 306
JOB_STATUS_QUEUED = "queued"
307
JOB_STATUS_WAITLOCK = "waiting"
307 308
JOB_STATUS_RUNNING = "running"
308 309
JOB_STATUS_CANCELED = "canceled"
309 310
JOB_STATUS_SUCCESS = "success"
310 311
JOB_STATUS_ERROR = "error"
311 312

  
312 313
OP_STATUS_QUEUED = "queued"
314
OP_STATUS_WAITLOCK = "waiting"
313 315
OP_STATUS_RUNNING = "running"
314 316
OP_STATUS_CANCELED = "canceled"
315 317
OP_STATUS_SUCCESS = "success"
b/lib/jqueue.py
159 159

  
160 160
      if op.status == constants.OP_STATUS_QUEUED:
161 161
        pass
162
      elif op.status == constants.OP_STATUS_WAITLOCK:
163
        status = constants.JOB_STATUS_WAITLOCK
162 164
      elif op.status == constants.OP_STATUS_RUNNING:
163 165
        status = constants.JOB_STATUS_RUNNING
164 166
      elif op.status == constants.OP_STATUS_ERROR:
......
188 190

  
189 191

  
190 192
class _JobQueueWorker(workerpool.BaseWorker):
193
  def _NotifyStart(self):
194
    """Mark the opcode as running, not lock-waiting.
195

  
196
    This is called from the mcpu code as a notifier function, when the
197
    LU is finally about to start the Exec() method. Of course, to have
198
    end-user visible results, the opcode must be initially (before
199
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
200

  
201
    """
202
    assert self.queue, "Queue attribute is missing"
203
    assert self.opcode, "Opcode attribute is missing"
204

  
205
    self.queue.acquire()
206
    try:
207
      self.opcode.status = constants.OP_STATUS_RUNNING
208
    finally:
209
      self.queue.release()
210

  
191 211
  def RunTask(self, job):
192 212
    """Job executor.
193 213

  
......
198 218
    logging.debug("Worker %s processing job %s",
199 219
                  self.worker_id, job.id)
200 220
    proc = mcpu.Processor(self.pool.queue.context)
201
    queue = job.queue
221
    self.queue = queue = job.queue
202 222
    try:
203 223
      try:
204 224
        count = len(job.ops)
......
209 229
            queue.acquire()
210 230
            try:
211 231
              job.run_op_index = idx
212
              op.status = constants.OP_STATUS_RUNNING
232
              op.status = constants.OP_STATUS_WAITLOCK
213 233
              op.result = None
214 234
              op.start_timestamp = TimeStampNow()
215 235
              if idx == 0: # first opcode
......
246 266
                queue.release()
247 267

  
248 268
            # Make sure not to hold lock while _Log is called
249
            result = proc.ExecOpCode(input_opcode, _Log)
269
            self.opcode = op
270
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
250 271

  
251 272
            queue.acquire()
252 273
            try:
......
365 386
        if status in (constants.JOB_STATUS_QUEUED, ):
366 387
          self._wpool.AddTask(job)
367 388

  
368
        elif status in (constants.JOB_STATUS_RUNNING, ):
389
        elif status in (constants.JOB_STATUS_RUNNING,
390
                        constants.JOB_STATUS_WAITLOCK):
369 391
          logging.warning("Unfinished job %s found: %s", job.id, job)
370 392
          try:
371 393
            for op in job.ops:
......
621 643
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
622 644

  
623 645
      if status not in (constants.JOB_STATUS_QUEUED,
624
                        constants.JOB_STATUS_RUNNING):
646
                        constants.JOB_STATUS_RUNNING,
647
                        constants.JOB_STATUS_WAITLOCK):
625 648
        # Don't even try to wait if the job is no longer running, there will be
626 649
        # no changes.
627 650
        break
b/lib/mcpu.py
131 131
    adding_locks = level in lu.add_locks
132 132
    acquiring_locks = level in lu.needed_locks
133 133
    if level not in locking.LEVELS:
134
      if callable(self._run_notifier):
135
        self._run_notifier()
134 136
      result = self._ExecLU(lu)
135 137
    elif adding_locks and acquiring_locks:
136 138
      # We could both acquire and add locks at the same level, but for now we
......
170 172

  
171 173
    return result
172 174

  
173
  def ExecOpCode(self, op, feedback_fn):
175
  def ExecOpCode(self, op, feedback_fn, run_notifier):
174 176
    """Execute an opcode.
175 177

  
176
    Args:
177
      op: the opcode to be executed
178
    @type op: an OpCode instance
179
    @param op: the opcode to be executed
180
    @type feedback_fn: a function that takes a single argument
181
    @param feedback_fn: this function will be used as feedback from the LU
182
                        code to the end-user
183
    @type run_notifier: callable (no arguments) or None
184
    @param run_notifier:  this function (if callable) will be called when
185
                          we are about to call the lu's Exec() method, that
186
                          is, after we have aquired all locks
178 187

  
179 188
    """
180 189
    if not isinstance(op, opcodes.OpCode):
......
182 191
                                   " to ExecOpcode")
183 192

  
184 193
    self._feedback_fn = feedback_fn
194
    self._run_notifier = run_notifier
185 195
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
186 196
    if lu_class is None:
187 197
      raise errors.OpCodeUnknown("Unknown opcode")
b/scripts/gnt-job
38 38

  
39 39
_USER_JOB_STATUS = {
40 40
  constants.JOB_STATUS_QUEUED: "queued",
41
  constants.JOB_STATUS_WAITLOCK: "waiting",
41 42
  constants.JOB_STATUS_RUNNING: "running",
42 43
  constants.JOB_STATUS_CANCELED: "canceled",
43 44
  constants.JOB_STATUS_SUCCESS: "success",

Also available in: Unified diff