Revision 031a3e57

b/daemons/ganeti-masterd
316 316
      logging.info("Received invalid request '%s'", method)
317 317
      raise ValueError("Invalid operation '%s'" % method)
318 318

  
319
  def _DummyLog(self, *args):
320
    pass
321

  
322 319
  def _Query(self, op):
323 320
    """Runs the specified opcode and returns the result.
324 321

  
325 322
    """
326 323
    proc = mcpu.Processor(self.server.context)
327
    # TODO: Where should log messages go?
328
    return proc.ExecOpCode(op, self._DummyLog, None)
324
    return proc.ExecOpCode(op, None)
329 325

  
330 326

  
331 327
class GanetiContext(object):
b/lib/jqueue.py
334 334
      not_marked = False
335 335

  
336 336

  
337
class _JobQueueWorker(workerpool.BaseWorker):
338
  """The actual job workers.
337
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
338
  def __init__(self, queue, job, op):
339
    """Initializes this class.
339 340

  
340
  """
341
  def _NotifyStart(self):
341
    @type queue: L{JobQueue}
342
    @param queue: Job queue
343
    @type job: L{_QueuedJob}
344
    @param job: Job object
345
    @type op: L{_QueuedOpCode}
346
    @param op: OpCode
347

  
348
    """
349
    assert queue, "Queue is missing"
350
    assert job, "Job is missing"
351
    assert op, "Opcode is missing"
352

  
353
    self._queue = queue
354
    self._job = job
355
    self._op = op
356

  
357
  def NotifyStart(self):
342 358
    """Mark the opcode as running, not lock-waiting.
343 359

  
344
    This is called from the mcpu code as a notifier function, when the
345
    LU is finally about to start the Exec() method. Of course, to have
346
    end-user visible results, the opcode must be initially (before
347
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
360
    This is called from the mcpu code as a notifier function, when the LU is
361
    finally about to start the Exec() method. Of course, to have end-user
362
    visible results, the opcode must be initially (before calling into
363
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
348 364

  
349 365
    """
350
    assert self.queue, "Queue attribute is missing"
351
    assert self.opcode, "Opcode attribute is missing"
352

  
353
    self.queue.acquire()
366
    self._queue.acquire()
354 367
    try:
355
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
356
                                    constants.OP_STATUS_CANCELING)
368
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
369
                                 constants.OP_STATUS_CANCELING)
357 370

  
358 371
      # Cancel here if we were asked to
359
      if self.opcode.status == constants.OP_STATUS_CANCELING:
372
      if self._op.status == constants.OP_STATUS_CANCELING:
360 373
        raise CancelJob()
361 374

  
362
      self.opcode.status = constants.OP_STATUS_RUNNING
375
      self._op.status = constants.OP_STATUS_RUNNING
363 376
    finally:
364
      self.queue.release()
377
      self._queue.release()
378

  
379
  def Feedback(self, *args):
380
    """Append a log entry.
381

  
382
    """
383
    assert len(args) < 3
365 384

  
385
    if len(args) == 1:
386
      log_type = constants.ELOG_MESSAGE
387
      log_msg = args[0]
388
    else:
389
      (log_type, log_msg) = args
390

  
391
    # The time is split to make serialization easier and not lose
392
    # precision.
393
    timestamp = utils.SplitTime(time.time())
394

  
395
    self._queue.acquire()
396
    try:
397
      self._job.log_serial += 1
398
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
399

  
400
      self._job.change.notifyAll()
401
    finally:
402
      self._queue.release()
403

  
404

  
405
class _JobQueueWorker(workerpool.BaseWorker):
406
  """The actual job workers.
407

  
408
  """
366 409
  def RunTask(self, job):
