Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 3ccb3a64

History | View | Annotate | Download (55.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40
from ganeti import query
41

    
42

    
43
_EXCLUSIVE_TEXT = "exclusive"
44
_SHARED_TEXT = "shared"
45
_DELETED_TEXT = "deleted"
46

    
47
_DEFAULT_PRIORITY = 0
48

    
49

    
50
def ssynchronized(mylock, shared=0):
51
  """Shared Synchronization decorator.
52

53
  Calls the function holding the given lock, either in exclusive or shared
54
  mode. It requires the passed lock to be a SharedLock (or support its
55
  semantics).
56

57
  @type mylock: lockable object or string
58
  @param mylock: lock to acquire or class member name of the lock to acquire
59

60
  """
61
  def wrap(fn):
62
    def sync_function(*args, **kwargs):
63
      if isinstance(mylock, basestring):
64
        assert args, "cannot ssynchronize on non-class method: self not found"
65
        # args[0] is "self"
66
        lock = getattr(args[0], mylock)
67
      else:
68
        lock = mylock
69
      lock.acquire(shared=shared)
70
      try:
71
        return fn(*args, **kwargs)
72
      finally:
73
        lock.release()
74
    return sync_function
75
  return wrap
76

    
77

    
78
class _SingleNotifyPipeConditionWaiter(object):
79
  """Helper class for SingleNotifyPipeCondition
80

81
  """
82
  __slots__ = [
83
    "_fd",
84
    "_poller",
85
    ]
86

    
87
  def __init__(self, poller, fd):
88
    """Constructor for _SingleNotifyPipeConditionWaiter
89

90
    @type poller: select.poll
91
    @param poller: Poller object
92
    @type fd: int
93
    @param fd: File descriptor to wait for
94

95
    """
96
    object.__init__(self)
97
    self._poller = poller
98
    self._fd = fd
99

    
100
  def __call__(self, timeout):
101
    """Wait for something to happen on the pipe.
102

103
    @type timeout: float or None
104
    @param timeout: Timeout for waiting (can be None)
105

106
    """
107
    running_timeout = utils.RunningTimeout(timeout, True)
108

    
109
    while True:
110
      remaining_time = running_timeout.Remaining()
111

    
112
      if remaining_time is not None:
113
        if remaining_time < 0.0:
114
          break
115

    
116
        # Our calculation uses seconds, poll() wants milliseconds
117
        remaining_time *= 1000
118

    
119
      try:
120
        result = self._poller.poll(remaining_time)
121
      except EnvironmentError, err:
122
        if err.errno != errno.EINTR:
123
          raise
124
        result = None
125

    
126
      # Check whether we were notified
127
      if result and result[0][0] == self._fd:
128
        break
129

    
130

    
131
class _BaseCondition(object):
132
  """Base class containing common code for conditions.
133

134
  Some of this code is taken from python's threading module.
135

136
  """
137
  __slots__ = [
138
    "_lock",
139
    "acquire",
140
    "release",
141
    "_is_owned",
142
    "_acquire_restore",
143
    "_release_save",
144
    ]
145

    
146
  def __init__(self, lock):
147
    """Constructor for _BaseCondition.
148

149
    @type lock: threading.Lock
150
    @param lock: condition base lock
151

152
    """
153
    object.__init__(self)
154

    
155
    try:
156
      self._release_save = lock._release_save
157
    except AttributeError:
158
      self._release_save = self._base_release_save
159
    try:
160
      self._acquire_restore = lock._acquire_restore
161
    except AttributeError:
162
      self._acquire_restore = self._base_acquire_restore
163
    try:
164
      self._is_owned = lock.is_owned
165
    except AttributeError:
166
      self._is_owned = self._base_is_owned
167

    
168
    self._lock = lock
169

    
170
    # Export the lock's acquire() and release() methods
171
    self.acquire = lock.acquire
172
    self.release = lock.release
173

    
174
  def _base_is_owned(self):
175
    """Check whether lock is owned by current thread.
176

177
    """
178
    if self._lock.acquire(0):
179
      self._lock.release()
180
      return False
181
    return True
182

    
183
  def _base_release_save(self):
184
    self._lock.release()
185

    
186
  def _base_acquire_restore(self, _):
187
    self._lock.acquire()
188

    
189
  def _check_owned(self):
190
    """Raise an exception if the current thread doesn't own the lock.
191

192
    """
193
    if not self._is_owned():
194
      raise RuntimeError("cannot work with un-aquired lock")
195

    
196

    
197
class SingleNotifyPipeCondition(_BaseCondition):
198
  """Condition which can only be notified once.
199

200
  This condition class uses pipes and poll, internally, to be able to wait for
201
  notification with a timeout, without resorting to polling. It is almost
202
  compatible with Python's threading.Condition, with the following differences:
203
    - notifyAll can only be called once, and no wait can happen after that
204
    - notify is not supported, only notifyAll
205

206
  """
207

    
208
  __slots__ = [
209
    "_poller",
210
    "_read_fd",
211
    "_write_fd",
212
    "_nwaiters",
213
    "_notified",
214
    ]
215

    
216
  _waiter_class = _SingleNotifyPipeConditionWaiter
217

    
218
  def __init__(self, lock):
219
    """Constructor for SingleNotifyPipeCondition
220

221
    """
222
    _BaseCondition.__init__(self, lock)
223
    self._nwaiters = 0
224
    self._notified = False
225
    self._read_fd = None
226
    self._write_fd = None
227
    self._poller = None
228

    
229
  def _check_unnotified(self):
230
    """Throws an exception if already notified.
231

232
    """
233
    if self._notified:
234
      raise RuntimeError("cannot use already notified condition")
235

    
236
  def _Cleanup(self):
237
    """Cleanup open file descriptors, if any.
238

239
    """
240
    if self._read_fd is not None:
241
      os.close(self._read_fd)
242
      self._read_fd = None
243

    
244
    if self._write_fd is not None:
245
      os.close(self._write_fd)
246
      self._write_fd = None
247
    self._poller = None
248

    
249
  def wait(self, timeout):
250
    """Wait for a notification.
251

252
    @type timeout: float or None
253
    @param timeout: Waiting timeout (can be None)
254

255
    """
256
    self._check_owned()
257
    self._check_unnotified()
258

    
259
    self._nwaiters += 1
260
    try:
261
      if self._poller is None:
262
        (self._read_fd, self._write_fd) = os.pipe()
263
        self._poller = select.poll()
264
        self._poller.register(self._read_fd, select.POLLHUP)
265

    
266
      wait_fn = self._waiter_class(self._poller, self._read_fd)
267
      state = self._release_save()
268
      try:
269
        # Wait for notification
270
        wait_fn(timeout)
271
      finally:
272
        # Re-acquire lock
273
        self._acquire_restore(state)
274
    finally:
275
      self._nwaiters -= 1
276
      if self._nwaiters == 0:
277
        self._Cleanup()
278

    
279
  def notifyAll(self): # pylint: disable=C0103
280
    """Close the writing side of the pipe to notify all waiters.
281

282
    """
283
    self._check_owned()
284
    self._check_unnotified()
285
    self._notified = True
286
    if self._write_fd is not None:
287
      os.close(self._write_fd)
288
      self._write_fd = None
289

    
290

    
291
class PipeCondition(_BaseCondition):
292
  """Group-only non-polling condition with counters.
293

294
  This condition class uses pipes and poll, internally, to be able to wait for
295
  notification with a timeout, without resorting to polling. It is almost
296
  compatible with Python's threading.Condition, but only supports notifyAll and
297
  non-recursive locks. As an additional features it's able to report whether
298
  there are any waiting threads.
299

300
  """
301
  __slots__ = [
302
    "_waiters",
303
    "_single_condition",
304
    ]
305

    
306
  _single_condition_class = SingleNotifyPipeCondition
307

    
308
  def __init__(self, lock):
309
    """Initializes this class.
310

311
    """
312
    _BaseCondition.__init__(self, lock)
313
    self._waiters = set()
314
    self._single_condition = self._single_condition_class(self._lock)
315

    
316
  def wait(self, timeout):
317
    """Wait for a notification.
318

319
    @type timeout: float or None
320
    @param timeout: Waiting timeout (can be None)
321

322
    """
323
    self._check_owned()
324

    
325
    # Keep local reference to the pipe. It could be replaced by another thread
326
    # notifying while we're waiting.
327
    cond = self._single_condition
328

    
329
    self._waiters.add(threading.currentThread())
330
    try:
331
      cond.wait(timeout)
332
    finally:
333
      self._check_owned()
334
      self._waiters.remove(threading.currentThread())
335

    
336
  def notifyAll(self): # pylint: disable=C0103
337
    """Notify all currently waiting threads.
338

339
    """
340
    self._check_owned()
341
    self._single_condition.notifyAll()
342
    self._single_condition = self._single_condition_class(self._lock)
343

    
344
  def get_waiting(self):
345
    """Returns a list of all waiting threads.
346

347
    """
348
    self._check_owned()
349

    
350
    return self._waiters
351

    
352
  def has_waiting(self):
353
    """Returns whether there are active waiters.
354

355
    """
356
    self._check_owned()
357

    
358
    return bool(self._waiters)
359

    
360
  def __repr__(self):
361
    return ("<%s.%s waiters=%s at %#x>" %
362
            (self.__class__.__module__, self.__class__.__name__,
363
             self._waiters, id(self)))
364

    
365

    
366
class _PipeConditionWithMode(PipeCondition):
367
  __slots__ = [
368
    "shared",
369
    ]
370

    
371
  def __init__(self, lock, shared):
372
    """Initializes this class.
373

374
    """
375
    self.shared = shared
376
    PipeCondition.__init__(self, lock)
377

    
378

    
379
class SharedLock(object):
380
  """Implements a shared lock.
381

382
  Multiple threads can acquire the lock in a shared way by calling
383
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
384
  threads can call C{acquire(shared=0)}.
385

386
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
387
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
388
  ...]}. Each per-priority queue contains a normal in-order list of conditions
389
  to be notified when the lock can be acquired. Shared locks are grouped
390
  together by priority and the condition for them is stored in
391
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
392
  references for the per-priority queues indexed by priority for faster access.
393

394
  @type name: string
395
  @ivar name: the name of the lock
396

397
  """
398
  __slots__ = [
399
    "__weakref__",
400
    "__deleted",
401
    "__exc",
402
    "__lock",
403
    "__pending",
404
    "__pending_by_prio",
405
    "__pending_shared",
406
    "__shr",
407
    "name",
408
    ]
409

    
410
  __condition_class = _PipeConditionWithMode
411

    
412
  def __init__(self, name, monitor=None):
413
    """Construct a new SharedLock.
414

415
    @param name: the name of the lock
416
    @type monitor: L{LockMonitor}
417
    @param monitor: Lock monitor with which to register
418

419
    """
420
    object.__init__(self)
421

    
422
    self.name = name
423

    
424
    # Internal lock
425
    self.__lock = threading.Lock()
426

    
427
    # Queue containing waiting acquires
428
    self.__pending = []
429
    self.__pending_by_prio = {}
430
    self.__pending_shared = {}
431

    
432
    # Current lock holders
433
    self.__shr = set()
434
    self.__exc = None
435

    
436
    # is this lock in the deleted state?
437
    self.__deleted = False
438

    
439
    # Register with lock monitor
440
    if monitor:
441
      logging.debug("Adding lock %s to monitor", name)
442
      monitor.RegisterLock(self)
443

    
444
  def __repr__(self):
445
    return ("<%s.%s name=%s at %#x>" %
446
            (self.__class__.__module__, self.__class__.__name__,
447
             self.name, id(self)))
448

    
449
  def GetLockInfo(self, requested):
450
    """Retrieves information for querying locks.
451

452
    @type requested: set
453
    @param requested: Requested information, see C{query.LQ_*}
454

455
    """
456
    self.__lock.acquire()
457
    try:
458
      # Note: to avoid unintentional race conditions, no references to
459
      # modifiable objects should be returned unless they were created in this
460
      # function.
461
      mode = None
462
      owner_names = None
463

    
464
      if query.LQ_MODE in requested:
465
        if self.__deleted:
466
          mode = _DELETED_TEXT
467
          assert not (self.__exc or self.__shr)
468
        elif self.__exc:
469
          mode = _EXCLUSIVE_TEXT
470
        elif self.__shr:
471
          mode = _SHARED_TEXT
472

    
473
      # Current owner(s) are wanted
474
      if query.LQ_OWNER in requested:
475
        if self.__exc:
476
          owner = [self.__exc]
477
        else:
478
          owner = self.__shr
479

    
480
        if owner:
481
          assert not self.__deleted
482
          owner_names = [i.getName() for i in owner]
483

    
484
      # Pending acquires are wanted
485
      if query.LQ_PENDING in requested:
486
        pending = []
487

    
488
        # Sorting instead of copying and using heaq functions for simplicity
489
        for (_, prioqueue) in sorted(self.__pending):
490
          for cond in prioqueue:
491
            if cond.shared:
492
              pendmode = _SHARED_TEXT
493
            else:
494
              pendmode = _EXCLUSIVE_TEXT
495

    
496
            # List of names will be sorted in L{query._GetLockPending}
497
            pending.append((pendmode, [i.getName()
498
                                       for i in cond.get_waiting()]))
499
      else:
500
        pending = None
501

    
502
      return [(self.name, mode, owner_names, pending)]
503
    finally:
504
      self.__lock.release()
505

    
506
  def __check_deleted(self):
507
    """Raises an exception if the lock has been deleted.
508

509
    """
510
    if self.__deleted:
511
      raise errors.LockError("Deleted lock %s" % self.name)
512

    
513
  def __is_sharer(self):
514
    """Is the current thread sharing the lock at this time?
515

516
    """
517
    return threading.currentThread() in self.__shr
518

    
519
  def __is_exclusive(self):
520
    """Is the current thread holding the lock exclusively at this time?
521

522
    """
523
    return threading.currentThread() == self.__exc
524

    
525
  def __is_owned(self, shared=-1):
526
    """Is the current thread somehow owning the lock at this time?
527

528
    This is a private version of the function, which presumes you're holding
529
    the internal lock.
530

531
    """
532
    if shared < 0:
533
      return self.__is_sharer() or self.__is_exclusive()
534
    elif shared:
535
      return self.__is_sharer()
536
    else:
537
      return self.__is_exclusive()
538

    
539
  def is_owned(self, shared=-1):
540
    """Is the current thread somehow owning the lock at this time?
541

542
    @param shared:
543
        - < 0: check for any type of ownership (default)
544
        - 0: check for exclusive ownership
545
        - > 0: check for shared ownership
546

547
    """
548
    self.__lock.acquire()
549
    try:
550
      return self.__is_owned(shared=shared)
551
    finally:
552
      self.__lock.release()
553

    
554
  #: Necessary to remain compatible with threading.Condition, which tries to
555
  #: retrieve a locks' "_is_owned" attribute
556
  _is_owned = is_owned
557

    
558
  def _count_pending(self):
559
    """Returns the number of pending acquires.
560

561
    @rtype: int
562

563
    """
564
    self.__lock.acquire()
565
    try:
566
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
567
    finally:
568
      self.__lock.release()
569

    
570
  def _check_empty(self):
571
    """Checks whether there are any pending acquires.
572

573
    @rtype: bool
574

575
    """
576
    self.__lock.acquire()
577
    try:
578
      # Order is important: __find_first_pending_queue modifies __pending
579
      (_, prioqueue) = self.__find_first_pending_queue()
580

    
581
      return not (prioqueue or
582
                  self.__pending or
583
                  self.__pending_by_prio or
584
                  self.__pending_shared)
585
    finally:
586
      self.__lock.release()
587

    
588
  def __do_acquire(self, shared):
589
    """Actually acquire the lock.
590

591
    """
592
    if shared:
593
      self.__shr.add(threading.currentThread())
594
    else:
595
      self.__exc = threading.currentThread()
596

    
597
  def __can_acquire(self, shared):
598
    """Determine whether lock can be acquired.
599

600
    """
601
    if shared:
602
      return self.__exc is None
603
    else:
604
      return len(self.__shr) == 0 and self.__exc is None
605

    
606
  def __find_first_pending_queue(self):
607
    """Tries to find the topmost queued entry with pending acquires.
608

609
    Removes empty entries while going through the list.
610

611
    """
612
    while self.__pending:
613
      (priority, prioqueue) = self.__pending[0]
614

    
615
      if prioqueue:
616
        return (priority, prioqueue)
617

    
618
      # Remove empty queue
619
      heapq.heappop(self.__pending)
620
      del self.__pending_by_prio[priority]
621
      assert priority not in self.__pending_shared
622

    
623
    return (None, None)
624

    
625
  def __is_on_top(self, cond):
626
    """Checks whether the passed condition is on top of the queue.
627

628
    The caller must make sure the queue isn't empty.
629

630
    """
631
    (_, prioqueue) = self.__find_first_pending_queue()
632

    
633
    return cond == prioqueue[0]
634

    
635
  def __acquire_unlocked(self, shared, timeout, priority):
636
    """Acquire a shared lock.
637

638
    @param shared: whether to acquire in shared mode; by default an
639
        exclusive lock will be acquired
640
    @param timeout: maximum waiting time before giving up
641
    @type priority: integer
642
    @param priority: Priority for acquiring lock
643

644
    """
645
    self.__check_deleted()
646

    
647
    # We cannot acquire the lock if we already have it
648
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
649
                                   " %s" % self.name)
