Revision 19b9ba9a

b/daemons/ganeti-masterd
277 277
      op = opcodes.OpGetTags(kind=kind, name=name)
278 278
      return self._Query(op)
279 279

  
280
    elif method == luxi.REQ_QUERY_LOCKS:
281
      (fields, sync) = args
282
      logging.info("Received locks query request")
283
      return self.server.context.glm.QueryLocks(fields, sync)
284

  
280 285
    elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
281 286
      drain_flag = args
282 287
      logging.info("Received queue drain flag change request to %s",
b/lib/cli.py
84 84
  "IGNORE_REMOVE_FAILURES_OPT",
85 85
  "IGNORE_SECONDARIES_OPT",
86 86
  "IGNORE_SIZE_OPT",
87
  "INTERVAL_OPT",
87 88
  "MAC_PREFIX_OPT",
88 89
  "MAINTAIN_NODE_HEALTH_OPT",
89 90
  "MASTER_NETDEV_OPT",
......
929 930
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
930 931
                         help="Maximum time to wait for instance shutdown")
931 932

  
933
INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
934
                          default=None,
935
                          help=("Number of seconds between repetions of the"
936
                                " command"))
937

  
932 938
EARLY_RELEASE_OPT = cli_option("--early-release",
933 939
                               dest="early_release", default=False,
934 940
                               action="store_true",
b/lib/locking.py
30 30
import threading
31 31
import time
32 32
import errno
33
import weakref
34
import logging
33 35

  
34 36
from ganeti import errors
35 37
from ganeti import utils
......
409 411

  
410 412
  """
411 413
  __slots__ = [
414
    "__weakref__",
412 415
    "__active_shr_c",
413 416
    "__inactive_shr_c",
414 417
    "__deleted",
......
421 424

  
422 425
  __condition_class = PipeCondition
423 426

  
424
  def __init__(self, name):
427
  def __init__(self, name, monitor=None):
425 428
    """Construct a new SharedLock.
426 429

  
427 430
    @param name: the name of the lock
431
    @type monitor: L{LockMonitor}
432
    @param monitor: Lock monitor with which to register
428 433

  
429 434
    """
430 435
    object.__init__(self)
......
448 453
    # is this lock in the deleted state?
449 454
    self.__deleted = False
450 455

  
456
    # Register with lock monitor
457
    if monitor:
458
      monitor.RegisterLock(self)
459

  
460
  def GetInfo(self, fields):
461
    """Retrieves information for querying locks.
462

  
463
    @type fields: list of strings
464
    @param fields: List of fields to return
465

  
466
    """
467
    self.__lock.acquire()
468
    try:
469
      info = []
470

  
471
      # Note: to avoid unintentional race conditions, no references to
472
      # modifiable objects should be returned unless they were created in this
473
      # function.
474
      for fname in fields:
475
        if fname == "name":
476
          info.append(self.name)
477
        elif fname == "mode":
478
          if self.__deleted:
479
            info.append("deleted")
480
            assert not (self.__exc or self.__shr)
481
          elif self.__exc:
482
            info.append("exclusive")
483
          elif self.__shr:
484
            info.append("shared")
485
          else:
486
            info.append(None)
487
        elif fname == "owner":
488
          if self.__exc:
489
            owner = [self.__exc]
490
          else:
491
            owner = self.__shr
492

  
493
          if owner:
494
            assert not self.__deleted
495
            info.append([i.getName() for i in owner])
496
          else:
497
            info.append(None)
498
        else:
499
          raise errors.OpExecError("Invalid query field '%s'" % fname)
500

  
501
      return info
502
    finally:
503
      self.__lock.release()
504

  
451 505
  def __check_deleted(self):
452 506
    """Raises an exception if the lock has been deleted.
453 507

  
......
671 725
        self.__deleted = True
672 726
        self.__exc = None
673 727

  
728
        assert not (self.__exc or self.__shr), "Found owner during deletion"
729

  
674 730
        # Notify all acquires. They'll throw an error.
675 731
        while self.__pending:
676 732
          self.__pending.pop().notifyAll()
......
713 769
  @ivar name: the name of the lockset
714 770

  
715 771
  """
716
  def __init__(self, members, name):
772
  def __init__(self, members, name, monitor=None):
717 773
    """Constructs a new LockSet.
718 774

  
719 775
    @type members: list of strings
720 776
    @param members: initial members of the set
777
    @type monitor: L{LockMonitor}
778
    @param monitor: Lock monitor with which to register member locks
721 779

  
722 780
    """
723 781
    assert members is not None, "members parameter is not a list"
724 782
    self.name = name
725 783

  
784
    # Lock monitor
785
    self.__monitor = monitor
786

  
726 787
    # Used internally to guarantee coherency.
727 788
    self.__lock = SharedLock(name)
728 789

  
......
731 792
    self.__lockdict = {}
732 793

  
733 794
    for mname in members:
734
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname))
795
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
796
                                          monitor=monitor)
735 797

  
736 798
    # The owner dict contains the set of locks each thread owns. For
737 799
    # performance each thread can access its own key without a global lock on
......
1055 1117
                               (invalid_names, self.name))
1056 1118

  
1057 1119
      for lockname in names:
1058
        lock = SharedLock(self._GetLockName(lockname))
1120
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1059 1121

  
1060 1122
        if acquired:
1061 1123
          lock.acquire(shared=shared)
......
1193 1255

  
1194 1256
    self.__class__._instance = self
1195 1257

  
1258
    self._monitor = LockMonitor()
1259

  
1196 1260
    # The keyring contains all the locks, at their level and in the correct
1197 1261
    # locking order.
1198 1262
    self.__keyring = {
1199
      LEVEL_CLUSTER: LockSet([BGL], "bgl lockset"),
1200
      LEVEL_NODE: LockSet(nodes, "nodes lockset"),
1201
      LEVEL_INSTANCE: LockSet(instances, "instances lockset"),
1202
    }
1263
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1264
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1265
      LEVEL_INSTANCE: LockSet(instances, "instances",
1266
                              monitor=self._monitor),
1267
      }
1268

  
1269
  def QueryLocks(self, fields, sync):
1270
    """Queries information from all locks.
1271

  
1272
    See L{LockMonitor.QueryLocks}.
1273

  
1274
    """
1275
    return self._monitor.QueryLocks(fields, sync)
1203 1276

  
1204 1277
  def _names(self, level):
1205 1278
    """List the lock names at the given level.
......
1352 1425
           "Cannot remove locks at a level while not owning it or"
1353 1426
           " owning some at a greater one")