367 410
    """Job executor.
368 411

  
......
376 419
    logging.info("Worker %s processing job %s",
377 420
                  self.worker_id, job.id)
378 421
    proc = mcpu.Processor(self.pool.queue.context)
379
    self.queue = queue = job.queue
422
    queue = job.queue
380 423
    try:
381 424
      try:
382 425
        count = len(job.ops)
......
412 455
            finally:
413 456
              queue.release()
414 457

  
415
            def _Log(*args):
416
              """Append a log entry.
417

  
418
              """
419
              assert len(args) < 3
420

  
421
              if len(args) == 1:
422
                log_type = constants.ELOG_MESSAGE
423
                log_msg = args[0]
424
              else:
425
                log_type, log_msg = args
426

  
427
              # The time is split to make serialization easier and not lose
428
              # precision.
429
              timestamp = utils.SplitTime(time.time())
430

  
431
              queue.acquire()
432
              try:
433
                job.log_serial += 1
434
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
435

  
436
                job.change.notifyAll()
437
              finally:
438
                queue.release()
439

  
440
            # Make sure not to hold lock while _Log is called
441
            self.opcode = op
442
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
458
            # Make sure not to hold queue lock while calling ExecOpCode
459
            result = proc.ExecOpCode(input_opcode,
460
                                     _OpCodeExecCallbacks(queue, job, op))
443 461

  
444 462
            queue.acquire()
445 463
            try:
b/lib/mcpu.py
38 38
from ganeti import locking
39 39

  
40 40

  
41
class OpExecCbBase:
42
  """Base class for OpCode execution callbacks.
43

  
44
  """
45
  def NotifyStart(self):
46
    """Called when we are about to execute the LU.
47

  
48
    This function is called when we're about to start the lu's Exec() method,
49
    that is, after we have acquired all locks.
50

  
51
    """
52

  
53
  def Feedback(self, *args):
54
    """Sends feedback from the LU code to the end-user.
55

  
56
    """
57

  
58

  
41 59
class Processor(object):
42 60
  """Object which runs OpCodes"""
43 61
  DISPATCH_TABLE = {
......
103 121
  def __init__(self, context):
104 122
    """Constructor for Processor
105 123

  
106
    Args:
107
     - feedback_fn: the feedback function (taking one string) to be run when
108
                    interesting events are happening
109 124
    """
110 125
    self.context = context
111
    self._feedback_fn = None
126
    self._cbs = None
112 127
    self.exclusive_BGL = False
113 128
    self.rpc = rpc.RpcRunner(context.cfg)
114 129
    self.hmclass = HooksMaster
......
122 137
    hm = HooksMaster(self.rpc.call_hooks_runner, lu)
123 138
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
124 139
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
125
                     self._feedback_fn, None)
140
                     self._Feedback, None)
126 141

  
127 142
    if getattr(lu.op, "dry_run", False):
128 143
      # in this mode, no post-hooks are run, and the config is not
......
133 148
      return lu.dry_run_result
134 149

  
135 150
    try:
136
      result = lu.Exec(self._feedback_fn)
151
      result = lu.Exec(self._Feedback)
137 152
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
138 153
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
139
                                self._feedback_fn, result)
154
                                self._Feedback, result)
140 155
    finally:
141 156
      # FIXME: This needs locks if not lu_class.REQ_BGL
142 157
      if write_count != self.context.cfg.write_count:
......
155 170
    adding_locks = level in lu.add_locks
156 171
    acquiring_locks = level in lu.needed_locks
157 172
    if level not in locking.LEVELS:
158
      if callable(self._run_notifier):
159
        self._run_notifier()
173
      if self._cbs:
174
        self._cbs.NotifyStart()
175

  
160 176
      result = self._ExecLU(lu)
161 177
    elif adding_locks and acquiring_locks:
162 178
      # We could both acquire and add locks at the same level, but for now we
......
196 212

  
197 213
    return result
198 214

  
199
  def ExecOpCode(self, op, feedback_fn, run_notifier):
215
  def ExecOpCode(self, op, cbs):
200 216
    """Execute an opcode.
201 217

  
202 218
    @type op: an OpCode instance
