Revision 87ed6b79

b/lib/cmdlib/base.py
@@ -53 +53 @@
     self.other = kwargs


+class LUWConfdClient(object):
+  """Wrapper class for wconfd client calls from LUs.
+
+  Correctly updates the cache of the LU's owned locks
+  when leaving. Also transparently adds the context
+  for resource requests.
+
+  """
+  def __init__(self, lu):
+    self.lu = lu
+
+  def TryUpdateLocks(self, req):
+    jid, livelockfile = self.lu.wconfdcontext
+    self.lu.wconfd.Client().TryUpdateLocks(jid, livelockfile, req)
+    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+
+  def DownGradeLocksLevel(self, level):
+    jid, livelockfile = self.lu.wconfdcontext
+    self.lu.wconfd.Client().DownGradeLocksLevel(jid, livelockfile, level)
+    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+
+  def FreeLocksLevel(self, level):
+    jid, livelockfile = self.lu.wconfdcontext
+    self.lu.wconfd.Client().FreeLocksLevel(jid, livelockfile, level)
+    self.lu.wconfdlocks = self.lu.wconfd.Client().ListLocks(jid, livelockfile)
+
+
 class LogicalUnit(object):
   """Logical Unit base class.

@@ -76 +103 @@
   HTYPE = None
   REQ_BGL = True

-  def __init__(self, processor, op, context, rpc_runner):
+  def __init__(self, processor, op, context, rpc_runner, wconfdcontext, wconfd):
     """Constructor for LogicalUnit.

     This needs to be overridden in derived classes in order to check op
     validity.

+    @type wconfdcontext: (int, string)
+    @param wconfdcontext: the identity of the logical unit to represent itself
+        to wconfd when asking for resources; it is given as job id and livelock
+        file.
+    @param wconfd: the wconfd class to use; dependency injection to allow
+        testability.
+
     """
     self.proc = processor
     self.op = op
     self.cfg = context.cfg
-    self.glm = context.glm
-    # readability alias
-    self.owned_locks = context.glm.list_owned
+    self.wconfdlocks = []
+    self.wconfdcontext = wconfdcontext
     self.context = context
     self.rpc = rpc_runner
+    self.wconfd = wconfd # wconfd module to use, for testing

     # Dictionaries used to declare locking needs to mcpu
     self.needed_locks = None
@@ -123 +157 @@

     self.CheckArguments()

+  def WConfdClient(self):
+    return LUWConfdClient(self)
+
+  def owned_locks(self, level):
+    """Return the list of locks owned by the LU at a given level.
+
+    This method assumes that the field wconfdlocks is set correctly
+    by mcpu.
+
+    """
+    levelprefix = "%s/" % (locking.LEVEL_NAMES[level],)
+    locks = set([lock[0][len(levelprefix):]
+                for lock in self.wconfdlocks
+                if lock[0].startswith(levelprefix)])
+    expand_fns = {
+      locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
+      locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
+      locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
+      locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
+      locking.LEVEL_NODE: self.cfg.GetNodeList,
+      locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
+      locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
+      }
+    if locking.LOCKSET_NAME in locks:
+      return expand_fns[level]()
+    else:
+      return locks
+
+  def release_request(self, level, names):
+    """Return a request to release the specified locks of the given level.
+
+    Correctly break up the group lock to do so.
+
+    """
+    levelprefix = "%s/" % (locking.LEVEL_NAMES[level],)
+    release = [[levelprefix + lock, "release"] for lock in names]
+
+    # if we break up the set-lock, make sure we ask for the rest of it.
+    setlock = levelprefix + locking.LOCKSET_NAME
+    if [setlock, "exclusive"] in self.wconfdlocks:
+      owned = self.owned_locks(level)
+      request = [[levelprefix + lock, "exclusive"]
+                 for lock in owned
+                 if lock not in names]
+    elif [setlock, "shared"] in self.wconfdlocks:
+      owned = self.owned_locks(level)
+      request = [[levelprefix + lock, "shared"]
+                 for lock in owned
+                 if lock not in names]
+    else:
+      request = []
+
+    return release + [[setlock, "release"]] + request
+
   def CheckArguments(self):
     """Check syntactic validity for the opcode arguments.

@@ -515 +603 @@

     # caller specified names and we must keep the same order
     assert self.names
-    assert not self.do_locking or lu.glm.is_owned(lock_level)

     missing = set(self.wanted).difference(names)
     if missing:
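
For illustration only (this sketch is not part of the revision): the helpers above talk to wconfd in terms of lock names of the form "<level name>/<lock name>" and request lists of [name, action] pairs, where the action is "shared", "exclusive" or "release", and "[lockset]" (the new locking.LOCKSET_NAME) stands for the whole level. The standalone function below mirrors the release_request logic for explicitly listed locks; the level name "instance" and the sample lock names are made up, and the real method additionally expands an owned set-lock to every name known to the configuration.

# Standalone sketch of the request format built by LogicalUnit.release_request.
LOCKSET_NAME = "[lockset]"  # same value as the new locking.LOCKSET_NAME

def release_request(wconfdlocks, levelprefix, names):
  """Build a release request, breaking up an owned set-lock if needed."""
  release = [[levelprefix + lock, "release"] for lock in names]
  setlock = levelprefix + LOCKSET_NAME
  # Locks of this level the LU explicitly holds (the real code expands an
  # owned set-lock to all names from the configuration instead).
  owned = [lock[0][len(levelprefix):] for lock in wconfdlocks
           if lock[0].startswith(levelprefix) and lock[0] != setlock]
  if [setlock, "exclusive"] in wconfdlocks:
    keep = [[levelprefix + lock, "exclusive"]
            for lock in owned if lock not in names]
  elif [setlock, "shared"] in wconfdlocks:
    keep = [[levelprefix + lock, "shared"]
            for lock in owned if lock not in names]
  else:
    keep = []
  return release + [[setlock, "release"]] + keep

# Example: the LU owns the instance set-lock shared plus two instances.
owned_locks = [["instance/[lockset]", "shared"],
               ["instance/inst1", "shared"],
               ["instance/inst2", "shared"]]
print(release_request(owned_locks, "instance/", ["inst1"]))
# [['instance/inst1', 'release'], ['instance/[lockset]', 'release'],
#  ['instance/inst2', 'shared']]
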
b/lib/cmdlib/cluster.py
@@ -331 +331 @@
     """Computes the list of nodes and their attributes.

     """
-    # Locking is not used
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
     if query.CQ_CONFIG in self.requested_data:
       cluster = lu.cfg.GetClusterInfo()
       nodes = lu.cfg.GetAllNodesInfo()
@@ -591 +585 @@

     """
     if self.wanted_names is None:
-      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
+      self.wanted_names = \
+          map(self.cfg.GetInstanceName,
+              self.owned_locks(locking.LEVEL_INSTANCE))

     self.wanted_instances = \
         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
@@ -634 +630 @@
         per_node_disks[pnode].append((instance, idx, disk))

     assert not (frozenset(per_node_disks.keys()) -
-                self.owned_locks(locking.LEVEL_NODE_RES)), \
+                frozenset(self.owned_locks(locking.LEVEL_NODE_RES))), \
       "Not owning correct locks"
     assert not self.owned_locks(locking.LEVEL_NODE)

b/lib/cmdlib/instance.py
@@ -1389 +1389 @@
     assert not (self.owned_locks(locking.LEVEL_NODE_RES) -
                 self.owned_locks(locking.LEVEL_NODE)), \
       "Node locks differ from node resource locks"
-    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

     ht_kind = self.op.hypervisor
     if ht_kind in constants.HTS_REQ_PORT:
@@ -1732 +1731 @@
     # Otherwise the new lock would have to be added in acquired mode.
     assert self.REQ_BGL
     assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
-    self.glm.remove(locking.LEVEL_INSTANCE, old_name)
-    self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)

     # re-read the instance from the configuration after rename
     renamed_inst = self.cfg.GetInstanceInfo(self.instance.uuid)
b/lib/cmdlib/instance_migration.py
@@ -347 +347 @@
         ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

     else:
-      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
       secondary_node_uuids = self.instance.secondary_nodes
       if not secondary_node_uuids:
         raise errors.ConfigurationError("No secondary node but using"
b/lib/cmdlib/instance_storage.py
@@ -784 +784 @@
       ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
       ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

-    assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
     if self.op.node_uuids:
       node_uuids = self.op.node_uuids
     else:
@@ -1590 +1588 @@
     ReleaseLocks(self, locking.LEVEL_NODE)

     # Downgrade lock while waiting for sync
-    self.glm.downgrade(locking.LEVEL_INSTANCE)
+    self.WConfdClient().DownGradeLocksLevel(
+          locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])

     assert wipe_disks ^ (old_disk_size is None)

@@ -1707 +1706 @@
            for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
            for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
       else:
-        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
         self._LockInstancesNodes()

     elif level == locking.LEVEL_NODE_RES:
@@ -1748 +1745 @@
     """Check prerequisites.

     """
-    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
-            self.op.iallocator is None)
-
     # Verify if node group locks are still correct
     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
     if owned_groups:
@@ -2170 +2164 @@
            (owned_nodes, self.node_secondary_ip.keys()))
       assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
               self.lu.owned_locks(locking.LEVEL_NODE_RES))
-      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

       owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
       assert list(owned_instances) == [self.instance_name], \
           "Instance '%s' not locked" % self.instance_name

-      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
-          "Should not own any node group lock at this point"
-
     if not self.disks:
       feedback_fn("No disks need replacement for instance '%s'" %
                   self.instance.name)
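
For illustration only (not part of the revision): note that the old glm.downgrade() took the numeric level constant, while the new WConfdClient().DownGradeLocksLevel() takes the level's string name from locking.LEVEL_NAMES, as in the hunk above. A minimal sketch of such a call site, where lu stands for any LogicalUnit of this revision:

from ganeti import locking

def downgrade_instance_locks(lu):
  # The wrapper adds the (job id, livelock file) context and refreshes
  # lu.wconfdlocks afterwards, so lu.owned_locks() stays consistent.
  lu.WConfdClient().DownGradeLocksLevel(
      locking.LEVEL_NAMES[locking.LEVEL_INSTANCE])
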
b/lib/cmdlib/instance_utils.py
@@ -378 +378 @@
   @param keep: Names of locks to retain

   """
+  logging.debug("Lu %s ReleaseLocks %s names=%s, keep=%s",
+                lu.wconfdcontext, level, names, keep)
   assert not (keep is not None and names is not None), \
          "Only one of the 'names' and the 'keep' parameters can be given"

@@ -388 +390 @@
   else:
     should_release = None

+  levelname = locking.LEVEL_NAMES[level]
+
   owned = lu.owned_locks(level)
   if not owned:
     # Not owning any lock at this level, do nothing
@@ -407 +411 @@
     assert len(lu.owned_locks(level)) == (len(retain) + len(release))

     # Release just some locks
-    lu.glm.release(level, names=release)
-
+    lu.WConfdClient().TryUpdateLocks(
+      lu.release_request(level, release))
     assert frozenset(lu.owned_locks(level)) == frozenset(retain)
   else:
-    # Release everything
-    lu.glm.release(level)
-
-    assert not lu.glm.is_owned(level), "No locks should be owned"
+    lu.WConfdClient().FreeLocksLevel(levelname)


 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
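
For illustration only (not part of the revision): ReleaseLocks now has two paths, both visible above. Releasing a subset goes through TryUpdateLocks with a request built by lu.release_request, while releasing the whole level calls FreeLocksLevel with the level's name. The sketch below records the wrapper calls a hypothetical LU would issue; the recording class, the lock names and the level name "node" are made up for illustration.

# Records which wconfd-wrapper calls the two ReleaseLocks paths issue.
class RecordingWConfdClient(object):
  def __init__(self):
    self.calls = []
  def TryUpdateLocks(self, req):
    self.calls.append(("TryUpdateLocks", req))
  def FreeLocksLevel(self, levelname):
    self.calls.append(("FreeLocksLevel", levelname))

client = RecordingWConfdClient()

# Partial release: drop "node1", keep "node2" in the mode it was held in
# (request shape as returned by LogicalUnit.release_request).
client.TryUpdateLocks([["node/node1", "release"],
                       ["node/[lockset]", "release"],
                       ["node/node2", "shared"]])

# Full release of the level: just name the level.
client.FreeLocksLevel("node")

print(client.calls)
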
b/lib/cmdlib/misc.py
@@ -24 +24 @@
 import logging
 import time

-from ganeti import compat
 from ganeti import constants
 from ganeti import errors
 from ganeti import locking
@@ -286 +285 @@
     """Computes the list of nodes and their attributes.

     """
-    # Locking is not used
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
     valid_nodes = [node.uuid
                    for node in lu.cfg.GetAllNodesInfo().values()
                    if not node.offline and node.vm_capable]
b/lib/cmdlib/operating_system.py
@@ -21 +21 @@

 """Logical units dealing with OS."""

-from ganeti import compat
 from ganeti import locking
 from ganeti import qlang
 from ganeti import query
@@ -94 +93 @@
     """Computes the list of nodes and their attributes.

     """
-    # Locking is not used
-    assert not (compat.any(lu.glm.is_owned(level)
-                           for level in locking.LEVELS
-                           if level != locking.LEVEL_CLUSTER) or
-                self.do_locking or self.use_locking)
-
     valid_node_uuids = [node.uuid
                         for node in lu.cfg.GetAllNodesInfo().values()
                         if not node.offline and node.vm_capable]
b/lib/locking.py
@@ -916 +916 @@
 # to acquire.  Hide this behind this nicely named constant.
 ALL_SET = None

+LOCKSET_NAME = "[lockset]"
+

 def _TimeoutZero():
   """Returns the number zero.
b/lib/mcpu.py
@@ -44 +44 @@
 from ganeti import locking
 from ganeti import utils
 from ganeti import compat
+from ganeti import wconfd


 _OP_PREFIX = "Op"
@@ -250 +251 @@
                                " queries) can not submit jobs")


-def _VerifyLocks(lu, glm, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
+def _VerifyLocks(lu, _mode_whitelist=_NODE_ALLOC_MODE_WHITELIST,
                  _nal_whitelist=_NODE_ALLOC_WHITELIST):
   """Performs consistency checks on locks acquired by a logical unit.

   @type lu: L{cmdlib.LogicalUnit}
   @param lu: Logical unit instance
-  @type glm: L{locking.GanetiLockManager}
-  @param glm: Lock manager

   """
   if not __debug__:
     return

-  have_nal = glm.check_owned(locking.LEVEL_NODE_ALLOC, locking.NAL)
+  allocset = lu.owned_locks(locking.LEVEL_NODE_ALLOC)
+  have_nal = locking.NAL in allocset

   for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
     # TODO: Verify using actual lock mode, not using LU variables
@@ -282 +282 @@
       if lu.__class__ in _nal_whitelist:
         assert not have_nal, \
           "LU is whitelisted for not acquiring the node allocation lock"
-      elif lu.needed_locks[level] == locking.ALL_SET or glm.owning_all(level):
+      elif lu.needed_locks[level] == locking.ALL_SET:
         assert have_nal, \
           ("Node allocation lock must be used if an LU acquires all nodes"
            " or node resources")
@@ -307 +307 @@
     self.rpc = context.rpc
     self.hmclass = hooksmaster.HooksMaster
     self._enable_locks = enable_locks
+    self.wconfd = wconfd # Indirection to allow testing

   def _CheckLocksEnabled(self):
     """Checks if locking is enabled.
@@ -336 +337 @@
     """
     self._CheckLocksEnabled()

+    # TODO: honor priority in lock allocation
     if self._cbs:
-      priority = self._cbs.CurrentPriority()
+      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
     else:
       priority = None

-    acquired = self.context.glm.acquire(level, names, shared=shared,
-                                        timeout=timeout, priority=priority,
-                                        opportunistic=opportunistic)
+    if names == locking.ALL_SET:
+      if opportunistic:
+        expand_fns = {
+          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
+          locking.LEVEL_INSTANCE: self.context.cfg.GetInstanceList,
+          locking.LEVEL_NODE_ALLOC: (lambda: [locking.NAL]),
+          locking.LEVEL_NODEGROUP: self.context.cfg.GetNodeGroupList,
+          locking.LEVEL_NODE: self.context.cfg.GetNodeList,
+          locking.LEVEL_NODE_RES: self.context.cfg.GetNodeList,
+          locking.LEVEL_NETWORK: self.context.cfg.GetNetworkList,
+          }
+        names = expand_fns[level]()
+      else:
+        names = locking.LOCKSET_NAME
+
+    if isinstance(names, str):
+      names = [names]
+
+    levelname = locking.LEVEL_NAMES[level]
+    jid = int(self.GetECId())
+    livelockfile = self.context.livelock.lockfile.name
+
+    locks = ["%s/%s" % (levelname, lock) for lock in list(names)]

-    if acquired is None:
-      raise LockAcquireTimeout()
+    if not names:
+      logging.debug("Acquiring no locks for %d (%s) at level %s",
+                    jid, livelockfile, levelname)
+      return []

-    return acquired
+    if shared:
+      request = [[lock, "shared"] for lock in locks]
+    else:
+      request = [[lock, "exclusive"] for lock in locks]
+
+    if opportunistic:
+      logging.debug("Opportunistically acquiring some of %s for %d (%s).",
+                    locks, jid, livelockfile)
+      locks = self.wconfd.Client().OpportunisticLockUnion(jid, livelockfile,
+                                                          request)
+    elif timeout is None:
+      while True:
+        ## TODO: use asynchronous wait instead of polling
+        blockedon = self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
+                                                        request)
+        logging.debug("Requesting %s for %d (%s) blocked on %s",
+                      request, jid, livelockfile, blockedon)
+        if not blockedon:
+          break
+        time.sleep(random.random())
+    else:
+      logging.debug("Trying %ss to request %s for %d (%s)",
+                    timeout, request, jid, livelockfile)
+      ## TODO: use blocking wait instead of polling
+      blocked = utils.SimpleRetry([], self.wconfd.Client().TryUpdateLocks, 0.1,
+                                  timeout, args=[jid, livelockfile, request])
+      if blocked:
+        raise LockAcquireTimeout()
+
+    return locks

   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
@@ -398 +451 @@
     given LU and its opcodes.

     """
-    glm = self.context.glm
     adding_locks = level in lu.add_locks
     acquiring_locks = level in lu.needed_locks

     if level not in locking.LEVELS:
-      _VerifyLocks(lu, glm)
+      _VerifyLocks(lu)

       if self._cbs:
         self._cbs.NotifyStart()
@@ -445 +497 @@

           self._AcquireLocks(level, needed_locks, share, opportunistic,
                              calc_timeout())
+          (jid, livelockfile) = lu.wconfdcontext
+          lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
         else:
           # Adding locks
           add_locks = lu.add_locks[level]
+          if isinstance(add_locks, str):
+            add_locks = [add_locks]
           lu.remove_locks[level] = add_locks

           try:
-            glm.add(level, add_locks, acquired=1, shared=share)
-          except errors.LockError:
+            jid = int(self.GetECId())
+            livelockfile = self.context.livelock.lockfile.name
+            levelname = locking.LEVEL_NAMES[level]
+
+            if share:
+              mode = "shared"
+            else:
+              mode = "exclusive"
+
+            request = [["%s/%s" % (levelname, lock), mode]
+                       for lock in add_locks]
+
+            logging.debug("Requesting %s for %d (%s)",
+                          request, jid, livelockfile)
+            blocked = \
+              self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
+            assert blocked == [], "Allocating newly 'created' locks failed"
+            (jid, livelockfile) = lu.wconfdcontext
+            lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
+          except errors.GenericError:
+            # TODO: verify what actually caused the error
             logging.exception("Detected lock error in level %s for locks"
                               " %s, shared=%s", level, add_locks, share)
             raise errors.OpPrereqError(
@@ -464 +539 @@
           result = self._LockAndExecLU(lu, level + 1, calc_timeout)
         finally:
           if level in lu.remove_locks:
-            glm.remove(level, lu.remove_locks[level])
+            jid = int(self.GetECId())
+            livelockfile = self.context.livelock.lockfile.name
+            levelname = locking.LEVEL_NAMES[level]
+            request = [["%s/%s" % (levelname, lock), "release"]
+                       for lock in lu.remove_locks[level]]
+            blocked = \
+              self.wconfd.Client().TryUpdateLocks(jid, livelockfile, request)
+            assert blocked == [], "Release may not fail"
       finally:
-        if glm.is_owned(level):
-          glm.release(level)
-
+        jid = int(self.GetECId())
+        livelockfile = self.context.livelock.lockfile.name
+        levelname = locking.LEVEL_NAMES[level]
+        logging.debug("Freeing locks at level %s for %d (%s)",
+                      levelname, jid, livelockfile)
+        self.wconfd.Client().FreeLocksLevel(jid, livelockfile, levelname)
     else:
       result = self._LockAndExecLU(lu, level + 1, calc_timeout)

529 614
                                     " disabled" % op.OP_ID)
530 615

  
531 616
      try:
532
        lu = lu_class(self, op, self.context, self.rpc)
617
        jid = int(self.GetECId())
618
        livelockfile = self.context.livelock.lockfile.name
619
        lu = lu_class(self, op, self.context, self.rpc, (jid, livelockfile),
620
                      self.wconfd)
621
        lu.wconfdlocks = self.wconfd.Client().ListLocks(jid, livelockfile)
533 622
        lu.ExpandNames()
534 623
        assert lu.needed_locks is not None, "needed_locks not set by LU"
535 624

  
......
541 630
            self.context.cfg.DropECReservations(self._ec_id)
542 631
      finally:
543 632
        # Release BGL if owned
544
        if self.context.glm.is_owned(locking.LEVEL_CLUSTER):
545
          assert self._enable_locks
546
          self.context.glm.release(locking.LEVEL_CLUSTER)
633
        jid = int(self.GetECId())
634
        livelockfile = self.context.livelock.lockfile.name
635
        bglname = "%s/%s" % (locking.LEVEL_NAMES[locking.LEVEL_CLUSTER],
636
                             locking.BGL)
637
        self.wconfd.Client().TryUpdateLocks(jid, livelockfile,
638
                                            [[bglname, "release"]])
547 639
    finally:
548 640
      self._cbs = None
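
For illustration only (not part of the revision): in the rewritten _AcquireLocks above, locking.ALL_SET is translated to the "<level>/[lockset]" pseudo-lock (or expanded to concrete names for opportunistic requests), the request is a list of [lockname, mode] pairs, and TryUpdateLocks returns the locks the request is still blocked on. The toy client below imitates that contract to show the polling loop used when no timeout is given; the class and its behaviour are invented for this example.

import time

class ToyWConfd(object):
  """Pretends the first attempt is blocked and the second succeeds."""
  def __init__(self):
    self.attempts = 0
  def TryUpdateLocks(self, jid, livelockfile, request):
    self.attempts += 1
    return [] if self.attempts > 1 else [request[0][0]]

def acquire(client, jid, livelockfile, request):
  # Same shape as the timeout-less branch of _AcquireLocks: keep retrying
  # until wconfd reports nothing blocking the request.
  while True:
    blockedon = client.TryUpdateLocks(jid, livelockfile, request)
    if not blockedon:
      return [name for (name, _) in request]
    time.sleep(0.01)  # the real code sleeps a random amount

print(acquire(ToyWConfd(), 1234, "/path/to/livelock",
              [["node/node1", "exclusive"], ["node/node2", "exclusive"]]))
# ['node/node1', 'node/node2']
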
b/lib/server/masterd.py
@@ -489 +489 @@
     object.__setattr__(self, name, value)

   def AddNode(self, node, ec_id):
-    """Adds a node to the configuration and lock manager.
+    """Adds a node to the configuration.

     """
     # Add it to the configuration
@@ -498 +498 @@
     # If preseeding fails it'll not be added
     self.jobqueue.AddNode(node)

-    # Add the new node to the Ganeti Lock Manager
-    self.glm.add(locking.LEVEL_NODE, node.uuid)
-    self.glm.add(locking.LEVEL_NODE_RES, node.uuid)
-
   def ReaddNode(self, node):
     """Updates a node that's already in the configuration

@@ -519 +515 @@
     # Notify job queue
     self.jobqueue.RemoveNode(node.name)

-    # Remove the node from the Ganeti Lock Manager
-    self.glm.remove(locking.LEVEL_NODE, node.uuid)
-    self.glm.remove(locking.LEVEL_NODE_RES, node.uuid)
-

 def _SetWatcherPause(context, until):
   """Creates or removes the watcher pause file.
b/test/py/cmdlib/testsupport/cmdlib_testcase.py
@@ -29 +29 @@

 from cmdlib.testsupport.config_mock import ConfigMock
 from cmdlib.testsupport.iallocator_mock import patchIAllocator
-from cmdlib.testsupport.lock_manager_mock import LockManagerMock
+from cmdlib.testsupport.livelock_mock import LiveLockMock
 from cmdlib.testsupport.netutils_mock import patchNetutils, \
   SetupDefaultNetutilsMock
 from cmdlib.testsupport.processor_mock import ProcessorMock
 from cmdlib.testsupport.rpc_runner_mock import CreateRpcRunnerMock, \
   RpcResultsBuilder, patchRpc, SetupDefaultRpcModuleMock
 from cmdlib.testsupport.ssh_mock import patchSsh
+from cmdlib.testsupport.wconfd_mock import WConfdMock

 from ganeti.cmdlib.base import LogicalUnit
 from ganeti import errors
-from ganeti import locking
 from ganeti import objects
 from ganeti import opcodes
 from ganeti import runtime
@@ -51 +51 @@
   # pylint: disable=W0212
   cfg = property(fget=lambda self: self._test_case.cfg)
   # pylint: disable=W0212
-  glm = property(fget=lambda self: self._test_case.glm)
-  # pylint: disable=W0212
   rpc = property(fget=lambda self: self._test_case.rpc)

   def __init__(self, test_case):
     self._test_case = test_case
+    self.livelock = LiveLockMock()

   def AddNode(self, node, ec_id):
     self._test_case.cfg.AddNode(node, ec_id)
-    self._test_case.glm.add(locking.LEVEL_NODE, node.uuid)
-    self._test_case.glm.add(locking.LEVEL_NODE_RES, node.uuid)

   def ReaddNode(self, node):
     pass

   def RemoveNode(self, node):
     self._test_case.cfg.RemoveNode(node.uuid)
-    self._test_case.glm.remove(locking.LEVEL_NODE, node.uuid)
-    self._test_case.glm.remove(locking.LEVEL_NODE_RES, node.uuid)


 class MockLU(LogicalUnit):
@@ -90 +85 @@
   The environment can be customized via the following fields:

     * C{cfg}: @see L{ConfigMock}
-    * C{glm}: @see L{LockManagerMock}
     * C{rpc}: @see L{CreateRpcRunnerMock}
     * C{iallocator_cls}: @see L{patchIAllocator}
     * C{mcpu}: @see L{ProcessorMock}
@@ -168 +162 @@

     """
     self.cfg = ConfigMock()
-    self.glm = LockManagerMock()
     self.rpc = CreateRpcRunnerMock()
     self.ctx = GanetiContextMock(self)
     self.mcpu = ProcessorMock(self.ctx)
@@ -211 +204 @@
     @return: A mock LU

     """
-    return MockLU(self.mcpu, mock.MagicMock(), self.ctx, self.rpc)
+    return MockLU(self.mcpu, mock.MagicMock(), self.ctx, self.rpc,
+                  (1234, "/tmp/mock/livelock"), WConfdMock())

   def RpcResultsBuilder(self, use_node_names=False):
     """Creates a pre-configured L{RpcResultBuilder}
@@ -231 +225 @@
     @return: the result of the LU's C{Exec} method

     """
-    self.glm.AddLocksFromConfig(self.cfg)
-
     return self.mcpu.ExecOpCodeAndRecordOutput(opcode)

   def ExecOpCodeExpectException(self, opcode,
@@ -292 +284 @@
     @return: the result of test_func

     """
-    self.glm.AddLocksFromConfig(self.cfg)
-
     return self.mcpu.RunWithLockedLU(opcode, test_func)

   def assertLogContainsMessage(self, expected_msg):
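
For illustration only: wconfd_mock.py itself is not included in this diff, so the real WConfdMock is not shown. Judging from the calls made in the hunks above, it has to expose a Client() whose object answers at least ListLocks, TryUpdateLocks, DownGradeLocksLevel, FreeLocksLevel and OpportunisticLockUnion. The sketch below is a guess at such a minimal shape, not the actual implementation.

# Hypothetical sketch only -- not the real cmdlib.testsupport.wconfd_mock.
class WConfdClientMock(object):
  def __init__(self, locks):
    self.locks = locks                       # list of [name, mode] pairs
  def ListLocks(self, _jid, _livelockfile):
    return self.locks
  def TryUpdateLocks(self, _jid, _livelockfile, request):
    for name, action in request:
      self.locks = [l for l in self.locks if l[0] != name]
      if action != "release":
        self.locks.append([name, action])
    return []                                # never report blocking locks
  def DownGradeLocksLevel(self, _jid, _livelockfile, level):
    self.locks = [[n, "shared"] if n.startswith(level + "/") else [n, m]
                  for (n, m) in self.locks]
  def FreeLocksLevel(self, _jid, _livelockfile, level):
    self.locks = [l for l in self.locks if not l[0].startswith(level + "/")]
  def OpportunisticLockUnion(self, jid, livelockfile, request):
    self.TryUpdateLocks(jid, livelockfile, request)
    return [name for (name, _) in request]

class WConfdMock(object):
  def __init__(self):
    self._client = WConfdClientMock([])
  def Client(self):
    return self._client
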
b/test/py/cmdlib/testsupport/processor_mock.py
@@ -27 +27 @@
 from ganeti import constants
 from ganeti import mcpu

+from cmdlib.testsupport.wconfd_mock import WConfdMock
+

 class LogRecordingCallback(mcpu.OpExecCbBase):
   """Helper class for log output recording.
@@ -68 +70 @@
     super(ProcessorMock, self).__init__(context, 1, True)
     self.log_entries = []
     self._lu_test_func = None
+    self.wconfd = WConfdMock()

   def ExecOpCodeAndRecordOutput(self, op):
     """Executes the given opcode and records the output for further inspection.
b/test/py/ganeti.hooks_unittest.py
@@ -246 +246 @@
     self.context = FakeContext()
     # WARNING: here we pass None as RpcRunner instance since we know
     # our usage via HooksMaster will not use lu.rpc
-    self.lu = FakeLU(FakeProc(), self.op, self.context, None)
+    self.lu = FakeLU(FakeProc(), self.op, self.context, None, (123, "/foo/bar"),
+                     None)

   def testTotalFalse(self):
     """Test complete rpc failure"""
@@ -518 +519 @@

     self.op = opcodes.OpTestDummy(result=False, messages=[], fail=False)
     self.lu = FakeEnvWithCustomPostHookNodesLU(FakeProc(), self.op,
-                                               FakeContext(), None)
+                                               FakeContext(), None,
+                                               (123, "/foo/bar"),
+                                               None)

   def _HooksRpc(self, *args):
     self._rpcs.append(args)
b/test/py/ganeti.mcpu_unittest.py
@@ -173 +173 @@
   def __init__(self, needed_locks, share_locks):
     self.needed_locks = needed_locks
     self.share_locks = share_locks
+    self.locks = []

-
-class _FakeGlm:
-  def __init__(self, owning_nal):
-    self._owning_nal = owning_nal
-
-  def check_owned(self, level, names):
-    assert level == locking.LEVEL_NODE_ALLOC
-    assert names == locking.NAL
-    return self._owning_nal
-
-  def owning_all(self, level):
-    return False
+  def owned_locks(self, *_):
+    return self.locks


 class TestVerifyLocks(unittest.TestCase):
   def testNoLocks(self):
     lu = _FakeLuWithLocks({}, {})
-    glm = _FakeGlm(False)
-    mcpu._VerifyLocks(lu, glm,
+    mcpu._VerifyLocks(lu,
                       _mode_whitelist=NotImplemented,
                       _nal_whitelist=NotImplemented)

@@ -204 +194 @@
         level: 0,
         locking.LEVEL_NODE_ALLOC: 0,
         })
-      glm = _FakeGlm(False)
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+      mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])

   def testDifferentMode(self):
     for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
@@ -215 +204 @@
         level: 0,
         locking.LEVEL_NODE_ALLOC: 1,
         })
-      glm = _FakeGlm(False)
       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+        mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])
       except AssertionError, err:
         self.assertTrue("using the same mode as nodes" in str(err))
       else:
         self.fail("Exception not raised")

       # Once more with the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[_FakeLuWithLocks],
+      mcpu._VerifyLocks(lu, _mode_whitelist=[_FakeLuWithLocks],
                         _nal_whitelist=[])

   def testSameMode(self):
@@ -236 +224 @@
         level: 1,
         locking.LEVEL_NODE_ALLOC: 1,
         })
-      glm = _FakeGlm(True)

       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[_FakeLuWithLocks],
+        mcpu._VerifyLocks(lu, _mode_whitelist=[_FakeLuWithLocks],
                           _nal_whitelist=[])
       except AssertionError, err:
         self.assertTrue("whitelisted to use different modes" in str(err))
@@ -247 +234 @@
         self.fail("Exception not raised")

       # Once more without the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+      mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])

   def testAllWithoutAllocLock(self):
     for level in [locking.LEVEL_NODE, locking.LEVEL_NODE_RES]:
@@ -257 +244 @@
         level: 0,
         locking.LEVEL_NODE_ALLOC: 0,
         })
-      glm = _FakeGlm(False)
+
       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+        mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])
       except AssertionError, err:
         self.assertTrue("allocation lock must be used if" in str(err))
       else:
         self.fail("Exception not raised")

       # Once more with the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[],
+      mcpu._VerifyLocks(lu, _mode_whitelist=[],
                         _nal_whitelist=[_FakeLuWithLocks])

   def testAllWithAllocLock(self):
@@ -278 +265 @@
         level: 0,
         locking.LEVEL_NODE_ALLOC: 0,
         })
-      glm = _FakeGlm(True)
+      lu.locks = [locking.NAL]

       try:
-        mcpu._VerifyLocks(lu, glm, _mode_whitelist=[],
+        mcpu._VerifyLocks(lu, _mode_whitelist=[],
                           _nal_whitelist=[_FakeLuWithLocks])
       except AssertionError, err:
         self.assertTrue("whitelisted for not acquiring" in str(err))
@@ -289 +276 @@
         self.fail("Exception not raised")

       # Once more without the whitelist
-      mcpu._VerifyLocks(lu, glm, _mode_whitelist=[], _nal_whitelist=[])
+      mcpu._VerifyLocks(lu, _mode_whitelist=[], _nal_whitelist=[])


 if __name__ == "__main__":