1354 1427
    return self.__keyring[level].remove(names)
1428

  
1429

  
1430
class LockMonitor(object):
1431
  _LOCK_ATTR = "_lock"
1432

  
1433
  def __init__(self):
1434
    """Initializes this class.
1435

  
1436
    """
1437
    self._lock = SharedLock("LockMonitor")
1438

  
1439
    # Tracked locks. Weak references are used to avoid issues with circular
1440
    # references and deletion.
1441
    self._locks = weakref.WeakKeyDictionary()
1442

  
1443
  @ssynchronized(_LOCK_ATTR)
1444
  def RegisterLock(self, lock):
1445
    """Registers a new lock.
1446

  
1447
    """
1448
    logging.debug("Registering lock %s", lock.name)
1449
    assert lock not in self._locks, "Duplicate lock registration"
1450
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1451
           "Found duplicate lock name"
1452
    self._locks[lock] = None
1453

  
1454
  @ssynchronized(_LOCK_ATTR)
1455
  def _GetLockInfo(self, fields):
1456
    """Get information from all locks while the monitor lock is held.
1457

  
1458
    """
1459
    result = {}
1460

  
1461
    for lock in self._locks.keys():
1462
      assert lock.name not in result, "Found duplicate lock name"
1463
      result[lock.name] = lock.GetInfo(fields)
