Revision ef2df7d3

b/daemons/ganeti-masterd
35 35
import signal
36 36
import logging
37 37

  
38
from cStringIO import StringIO
39 38
from optparse import OptionParser
40 39

  
41 40
from ganeti import config
b/lib/jqueue.py
153 153
  @ivar received_timestamp: the timestamp for when the job was received
154 154
  @ivar start_timestamp: the timestamp for start of execution
155 155
  @ivar end_timestamp: the timestamp for end of execution
156
  @ivar lock_status: In-memory locking information for debugging
156 157
  @ivar change: a Condition variable we use for waiting for job changes
157 158

  
158 159
  """
159 160
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
160 161
               "received_timestamp", "start_timestamp", "end_timestamp",
161
               "change",
162
               "lock_status", "change",
162 163
               "__weakref__"]
163 164

  
164 165
  def __init__(self, queue, job_id, ops):
......
186 187
    self.start_timestamp = None
187 188
    self.end_timestamp = None
188 189

  
190
    # In-memory attributes
191
    self.lock_status = None
192

  
189 193
    # Condition to wait for changes
190 194
    self.change = threading.Condition(self.queue._lock)
191 195

  
......
209 213
    obj.start_timestamp = state.get("start_timestamp", None)
210 214
    obj.end_timestamp = state.get("end_timestamp", None)
211 215

  
216
    # In-memory attributes
217
    obj.lock_status = None
218

  
212 219
    obj.ops = []
213 220
    obj.log_serial = 0
214 221
    for op_state in state["ops"]:
......
334 341
      not_marked = False
335 342

  
336 343

  
337
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
344
class _OpExecCallbacks(mcpu.OpExecCbBase):
338 345
  def __init__(self, queue, job, op):
339 346
    """Initializes this class.
340 347

  
......
368 375
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
369 376
                                 constants.OP_STATUS_CANCELING)
370 377

  
378
      # All locks are acquired by now
379
      self._job.lock_status = None
380

  
371 381
      # Cancel here if we were asked to
372 382
      if self._op.status == constants.OP_STATUS_CANCELING:
373 383
        raise CancelJob()
......
401 411
    finally:
402 412
      self._queue.release()
403 413

  
414
  def ReportLocks(self, msg):
415
    """Write locking information to the job.
416

  
417
    Called whenever the LU processor is waiting for a lock or has acquired one.
418

  
419
    """
420
    # Not getting the queue lock because this is a single assignment
421
    self._job.lock_status = msg
422

  
404 423

  
405 424
class _JobQueueWorker(workerpool.BaseWorker):
406 425
  """The actual job workers.
......
457 476

  
458 477
            # Make sure not to hold queue lock while calling ExecOpCode
459 478
            result = proc.ExecOpCode(input_opcode,
460
                                     _OpCodeExecCallbacks(queue, job, op))
479
                                     _OpExecCallbacks(queue, job, op))
461 480

  
462 481
            queue.acquire()
463 482
            try:
......
505 524
      queue.acquire()
506 525
      try:
507 526
        try:
527
          job.lock_status = None
508 528
          job.run_op_index = -1
509 529
          job.end_timestamp = TimeStampNow()
510 530
          queue.UpdateJobUnlocked(job)
......
513 533
          status = job.CalcStatus()
514 534
      finally:
515 535
        queue.release()
536

  
516 537
      logging.info("Worker %s finished job %s, status = %s",
517 538
                   self.worker_id, job_id, status)
518 539

  
......
1081 1102

  
1082 1103
    return results
1083 1104

  
1084

  
1085 1105
  @_RequireOpenQueue
1086 1106
  def UpdateJobUnlocked(self, job):
1087 1107
    """Update a job's on disk storage.
b/lib/mcpu.py
36 36
from ganeti import rpc
37 37
from ganeti import cmdlib
38 38
from ganeti import locking
39
from ganeti import utils
39 40

  
40 41

  
41 42
class OpExecCbBase:
......
55 56

  
56 57
    """
57 58

  
59
  def ReportLocks(self, msg):
60
    """Report lock operations.
61

  
62
    """
63

  
58 64

  
59 65
class Processor(object):
60 66
  """Object which runs OpCodes"""
......
128 134
    self.rpc = rpc.RpcRunner(context.cfg)
129 135
    self.hmclass = HooksMaster
130 136

  
137
  def _ReportLocks(self, level, names, shared, acquired):
138
    """Reports lock operations.
139

  
140
    @type level: int
141
    @param level: Lock level
142
    @type names: list or string
143
    @param names: Lock names
144
    @type shared: bool
145
    @param shared: Whether the lock should be acquired in shared mode
146
    @type acquired: bool
147
    @param acquired: Whether the lock has already been acquired
148

  
149
    """
150
    parts = []
151

  
152
    # Build message
153
    if acquired:
154
      parts.append("acquired")
155
    else:
156
      parts.append("waiting")
157

  
158
    parts.append(locking.LEVEL_NAMES[level])
159

  
160
    if names == locking.ALL_SET:
161
      parts.append("ALL")
162
    elif isinstance(names, basestring):
163
      parts.append(names)
164
    else:
165
      parts.append(",".join(names))
166

  
167
    if shared:
168
      parts.append("shared")
169
    else:
170
      parts.append("exclusive")
171

  
172
    msg = "/".join(parts)
173

  
174
    logging.debug("LU locks %s", msg)
175

  
176
    if self._cbs:
177
      self._cbs.ReportLocks(msg)
178

  
131 179
  def _ExecLU(self, lu):
132 180
    """Logical Unit execution sequence.
133 181

  
......
184 232
      share = lu.share_locks[level]
185 233
      if acquiring_locks:
186 234
        needed_locks = lu.needed_locks[level]
235

  
236
        self._ReportLocks(level, needed_locks, share, False)
187 237
        lu.acquired_locks[level] = self.context.glm.acquire(level,
188 238
                                                            needed_locks,
189 239
                                                            shared=share)
240
        self._ReportLocks(level, needed_locks, share, True)
241

  
190 242
      else: # adding_locks
191 243
        add_locks = lu.add_locks[level]
192 244
        lu.remove_locks[level] = add_locks
......
234 286
      # Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
235 287
      # shared fashion otherwise (to prevent concurrent run with an exclusive
236 288
      # LU).
237
      self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
238
                               shared=not lu_class.REQ_BGL)
289
      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
290
                        not lu_class.REQ_BGL, False)
291
      try:
292
        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
293
                                 shared=not lu_class.REQ_BGL)
294
      finally:
295
        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
296
                          not lu_class.REQ_BGL, True)
239 297
      try:
240 298
        self.exclusive_BGL = lu_class.REQ_BGL
241 299
        lu = lu_class(self, op, self.context, self.rpc)

Also available in: Unified diff