650

    
651
    # Remove empty entries from queue
652
    self.__find_first_pending_queue()
653

    
654
    # Check whether someone else holds the lock or there are pending acquires.
655
    if not self.__pending and self.__can_acquire(shared):
656
      # Apparently not, can acquire lock directly.
657
      self.__do_acquire(shared)
658
      return True
659

    
660
    prioqueue = self.__pending_by_prio.get(priority, None)
661

    
662
    if shared:
663
      # Try to re-use condition for shared acquire
664
      wait_condition = self.__pending_shared.get(priority, None)
665
      assert (wait_condition is None or
666
              (wait_condition.shared and wait_condition in prioqueue))
667
    else:
668
      wait_condition = None
669

    
670
    if wait_condition is None:
671
      if prioqueue is None:
672
        assert priority not in self.__pending_by_prio
673

    
674
        prioqueue = []
675
        heapq.heappush(self.__pending, (priority, prioqueue))
676
        self.__pending_by_prio[priority] = prioqueue
677

    
678
      wait_condition = self.__condition_class(self.__lock, shared)
679
      prioqueue.append(wait_condition)
680

    
681
      if shared:
682
        # Keep reference for further shared acquires on same priority. This is
683
        # better than trying to find it in the list of pending acquires.
684
        assert priority not in self.__pending_shared