1464

  
1465
    return result
1466

  
1467
  def QueryLocks(self, fields, sync):
1468
    """Queries information from all locks.
1469

  
1470
    @type fields: list of strings
1471
    @param fields: List of fields to return
1472
    @type sync: boolean
1473
    @param sync: Whether to operate in synchronous mode
1474

  
1475
    """
1476
    if sync:
1477
      raise NotImplementedError("Synchronous queries are not implemented")
1478

  
1479
    # Get all data without sorting
1480
    result = self._GetLockInfo(fields)
1481

  
1482
    # Sort by name
1483
    return [result[name] for name in utils.NiceSort(result.keys())]
b/lib/luxi.py
59 59
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60 60
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61 61
REQ_QUERY_TAGS = "QueryTags"
62
REQ_QUERY_LOCKS = "QueryLocks"
62 63
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
63 64
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
64 65

  
......
490 491

  
491 492
  def QueryTags(self, kind, name):
492 493
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
494

  
495
  def QueryLocks(self, fields, sync):
496
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))
b/man/gnt-debug.sgml
192 192
      </para>
193 193
    </refsect2>
194 194

  
195
    <refsect2>
196
      <title>LOCKS</title>
197
      <cmdsynopsis>
198
        <command>locks</command>
199
        <arg>--no-headers</arg>
200
        <arg>--separator=<replaceable>SEPARATOR</replaceable></arg>
201
        <sbr>
202
        <arg>-o <replaceable>[+]FIELD,...</replaceable></arg>
203
        <arg>--interval=<replaceable>SECONDS</replaceable></arg>
204
        <sbr>
205
      </cmdsynopsis>
206

  
207
      <para>
208
        Shows a list of locks in the master daemon.
209
      </para>
210

  
211
      <para>
212
        The <option>--no-headers</option> option will skip the initial
213
        header line. The <option>--separator</option> option takes an
214
        argument which denotes what will be used between the output
215
        fields. Both these options are to help scripting.
216
      </para>
217

  
218
      <para>
219
        The <option>-o</option> option takes a comma-separated list of
220
        output fields. The available fields and their meaning are:
221
        <variablelist>
222
          <varlistentry>
223
            <term>name</term>
224
            <listitem>
225
              <simpara>Lock name</simpara>
226
            </listitem>
227
          </varlistentry>
228
          <varlistentry>
229
            <term>mode</term>
230
            <listitem>
231
              <simpara>
232
                Mode in which the lock is currently acquired (exclusive or
233
                shared)
234
              </simpara>
235
            </listitem>
236
          </varlistentry>
237
          <varlistentry>
238
            <term>owner</term>
239
            <listitem>
240
              <simpara>Current lock owner(s)</simpara>
241
            </listitem>
242
          </varlistentry>
243
        </variablelist>
244
      </para>
245

  
246
      <para>
247
        If the value of the option starts with the character
248
        <constant>+</constant>, the new fields will be added to the default
249
        list. This allows to quickly see the default list plus a few other
250
        fields, instead of retyping the entire list of fields.
251
      </para>
252

  
253
      <para>
254
        Use <option>--interval</option> to repeat the listing. A delay
255
        specified by the option value in seconds is inserted.
256
      </para>
257

  
258
    </refsect2>
195 259
  </refsect1>
196 260

  
197 261
  &footer;
b/scripts/gnt-debug
39 39
from ganeti import errors
40 40

  
41 41

  
42
#: Default fields for L{ListLocks}
43
_LIST_LOCKS_DEF_FIELDS = [
44
  "name",
45
  "mode",
46
  "owner",
47
  ]
48

  
49

  
42 50
def Delay(opts, args):
43 51
  """Sleeps for a while
44 52

  
......
398 406
  return 0
399 407

  
400 408

  
409
def ListLocks(opts, args): # pylint: disable-msg=W0613
410
  """List all locks.
411

  
412
  @param opts: the command line options selected by the user