203 219
    @param op: the opcode to be executed
204
    @type feedback_fn: a function that takes a single argument
205
    @param feedback_fn: this function will be used as feedback from the LU
206
                        code to the end-user
207
    @type run_notifier: callable (no arguments) or None
208
    @param run_notifier:  this function (if callable) will be called when
209
                          we are about to call the lu's Exec() method, that
210
                          is, after we have acquired all locks
220
    @type cbs: L{OpExecCbBase}
221
    @param cbs: Runtime callbacks
211 222

  
212 223
    """
213 224
    if not isinstance(op, opcodes.OpCode):
214 225
      raise errors.ProgrammerError("Non-opcode instance passed"
215 226
                                   " to ExecOpcode")
216 227

  
217
    self._feedback_fn = feedback_fn
218
    self._run_notifier = run_notifier
219
    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
220
    if lu_class is None:
221
      raise errors.OpCodeUnknown("Unknown opcode")
222

  
223
    # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
224
    # shared fashion otherwise (to prevent concurrent run with an exclusive LU.
225
    self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
226
                             shared=not lu_class.REQ_BGL)
228
    self._cbs = cbs
227 229
    try:
228
      self.exclusive_BGL = lu_class.REQ_BGL
229
      lu = lu_class(self, op, self.context, self.rpc)
230
      lu.ExpandNames()
231
      assert lu.needed_locks is not None, "needed_locks not set by LU"
232
      result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
230
      lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
231
      if lu_class is None:
232
        raise errors.OpCodeUnknown("Unknown opcode")
233

  
234
      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
235
      # shared fashion otherwise (to prevent concurrent run with an exclusive
236
      # LU.
237
      self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
238
                               shared=not lu_class.REQ_BGL)
239
      try:
240
        self.exclusive_BGL = lu_class.REQ_BGL
241
        lu = lu_class(self, op, self.context, self.rpc)
242
        lu.ExpandNames()
243
        assert lu.needed_locks is not None, "needed_locks not set by LU"
244
        result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
245
      finally:
246
        self.context.glm.release(locking.LEVEL_CLUSTER)
247
        self.exclusive_BGL = False
233 248
    finally:
234
      self.context.glm.release(locking.LEVEL_CLUSTER)
235
      self.exclusive_BGL = False
249
      self._cbs = None
236 250

  
237 251
    return result
238 252

  
253
  def _Feedback(self, *args):
254
    """Forward call to feedback callback function.
255

  
256
    """
257
    if self._cbs:
258
      self._cbs.Feedback(*args)
259

  
239 260
  def LogStep(self, current, total, message):
240 261
    """Log a change in LU execution progress.
241 262

  
242 263
    """
243 264
    logging.debug("Step %d/%d %s", current, total, message)
244
    self._feedback_fn("STEP %d/%d %s" % (current, total, message))
265
    self._Feedback("STEP %d/%d %s" % (current, total, message))
245 266

  
246 267
  def LogWarning(self, message, *args, **kwargs):
247 268
    """Log a warning to the logs and the user.
......
258 279
      message = message % tuple(args)
259 280
    if message:
260 281
      logging.warning(message)
261
      self._feedback_fn(" - WARNING: %s" % message)
282
      self._Feedback(" - WARNING: %s" % message)
262 283
    if "hint" in kwargs:
263
      self._feedback_fn("      Hint: %s" % kwargs["hint"])
284
      self._Feedback("      Hint: %s" % kwargs["hint"])
264 285

  
265 286
  def LogInfo(self, message, *args):
266 287
    """Log an informational message to the logs and the user.
......
269 290
    if args:
270 291
      message = message % tuple(args)
271 292
    logging.info(message)
272
    self._feedback_fn(" - INFO: %s" % message)
293
    self._Feedback(" - INFO: %s" % message)
273 294

  
274 295

  
275 296
class HooksMaster(object):

Also available in: Unified diff