685
        self.__pending_shared[priority] = wait_condition
686

    
687
    try:
688
      # Wait until we become the topmost acquire in the queue or the timeout
689
      # expires.
690
      # TODO: Decrease timeout with spurious notifications
691
      while not (self.__is_on_top(wait_condition) and
692
                 self.__can_acquire(shared)):
693
        # Wait for notification
694
        wait_condition.wait(timeout)
695
        self.__check_deleted()
696

    
697
        # A lot of code assumes blocking acquires always succeed. Loop
698
        # internally for that case.
699
        if timeout is not None:
700
          break
701

    
702
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
703
        self.__do_acquire(shared)
704
        return True
705
    finally:
706
      # Remove condition from queue if there are no more waiters
707
      if not wait_condition.has_waiting():
708
        prioqueue.remove(wait_condition)
709
        if wait_condition.shared:
710
          # Remove from list of shared acquires if it wasn't while releasing
711
          # (e.g. on lock deletion)
712
          self.__pending_shared.pop(priority, None)
713

    
714
    return False
715

    
716
  def acquire(self, shared=0, timeout=None, priority=None,
717
              test_notify=None):
718
    """Acquire a shared lock.
719

720
    @type shared: integer (0/1) used as a boolean
721
    @param shared: whether to acquire in shared mode; by default an
722
        exclusive lock will be acquired
723
    @type timeout: float
724
    @param timeout: maximum waiting time before giving up
725
    @type priority: integer
726
    @param priority: Priority for acquiring lock
727
    @type test_notify: callable or None
728
    @param test_notify: Special callback function for unittesting
729

730
    """
731
    if priority is None:
732
      priority = _DEFAULT_PRIORITY
733

    
734
    self.__lock.acquire()
735
    try:
736
      # We already got the lock, notify now
737
      if __debug__ and callable(test_notify):
738
        test_notify()
739

    
740
      return self.__acquire_unlocked(shared, timeout, priority)
741
    finally:
742
      self.__lock.release()
743

    
744
  def downgrade(self):
745
    """Changes the lock mode from exclusive to shared.
746

747
    Pending acquires in shared mode on the same priority will go ahead.
748

749
    """
750
    self.__lock.acquire()
751
    try:
752
      assert self.__is_owned(), "Lock must be owned"
753

    
754
      if self.__is_exclusive():
755
        # Do nothing if the lock is already acquired in shared mode
756
        self.__exc = None
757
        self.__do_acquire(1)
758

    
759
        # Important: pending shared acquires should only jump ahead if there
760
        # was a transition from exclusive to shared, otherwise an owner of a
761
        # shared lock can keep calling this function to push incoming shared
762
        # acquires
763
        (priority, prioqueue) = self.__find_first_pending_queue()