413
  @type args: list
414
  @param args: should be an empty list
415
  @rtype: int
416
  @return: the desired exit code
417

  
418
  """
419
  selected_fields = ParseFields(opts.output, _LIST_LOCKS_DEF_FIELDS)
420

  
421
  if not opts.no_headers:
422
    headers = {
423
      "name": "Name",
424
      "mode": "Mode",
425
      "owner": "Owner",
426
      }
427
  else:
428
    headers = None
429

  
430
  while True:
431
    # Not reusing client as interval might be too long
432
    output = GetClient().QueryLocks(selected_fields, False)
433

  
434
    # change raw values to nicer strings
435
    for row in output:
436
      for idx, field in enumerate(selected_fields):
437
        val = row[idx]
438

  
439
        if field in ("mode", "owner") and val is None:
440
          val = "-"
441
        elif field == "owner":
442
          val = utils.CommaJoin(val)
443

  
444
        row[idx] = str(val)
445

  
446
    data = GenerateTable(separator=opts.separator, headers=headers,
447
                         fields=selected_fields, data=output)
448
    for line in data:
449
      ToStdout(line)
450

  
451
    if not opts.interval:
452
      break
453

  
454
    ToStdout("")
455
    time.sleep(opts.interval)
456

  
457
  return 0
458

  
459

  
401 460
commands = {
402 461
  'delay': (
403 462
    Delay, [ArgUnknown(min=1, max=1)],
......
454 513
    "{opts...} <instance>", "Executes a TestAllocator OpCode"),
455 514
  "test-jobqueue": (
456 515
    TestJobqueue, ARGS_NONE, [],
457
    "", "Test a few aspects of the job queue")
516
    "", "Test a few aspects of the job queue"),
517
  "locks": (
518
    ListLocks, ARGS_NONE, [NOHDR_OPT, SEP_OPT, FIELDS_OPT, INTERVAL_OPT],
519
    "[--interval N]", "Show a list of locks in the master daemon"),
458 520
  }
459 521

  
460 522

  
b/test/ganeti.locking_unittest.py
27 27
import time
28 28
import Queue
29 29
import threading
30
import random
30 31

  
31 32
from ganeti import locking
32 33
from ganeti import errors
34
from ganeti import utils
33 35

  
34 36
import testutils
35 37

  
......
1422 1424
    self.GL.release(locking.LEVEL_CLUSTER, ['BGL'])
1423 1425

  
1424 1426

  
1427
class TestLockMonitor(_ThreadedTestCase):
1428
  def setUp(self):
1429
    _ThreadedTestCase.setUp(self)
1430
    self.lm = locking.LockMonitor()
1431

  
1432
  def testSingleThread(self):
1433
    locks = []
1434

  
1435
    for i in range(100):
1436
      name = "TestLock%s" % i
1437
      locks.append(locking.SharedLock(name, monitor=self.lm))
1438

  
1439
    self.assertEqual(len(self.lm._locks), len(locks))
1440

  
1441
    # Delete all locks
1442
    del locks[:]
1443

  
1444
    # The garbage collector might needs some time
1445
    def _CheckLocks():
1446
      if self.lm._locks:
1447
        raise utils.RetryAgain()
1448

  
1449
    utils.Retry(_CheckLocks, 0.1, 30.0)
1450

  
1451
    self.assertFalse(self.lm._locks)
1452

  
1453
  def testMultiThread(self):
1454
    locks = []
1455

  
1456
    def _CreateLock(prev, next, name):
1457
      prev.wait()
1458
      locks.append(locking.SharedLock(name, monitor=self.lm))
1459
      if next:
1460
        next.set()
1461

  
1462
    expnames = []
1463

  
1464
    first = threading.Event()
1465
    prev = first
1466

  
1467
    # Use a deterministic random generator
1468
    for i in random.Random(4263).sample(range(100), 33):
1469
      name = "MtTestLock%s" % i
1470
      expnames.append(name)
1471

  
1472
      ev = threading.Event()
1473
      self._addThread(target=_CreateLock, args=(prev, ev, name))
1474
      prev = ev
1475

  
1476
    # Add locks
1477
    first.set()
1478
    self._waitThreads()
1479

  
1480
    # Check order in which locks were added
1481
    self.assertEqual([i.name for i in locks], expnames)
1482

  
1483
    # Sync queries are not supported
1484
    self.assertRaises(NotImplementedError, self.lm.QueryLocks, ["name"], True)
1485

  
1486
    # Check query result
1487
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1488
                     [[name, None, None] for name in utils.NiceSort(expnames)])
1489

  
1490
    # Test exclusive acquire
1491
    for tlock in locks[::4]:
1492
      tlock.acquire(shared=0)
1493
      try:
1494
        def _GetExpResult(name):
1495
          if tlock.name == name:
1496
            return [name, "exclusive", [threading.currentThread().getName()]]
1497
          return [name, None, None]
1498

  
1499
        self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1500
                         [_GetExpResult(name)
1501
                          for name in utils.NiceSort(expnames)])
1502
      finally:
1503
        tlock.release()
1504

  
1505
    # Test shared acquire
1506
    def _Acquire(lock, shared, ev):
1507
      lock.acquire(shared=shared)
1508
      try:
1509
        ev.wait()
1510
      finally:
1511
        lock.release()
1512

  
1513
    for tlock1 in locks[::11]:
1514
      for tlock2 in locks[::-15]:
1515
        if tlock2 == tlock1:
1516
          continue
1517

  
1518
        for tlock3 in locks[::10]:
1519
          if tlock3 == tlock2:
1520
            continue
1521

  
1522
          ev = threading.Event()
1523

  
1524
          # Acquire locks
1525
          tthreads1 = []
1526
          for i in range(3):
1527
            tthreads1.append(self._addThread(target=_Acquire,
1528
                                             args=(tlock1, 1, ev)))
1529
          tthread2 = self._addThread(target=_Acquire, args=(tlock2, 1, ev))
1530
          tthread3 = self._addThread(target=_Acquire, args=(tlock3, 0, ev))
1531

  
1532
          # Check query result
1533
          for (name, mode, owner) in self.lm.QueryLocks(["name", "mode",
1534
                                                         "owner"], False):
1535
            if name == tlock1.name:
1536
              self.assertEqual(mode, "shared")
1537
              self.assertEqual(set(owner), set(i.getName() for i in tthreads1))
1538
              continue
1539

  
1540
            if name == tlock2.name:
1541
              self.assertEqual(mode, "shared")
1542
              self.assertEqual(owner, [tthread2.getName()])
1543
              continue
1544

  
1545
            if name == tlock3.name:
1546
              self.assertEqual(mode, "exclusive")
1547
              self.assertEqual(owner, [tthread3.getName()])
1548
              continue
1549

  
1550
            self.assert_(name in expnames)
1551
            self.assert_(mode is None)
1552
            self.assert_(owner is None)
1553

  
1554
          # Release locks again
1555
          ev.set()
1556

  
1557
          self._waitThreads()
1558

  
1559
          self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1560
                           [[name, None, None]
1561
                            for name in utils.NiceSort(expnames)])
1562

  
1563
  def testDelete(self):
1564
    lock = locking.SharedLock("TestLock", monitor=self.lm)
1565

  
1566
    self.assertEqual(len(self.lm._locks), 1)
1567
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1568
                     [[lock.name, None, None]])
1569

  
1570
    lock.delete()
1571

  
1572
    self.assertEqual(self.lm.QueryLocks(["name", "mode", "owner"], False),
1573
                     [[lock.name, "deleted", None]])
1574
    self.assertEqual(len(self.lm._locks), 1)
1575

  
1576

  
1425 1577
if __name__ == '__main__':
1426 1578
  testutils.GanetiTestProgram()

Also available in: Unified diff