764
        if prioqueue:
765
          # Is there a pending shared acquire on this priority?
766
          cond = self.__pending_shared.pop(priority, None)
767
          if cond:
768
            assert cond.shared
769
            assert cond in prioqueue
770

    
771
            # Ensure shared acquire is on top of queue
772
            if len(prioqueue) > 1:
773
              prioqueue.remove(cond)
774
              prioqueue.insert(0, cond)
775

    
776
            # Notify
777
            cond.notifyAll()
778

    
779
      assert not self.__is_exclusive()
780
      assert self.__is_sharer()
781

    
782
      return True
783
    finally:
784
      self.__lock.release()
785

    
786
  def release(self):
787
    """Release a Shared Lock.
788

789
    You must have acquired the lock, either in shared or in exclusive mode,
790
    before calling this function.
791

792
    """
793
    self.__lock.acquire()
794
    try:
795
      assert self.__is_exclusive() or self.__is_sharer(), \
796
        "Cannot release non-owned lock"
797

    
798
      # Autodetect release type
799
      if self.__is_exclusive():
800
        self.__exc = None
801
      else:
802
        self.__shr.remove(threading.currentThread())
803

    
804
      # Notify topmost condition in queue
805
      (priority, prioqueue) = self.__find_first_pending_queue()
806
      if prioqueue:
807
        cond = prioqueue[0]
808
        cond.notifyAll()
809
        if cond.shared:
810
          # Prevent further shared acquires from sneaking in while waiters are
811
          # notified
812
          self.__pending_shared.pop(priority, None)
813

    
814
    finally:
815
      self.__lock.release()
816

    
817
  def delete(self, timeout=None, priority=None):
818
    """Delete a Shared Lock.
819

820
    This operation will declare the lock for removal. First the lock will be
821
    acquired in exclusive mode if you don't already own it, then the lock
822
    will be put in a state where any future and pending acquire() fail.
823

824
    @type timeout: float
825
    @param timeout: maximum waiting time before giving up
826
    @type priority: integer
827
    @param priority: Priority for acquiring lock
828

829
    """
830
    if priority is None:
831
      priority = _DEFAULT_PRIORITY
832

    
833
    self.__lock.acquire()
834
    try:
835
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
836

    
837
      self.__check_deleted()
838

    
839
      # The caller is allowed to hold the lock exclusively already.
840
      acquired = self.__is_exclusive()
841

    
842
      if not acquired:
843
        acquired = self.__acquire_unlocked(0, timeout, priority)
844

    
845
        assert self.__is_exclusive() and not self.__is_sharer(), \
846
          "Lock wasn't acquired in exclusive mode"
847

    
848
      if acquired:
849
        self.__deleted = True
850
        self.__exc = None
851

    
852
        assert not (self.__exc or self.__shr), "Found owner during deletion"
853

    
854
        # Notify all acquires. They'll throw an error.
855
        for (_, prioqueue) in self.__pending:
856
          for cond in prioqueue:
857
            cond.notifyAll()
858

    
859
        assert self.__deleted
860

    
861
      return acquired
862
    finally:
863
      self.__lock.release()
864

    
865
  def _release_save(self):
866
    shared = self.__is_sharer()
867
    self.release()
868
    return shared
869

    
870
  def _acquire_restore(self, shared):
871
    self.acquire(shared=shared)
872

    
873

    
874
# Whenever we want to acquire a full LockSet we pass None as the value
875
# to acquire.  Hide this behind this nicely named constant.
876
ALL_SET = None
877

    
878

    
879
class _AcquireTimeout(Exception):
880
  """Internal exception to abort an acquire on a timeout.
881

882
  """
883

    
884

    
885
class LockSet:
886
  """Implements a set of locks.
887

888
  This abstraction implements a set of shared locks for the same resource type,
889
  distinguished by name. The user can lock a subset of the resources and the
890
  LockSet will take care of acquiring the locks always in the same order, thus
891
  preventing deadlock.
892

893
  All the locks needed in the same set must be acquired together, though.
894

895
  @type name: string
896
  @ivar name: the name of the lockset
897

898
  """
899
  def __init__(self, members, name, monitor=None):
900
    """Constructs a new LockSet.
901

902
    @type members: list of strings
903
    @param members: initial members of the set
904
    @type monitor: L{LockMonitor}
905
    @param monitor: Lock monitor with which to register member locks
906

907
    """
908
    assert members is not None, "members parameter is not a list"
909
    self.name = name
910

    
911
    # Lock monitor
912
    self.__monitor = monitor
913

    
914
    # Used internally to guarantee coherency
915
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
916

    
917
    # The lockdict indexes the relationship name -> lock
918
    # The order-of-locking is implied by the alphabetical order of names
919
    self.__lockdict = {}
920

    
921
    for mname in members:
922
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
923
                                          monitor=monitor)
924

    
925
    # The owner dict contains the set of locks each thread owns. For
926
    # performance each thread can access its own key without a global lock on
927
    # this structure. It is paramount though that *no* other type of access is
928
    # done to this structure (eg. no looping over its keys). *_owner helper
929
    # function are defined to guarantee access is correct, but in general never
930
    # do anything different than __owners[threading.currentThread()], or there
931
    # will be trouble.
932
    self.__owners = {}
933

    
934
  def _GetLockName(self, mname):
935
    """Returns the name for a member lock.
936

937
    """
938
    return "%s/%s" % (self.name, mname)
939

    
940
  def _get_lock(self):
941
    """Returns the lockset-internal lock.
942

943
    """
944
    return self.__lock
945

    
946
  def _get_lockdict(self):
947
    """Returns the lockset-internal lock dictionary.
948

949
    Accessing this structure is only safe in single-thread usage or when the
950
    lockset-internal lock is held.
951

952
    """
953
    return self.__lockdict
954

    
955
  def is_owned(self):
956
    """Is the current thread a current level owner?
957

958
    @note: Use L{check_owned} to check if a specific lock is held
959

960
    """
961
    return threading.currentThread() in self.__owners
962

    
963
  def check_owned(self, names, shared=-1):
964
    """Check if locks are owned in a specific mode.
965

966
    @type names: sequence or string
967
    @param names: Lock names (or a single lock name)
968
    @param shared: See L{SharedLock.is_owned}
969
    @rtype: bool
970
    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
971
      L{list_owned} to get the names of all owned locks
972

973
    """
974
    if isinstance(names, basestring):
975
      names = [names]
976

    
977
    # Avoid check if no locks are owned anyway
978
    if names and self.is_owned():
979
      candidates = []
980

    
981
      # Gather references to all locks (in case they're deleted in the meantime)
982
      for lname in names:
983
        try:
984
          lock = self.__lockdict[lname]
985
        except KeyError:
986
          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
987
                                 " have been removed)" % (lname, self.name))
988
        else:
989
          candidates.append(lock)
990

    
991
      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
992
    else:
993
      return False
994

    
995
  def _add_owned(self, name=None):
996
    """Note the current thread owns the given lock"""
997
    if name is None:
998
      if not self.is_owned():
999
        self.__owners[threading.currentThread()] = set()
1000
    else:
1001
      if self.is_owned():
1002
        self.__owners[threading.currentThread()].add(name)
1003
      else:
1004
        self.__owners[threading.currentThread()] = set([name])
1005

    
1006
  def _del_owned(self, name=None):
1007
    """Note the current thread owns the given lock"""
1008

    
1009
    assert not (name is None and self.__lock.is_owned()), \
1010
           "Cannot hold internal lock when deleting owner status"
1011

    
1012
    if name is not None:
1013
      self.__owners[threading.currentThread()].remove(name)
1014

    
1015
    # Only remove the key if we don't hold the set-lock as well
1016
    if (not self.__lock.is_owned() and
1017
        not self.__owners[threading.currentThread()]):
1018
      del self.__owners[threading.currentThread()]
1019

    
1020
  def list_owned(self):
1021
    """Get the set of resource names owned by the current thread"""
1022
    if self.is_owned():
1023
      return self.__owners[threading.currentThread()].copy()
1024
    else:
1025
      return set()
1026

    
1027
  def _release_and_delete_owned(self):
1028
    """Release and delete all resources owned by the current thread"""
1029
    for lname in self.list_owned():
1030
      lock = self.__lockdict[lname]
1031
      if lock.is_owned():
1032
        lock.release()
1033
      self._del_owned(name=lname)
1034

    
1035
  def __names(self):
1036
    """Return the current set of names.
1037

1038
    Only call this function while holding __lock and don't iterate on the
1039
    result after releasing the lock.
1040

1041
    """
1042
    return self.__lockdict.keys()
1043

    
1044
  def _names(self):
1045
    """Return a copy of the current set of elements.
1046

1047
    Used only for debugging purposes.
1048

1049
    """
1050
    # If we don't already own the set-level lock acquired
1051
    # we'll get it and note we need to release it later.
1052
    release_lock = False
1053
    if not self.__lock.is_owned():
1054
      release_lock = True
1055
      self.__lock.acquire(shared=1)
1056
    try:
1057
      result = self.__names()
1058
    finally:
1059
      if release_lock:
1060
        self.__lock.release()
1061
    return set(result)
1062

    
1063
  def acquire(self, names, timeout=None, shared=0, priority=None,
1064
              test_notify=None):
1065
    """Acquire a set of resource locks.
1066

1067
    @type names: list of strings (or string)
1068
    @param names: the names of the locks which shall be acquired
1069
        (special lock names, or instance/node names)
1070
    @type shared: integer (0/1) used as a boolean
1071
    @param shared: whether to acquire in shared mode; by default an
1072
        exclusive lock will be acquired
1073
    @type timeout: float or None
1074
    @param timeout: Maximum time to acquire all locks
1075
    @type priority: integer
1076
    @param priority: Priority for acquiring locks
1077
    @type test_notify: callable or None
1078
    @param test_notify: Special callback function for unittesting
1079

1080
    @return: Set of all locks successfully acquired or None in case of timeout
1081

1082
    @raise errors.LockError: when any lock we try to acquire has
1083
        been deleted before we succeed. In this case none of the
1084
        locks requested will be acquired.
1085

1086
    """
1087
    assert timeout is None or timeout >= 0.0
1088

    
1089
    # Check we don't already own locks at this level
1090
    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1091
                                 " (lockset %s)" % self.name)
1092

    
1093
    if priority is None:
1094
      priority = _DEFAULT_PRIORITY
1095

    
1096
    # We need to keep track of how long we spent waiting for a lock. The
1097
    # timeout passed to this function is over all lock acquires.
1098
    running_timeout = utils.RunningTimeout(timeout, False)
1099

    
1100
    try:
1101
      if names is not None:
1102
        # Support passing in a single resource to acquire rather than many
1103
        if isinstance(names, basestring):
1104
          names = [names]
1105

    
1106
        return self.__acquire_inner(names, False, shared, priority,
1107
                                    running_timeout.Remaining, test_notify)
1108

    
1109
      else:
1110
        # If no names are given acquire the whole set by not letting new names
1111
        # being added before we release, and getting the current list of names.
1112
        # Some of them may then be deleted later, but we'll cope with this.
1113
        #
1114
        # We'd like to acquire this lock in a shared way, as it's nice if
1115
        # everybody else can use the instances at the same time. If we are
1116
        # acquiring them exclusively though they won't be able to do this
1117
        # anyway, though, so we'll get the list lock exclusively as well in
1118
        # order to be able to do add() on the set while owning it.
1119
        if not self.__lock.acquire(shared=shared, priority=priority,
1120
                                   timeout=running_timeout.Remaining()):
1121
          raise _AcquireTimeout()
1122
        try:
1123
          # note we own the set-lock
1124
          self._add_owned()
1125

    
1126
          return self.__acquire_inner(self.__names(), True, shared, priority,
1127
                                      running_timeout.Remaining, test_notify)
1128
        except:
1129
          # We shouldn't have problems adding the lock to the owners list, but
1130
          # if we did we'll try to release this lock and re-raise exception.
1131
          # Of course something is going to be really wrong, after this.
1132
          self.__lock.release()
1133
          self._del_owned()
1134
          raise
1135

    
1136
    except _AcquireTimeout:
1137
      return None
1138

    
1139
  def __acquire_inner(self, names, want_all, shared, priority,
1140
                      timeout_fn, test_notify):
1141
    """Inner logic for acquiring a number of locks.
1142

1143
    @param names: Names of the locks to be acquired
1144
    @param want_all: Whether all locks in the set should be acquired
1145
    @param shared: Whether to acquire in shared mode
1146
    @param timeout_fn: Function returning remaining timeout
1147
    @param priority: Priority for acquiring locks
1148
    @param test_notify: Special callback function for unittesting
1149

1150
    """
1151
    acquire_list = []
1152

    
1153
    # First we look the locks up on __lockdict. We have no way of being sure
1154
    # they will still be there after, but this makes it a lot faster should
1155
    # just one of them be the already wrong. Using a sorted sequence to prevent
1156
    # deadlocks.
1157
    for lname in sorted(utils.UniqueSequence(names)):
1158
      try:
1159
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1160
      except KeyError:
1161
        if want_all:
1162
          # We are acquiring all the set, it doesn't matter if this particular
1163
          # element is not there anymore.
1164
          continue
1165

    
1166
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1167
                               " been removed)" % (lname, self.name))
1168

    
1169
      acquire_list.append((lname, lock))
1170

    
1171
    # This will hold the locknames we effectively acquired.
1172
    acquired = set()
1173

    
1174
    try:
1175
      # Now acquire_list contains a sorted list of resources and locks we
1176
      # want.  In order to get them we loop on this (private) list and
1177
      # acquire() them.  We gave no real guarantee they will still exist till
1178
      # this is done but .acquire() itself is safe and will alert us if the
1179
      # lock gets deleted.
1180
      for (lname, lock) in acquire_list:
1181
        if __debug__ and callable(test_notify):
1182
          test_notify_fn = lambda: test_notify(lname)
1183
        else:
1184
          test_notify_fn = None
1185

    
1186
        timeout = timeout_fn()
1187

    
1188
        try:
1189
          # raises LockError if the lock was deleted
1190
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1191
                                     priority=priority,
1192
                                     test_notify=test_notify_fn)
1193
        except errors.LockError:
1194
          if want_all:
1195
            # We are acquiring all the set, it doesn't matter if this
1196
            # particular element is not there anymore.
1197
            continue
1198

    
1199
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1200
                                 " have been removed)" % (lname, self.name))
1201

    
1202
        if not acq_success:
1203
          # Couldn't get lock or timeout occurred
1204
          if timeout is None:
1205
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1206
            # blocking.
1207
            raise errors.LockError("Failed to get lock %s (set %s)" %
1208
                                   (lname, self.name))
1209

    
1210
          raise _AcquireTimeout()
1211

    
1212
        try:
1213
          # now the lock cannot be deleted, we have it!
1214
          self._add_owned(name=lname)
1215
          acquired.add(lname)
1216

    
1217
        except:
1218
          # We shouldn't have problems adding the lock to the owners list, but
1219
          # if we did we'll try to release this lock and re-raise exception.
1220
          # Of course something is going to be really wrong after this.
1221
          if lock.is_owned():
1222
            lock.release()
1223
          raise
1224

    
1225
    except:
1226
      # Release all owned locks
1227
      self._release_and_delete_owned()
1228
      raise
1229

    
1230
    return acquired
1231

    
1232
  def downgrade(self, names=None):
1233
    """Downgrade a set of resource locks from exclusive to shared mode.
1234

1235
    The locks must have been acquired in exclusive mode.
1236

1237
    """
1238
    assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1239
                             " lock" % self.name)
1240

    
1241
    # Support passing in a single resource to downgrade rather than many
1242
    if isinstance(names, basestring):
1243
      names = [names]
1244

    
1245
    owned = self.list_owned()
1246

    
1247
    if names is None:
1248
      names = owned
1249
    else:
1250
      names = set(names)
1251
      assert owned.issuperset(names), \
1252
        ("downgrade() on unheld resources %s (set %s)" %
1253
         (names.difference(owned), self.name))
1254

    
1255
    for lockname in names:
1256
      self.__lockdict[lockname].downgrade()
1257

    
1258
    # Do we own the lockset in exclusive mode?
1259
    if self.__lock.is_owned(shared=0):
1260
      # Have all locks been downgraded?
1261
      if not compat.any(lock.is_owned(shared=0)
1262
                        for lock in self.__lockdict.values()):
1263
        self.__lock.downgrade()
1264
        assert self.__lock.is_owned(shared=1)
1265

    
1266
    return True
1267

    
1268
  def release(self, names=None):
1269
    """Release a set of resource locks, at the same level.
1270

1271
    You must have acquired the locks, either in shared or in exclusive mode,
1272
    before releasing them.
1273

1274
    @type names: list of strings, or None
1275
    @param names: the names of the locks which shall be released
1276
        (defaults to all the locks acquired at that level).
1277

1278
    """
1279
    assert self.is_owned(), ("release() on lock set %s while not owner" %
1280
                             self.name)
1281

    
1282
    # Support passing in a single resource to release rather than many
1283
    if isinstance(names, basestring):
1284
      names = [names]
1285

    
1286
    if names is None:
1287
      names = self.list_owned()
1288
    else:
1289
      names = set(names)
1290
      assert self.list_owned().issuperset(names), (
1291
               "release() on unheld resources %s (set %s)" %
1292
               (names.difference(self.list_owned()), self.name))
1293

    
1294
    # First of all let's release the "all elements" lock, if set.
1295
    # After this 'add' can work again
1296
    if self.__lock.is_owned():
1297
      self.__lock.release()
1298
      self._del_owned()
1299

    
1300
    for lockname in names:
1301
      # If we are sure the lock doesn't leave __lockdict without being
1302
      # exclusively held we can do this...
1303
      self.__lockdict[lockname].release()
1304
      self._del_owned(name=lockname)
1305

    
1306
  def add(self, names, acquired=0, shared=0):
1307
    """Add a new set of elements to the set
1308

1309
    @type names: list of strings
1310
    @param names: names of the new elements to add
1311
    @type acquired: integer (0/1) used as a boolean
1312
    @param acquired: pre-acquire the new resource?
1313
    @type shared: integer (0/1) used as a boolean
1314
    @param shared: is the pre-acquisition shared?
1315

1316
    """
1317
    # Check we don't already own locks at this level
1318
    assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1319
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1320
       self.name)
1321

    
1322
    # Support passing in a single resource to add rather than many
1323
    if isinstance(names, basestring):
1324
      names = [names]
1325

    
1326
    # If we don't already own the set-level lock acquired in an exclusive way
1327
    # we'll get it and note we need to release it later.
1328
    release_lock = False
1329
    if not self.__lock.is_owned():
1330
      release_lock = True
1331
      self.__lock.acquire()
1332

    
1333
    try:
1334
      invalid_names = set(self.__names()).intersection(names)
1335
      if invalid_names:
1336
        # This must be an explicit raise, not an assert, because assert is
1337
        # turned off when using optimization, and this can happen because of
1338
        # concurrency even if the user doesn't want it.
1339
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1340
                               (invalid_names, self.name))
1341

    
1342
      for lockname in names:
1343
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1344

    
1345
        if acquired:
1346
          # No need for priority or timeout here as this lock has just been
1347
          # created
1348
          lock.acquire(shared=shared)
1349
          # now the lock cannot be deleted, we have it!
1350
          try:
1351
            self._add_owned(name=lockname)
1352
          except:
1353
            # We shouldn't have problems adding the lock to the owners list,
1354
            # but if we did we'll try to release this lock and re-raise
1355
            # exception.  Of course something is going to be really wrong,
1356
            # after this.  On the other hand the lock hasn't been added to the
1357
            # __lockdict yet so no other threads should be pending on it. This
1358
            # release is just a safety measure.
1359
            lock.release()
1360
            raise
1361

    
1362
        self.__lockdict[lockname] = lock
1363

    
1364
    finally:
1365
      # Only release __lock if we were not holding it previously.
1366
      if release_lock:
1367
        self.__lock.release()
1368

    
1369
    return True
1370

    
1371
  def remove(self, names):
1372
    """Remove elements from the lock set.
1373

1374
    You can either not hold anything in the lockset or already hold a superset
1375
    of the elements you want to delete, exclusively.
1376

1377
    @type names: list of strings
1378
    @param names: names of the resource to remove.
1379

1380
    @return: a list of locks which we removed; the list is always
1381
        equal to the names list if we were holding all the locks
1382
        exclusively
1383

1384
    """
1385
    # Support passing in a single resource to remove rather than many
1386
    if isinstance(names, basestring):
1387
      names = [names]
1388

    
1389
    # If we own any subset of this lock it must be a superset of what we want
1390
    # to delete. The ownership must also be exclusive, but that will be checked
1391
    # by the lock itself.
1392
    assert not self.is_owned() or self.list_owned().issuperset(names), (
1393
      "remove() on acquired lockset %s while not owning all elements" %
1394
      self.name)
1395

    
1396
    removed = []
1397

    
1398
    for lname in names:
1399
      # Calling delete() acquires the lock exclusively if we don't already own
1400
      # it, and causes all pending and subsequent lock acquires to fail. It's
1401
      # fine to call it out of order because delete() also implies release(),
1402
      # and the assertion above guarantees that if we either already hold
1403
      # everything we want to delete, or we hold none.
1404
      try:
1405
        self.__lockdict[lname].delete()
1406
        removed.append(lname)
1407
      except (KeyError, errors.LockError):
1408
        # This cannot happen if we were already holding it, verify:
1409
        assert not self.is_owned(), ("remove failed while holding lockset %s" %
1410
                                     self.name)
1411
      else:
1412
        # If no LockError was raised we are the ones who deleted the lock.
1413
        # This means we can safely remove it from lockdict, as any further or
1414
        # pending delete() or acquire() will fail (and nobody can have the lock
1415
        # since before our call to delete()).
1416
        #
1417
        # This is done in an else clause because if the exception was thrown
1418
        # it's the job of the one who actually deleted it.
1419
        del self.__lockdict[lname]
1420
        # And let's remove it from our private list if we owned it.
1421
        if self.is_owned():
1422
          self._del_owned(name=lname)
1423

    
1424
    return removed
1425

    
1426

    
1427
# Locking levels, must be acquired in increasing order.
1428
# Current rules are:
1429
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1430
#   acquired before performing any operation, either in shared or in exclusive
1431
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1432
#   avoided.
1433
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1434
#   If you need more than one node, or more than one instance, acquire them at
1435
#   the same time.
1436
LEVEL_CLUSTER = 0
1437
LEVEL_INSTANCE = 1
1438
LEVEL_NODEGROUP = 2
1439
LEVEL_NODE = 3
1440
LEVEL_NODE_RES = 4
1441

    
1442
LEVELS = [
1443
  LEVEL_CLUSTER,
1444
  LEVEL_INSTANCE,
1445
  LEVEL_NODEGROUP,
1446
  LEVEL_NODE,
1447
  LEVEL_NODE_RES,
1448
  ]
1449

    
1450
# Lock levels which are modifiable
1451
LEVELS_MOD = frozenset([
1452
  LEVEL_NODE_RES,
1453
  LEVEL_NODE,
1454
  LEVEL_NODEGROUP,
1455
  LEVEL_INSTANCE,
1456
  ])
1457

    
1458
#: Lock level names (make sure to use singular form)
1459
LEVEL_NAMES = {
1460
  LEVEL_CLUSTER: "cluster",
1461
  LEVEL_INSTANCE: "instance",
1462
  LEVEL_NODEGROUP: "nodegroup",
1463
  LEVEL_NODE: "node",
1464
  LEVEL_NODE_RES: "node-res",
1465
  }
1466

    
1467
# Constant for the big ganeti lock
1468
BGL = "BGL"
1469

    
1470

    
1471
class GanetiLockManager:
1472
  """The Ganeti Locking Library
1473

1474
  The purpose of this small library is to manage locking for ganeti clusters
1475
  in a central place, while at the same time doing dynamic checks against
1476
  possible deadlocks. It will also make it easier to transition to a different
1477
  lock type should we migrate away from python threads.
1478

1479
  """
1480
  _instance = None
1481

    
1482
  def __init__(self, nodes, nodegroups, instances):
1483
    """Constructs a new GanetiLockManager object.
1484

1485
    There should be only a GanetiLockManager object at any time, so this
1486
    function raises an error if this is not the case.
1487

1488
    @param nodes: list of node names
1489
    @param nodegroups: list of nodegroup uuids
1490
    @param instances: list of instance names
1491

1492
    """
1493
    assert self.__class__._instance is None, \
1494
           "double GanetiLockManager instance"
1495

    
1496
    self.__class__._instance = self
1497

    
1498
    self._monitor = LockMonitor()
1499

    
1500
    # The keyring contains all the locks, at their level and in the correct
1501
    # locking order.
1502
    self.__keyring = {
1503
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1504
      LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
1505
      LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
1506
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1507
      LEVEL_INSTANCE: LockSet(instances, "instance",
1508
                              monitor=self._monitor),
1509
      }
1510

    
1511
    assert compat.all(ls.name == LEVEL_NAMES[level]
1512
                      for (level, ls) in self.__keyring.items())
1513

    
1514
  def AddToLockMonitor(self, provider):
1515
    """Registers a new lock with the monitor.
1516

1517
    See L{LockMonitor.RegisterLock}.
1518

1519
    """
1520
    return self._monitor.RegisterLock(provider)
1521

    
1522
  def QueryLocks(self, fields):
1523
    """Queries information from all locks.
1524

1525
    See L{LockMonitor.QueryLocks}.
1526

1527
    """
1528
    return self._monitor.QueryLocks(fields)
1529

    
1530
  def OldStyleQueryLocks(self, fields):
1531
    """Queries information from all locks, returning old-style data.
1532

1533
    See L{LockMonitor.OldStyleQueryLocks}.
1534

1535
    """
1536
    return self._monitor.OldStyleQueryLocks(fields)
1537

    
1538
  def _names(self, level):
1539
    """List the lock names at the given level.
1540

1541
    This can be used for debugging/testing purposes.
1542

1543
    @param level: the level whose list of locks to get
1544

1545
    """
1546
    assert level in LEVELS, "Invalid locking level %s" % level
1547
    return self.__keyring[level]._names()
1548

    
1549
  def is_owned(self, level):
1550
    """Check whether we are owning locks at the given level
1551

1552
    """
1553
    return self.__keyring[level].is_owned()
1554

    
1555
  def list_owned(self, level):
1556
    """Get the set of owned locks at the given level
1557

1558
    """
1559
    return self.__keyring[level].list_owned()
1560

    
1561
  def check_owned(self, level, names, shared=-1):
1562
    """Check if locks at a certain level are owned in a specific mode.
1563

1564
    @see: L{LockSet.check_owned}
1565

1566
    """
1567
    return self.__keyring[level].check_owned(names, shared=shared)
1568

    
1569
  def _upper_owned(self, level):
1570
    """Check that we don't own any lock at a level greater than the given one.
1571

1572
    """
1573
    # This way of checking only works if LEVELS[i] = i, which we check for in
1574
    # the test cases.
1575
    return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1576

    
1577
  def _BGL_owned(self): # pylint: disable=C0103
1578
    """Check if the current thread owns the BGL.
1579

1580
    Both an exclusive or a shared acquisition work.
1581

1582
    """
1583
    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1584

    
1585
  @staticmethod
1586
  def _contains_BGL(level, names): # pylint: disable=C0103
1587
    """Check if the level contains the BGL.
1588

1589
    Check if acting on the given level and set of names will change
1590
    the status of the Big Ganeti Lock.
1591

1592
    """
1593
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1594

    
1595
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1596
    """Acquire a set of resource locks, at the same level.
1597

1598
    @type level: member of locking.LEVELS
1599
    @param level: the level at which the locks shall be acquired
1600
    @type names: list of strings (or string)
1601
    @param names: the names of the locks which shall be acquired
1602
        (special lock names, or instance/node names)
1603
    @type shared: integer (0/1) used as a boolean
1604
    @param shared: whether to acquire in shared mode; by default
1605
        an exclusive lock will be acquired
1606
    @type timeout: float
1607
    @param timeout: Maximum time to acquire all locks
1608
    @type priority: integer
1609
    @param priority: Priority for acquiring lock
1610

1611
    """
1612
    assert level in LEVELS, "Invalid locking level %s" % level
1613

    
1614
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1615
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1616
    # so even if we've migrated we need to at least share the BGL to be
1617
    # compatible with them. Of course if we own the BGL exclusively there's no
1618
    # point in acquiring any other lock, unless perhaps we are half way through
1619
    # the migration of the current opcode.
1620
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1621
            "You must own the Big Ganeti Lock before acquiring any other")
1622

    
1623
    # Check we don't own locks at the same or upper levels.
1624
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1625
           " while owning some at a greater one")
1626

    
1627
    # Acquire the locks in the set.
1628
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1629
                                         priority=priority)
1630

    
1631
  def downgrade(self, level, names=None):
1632
    """Downgrade a set of resource locks from exclusive to shared mode.
1633

1634
    You must have acquired the locks in exclusive mode.
1635

1636
    @type level: member of locking.LEVELS
1637
    @param level: the level at which the locks shall be downgraded
1638
    @type names: list of strings, or None
1639
    @param names: the names of the locks which shall be downgraded
1640
        (defaults to all the locks acquired at the level)
1641

1642
    """
1643
    assert level in LEVELS, "Invalid locking level %s" % level
1644

    
1645
    return self.__keyring[level].downgrade(names=names)
1646

    
1647
  def release(self, level, names=None):
1648
    """Release a set of resource locks, at the same level.
1649

1650
    You must have acquired the locks, either in shared or in exclusive
1651
    mode, before releasing them.
1652

1653
    @type level: member of locking.LEVELS
1654
    @param level: the level at which the locks shall be released
1655
    @type names: list of strings, or None
1656
    @param names: the names of the locks which shall be released
1657
        (defaults to all the locks acquired at that level)
1658

1659
    """
1660
    assert level in LEVELS, "Invalid locking level %s" % level
1661
    assert (not self._contains_BGL(level, names) or
1662
            not self._upper_owned(LEVEL_CLUSTER)), (
1663
            "Cannot release the Big Ganeti Lock while holding something"
1664
            " at upper levels (%r)" %
1665
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1666
                              for i in self.__keyring.keys()]), ))
1667

    
1668
    # Release will complain if we don't own the locks already
1669
    return self.__keyring[level].release(names)
1670

    
1671
  def add(self, level, names, acquired=0, shared=0):
1672
    """Add locks at the specified level.
1673

1674
    @type level: member of locking.LEVELS_MOD
1675
    @param level: the level at which the locks shall be added
1676
    @type names: list of strings
1677
    @param names: names of the locks to acquire
1678
    @type acquired: integer (0/1) used as a boolean
1679
    @param acquired: whether to acquire the newly added locks
1680
    @type shared: integer (0/1) used as a boolean
1681
    @param shared: whether the acquisition will be shared
1682

1683
    """
1684
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1685
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1686
           " operations")
1687
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1688
           " while owning some at a greater one")
1689
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1690

    
1691
  def remove(self, level, names):
1692
    """Remove locks from the specified level.
1693

1694
    You must either already own the locks you are trying to remove
1695
    exclusively or not own any lock at an upper level.
1696

1697
    @type level: member of locking.LEVELS_MOD
1698
    @param level: the level at which the locks shall be removed
1699
    @type names: list of strings
1700
    @param names: the names of the locks which shall be removed
1701
        (special lock names, or instance/node names)
1702

1703
    """
1704
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1705
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1706
           " operations")
1707
    # Check we either own the level or don't own anything from here
1708
    # up. LockSet.remove() will check the case in which we don't own
1709
    # all the needed resources, or we have a shared ownership.
1710
    assert self.is_owned(level) or not self._upper_owned(level), (
1711
           "Cannot remove locks at a level while not owning it or"
1712
           " owning some at a greater one")
1713
    return self.__keyring[level].remove(names)
1714

    
1715

    
1716
def _MonitorSortKey((item, idx, num)):
1717
  """Sorting key function.
1718

1719
  Sort by name, registration order and then order of information. This provides
1720
  a stable sort order over different providers, even if they return the same
1721
  name.
1722

1723
  """
1724
  (name, _, _, _) = item
1725

    
1726
  return (utils.NiceSortKey(name), num, idx)
1727

    
1728

    
1729
class LockMonitor(object):
1730
  _LOCK_ATTR = "_lock"
1731

    
1732
  def __init__(self):
1733
    """Initializes this class.
1734

1735
    """
1736
    self._lock = SharedLock("LockMonitor")
1737

    
1738
    # Counter for stable sorting
1739
    self._counter = itertools.count(0)
1740

    
1741
    # Tracked locks. Weak references are used to avoid issues with circular
1742
    # references and deletion.
1743
    self._locks = weakref.WeakKeyDictionary()
1744

    
1745
  @ssynchronized(_LOCK_ATTR)
1746
  def RegisterLock(self, provider):
1747
    """Registers a new lock.
1748

1749
    @param provider: Object with a callable method named C{GetLockInfo}, taking
1750
      a single C{set} containing the requested information items
1751
    @note: It would be nicer to only receive the function generating the
1752
      requested information but, as it turns out, weak references to bound
1753
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1754
      workarounds, but none of the ones I found works properly in combination
1755
      with a standard C{WeakKeyDictionary}
1756

1757
    """
1758
    assert provider not in self._locks, "Duplicate registration"
1759

    
1760
    # There used to be a check for duplicate names here. As it turned out, when
1761
    # a lock is re-created with the same name in a very short timeframe, the
1762
    # previous instance might not yet be removed from the weakref dictionary.
1763
    # By keeping track of the order of incoming registrations, a stable sort
1764
    # ordering can still be guaranteed.
1765

    
1766
    self._locks[provider] = self._counter.next()
1767

    
1768
  def _GetLockInfo(self, requested):
1769
    """Get information from all locks.
1770

1771
    """
1772
    # Must hold lock while getting consistent list of tracked items
1773
    self._lock.acquire(shared=1)
1774
    try:
1775
      items = self._locks.items()
1776
    finally:
1777
      self._lock.release()
1778

    
1779
    return [(info, idx, num)
1780
            for (provider, num) in items
1781
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1782

    
1783
  def _Query(self, fields):
1784
    """Queries information from all locks.
1785

1786
    @type fields: list of strings
1787
    @param fields: List of fields to return
1788

1789
    """
1790
    qobj = query.Query(query.LOCK_FIELDS, fields)
1791

    
1792
    # Get all data with internal lock held and then sort by name and incoming
1793
    # order
1794
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1795
                      key=_MonitorSortKey)
1796

    
1797
    # Extract lock information and build query data
1798
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1799

    
1800
  def QueryLocks(self, fields):
1801
    """Queries information from all locks.
1802

1803
    @type fields: list of strings
1804
    @param fields: List of fields to return
1805

1806
    """
1807
    (qobj, ctx) = self._Query(fields)
1808

    
1809
    # Prepare query response
1810
    return query.GetQueryResponse(qobj, ctx)
1811

    
1812
  def OldStyleQueryLocks(self, fields):
1813
    """Queries information from all locks, returning old-style data.
1814

1815
    @type fields: list of strings
1816
    @param fields: List of fields to return
1817

1818
    """
1819
    (qobj, ctx) = self._Query(fields)
1820

    
1821
    return qobj.OldStyleQuery(ctx)