Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 20699809

History | View | Annotate | Download (53.8 kB)

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the Ganeti locking code."""

# pylint: disable=W0212

# W0212 since e.g. LockSet methods use (a lot) the internals of
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40
from ganeti import query
41

    
42

    
43
# Textual lock modes, as reported by SharedLock.GetLockInfo
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority substituted by SharedLock.acquire/delete when the caller passes None
_DEFAULT_PRIORITY = 0
48

    
49

    
50
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics).

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire
  @type shared: integer (0/1) used as a boolean
  @param shared: passed unchanged as the C{shared} keyword argument to the
      lock's C{acquire} method
  @return: a decorator; the decorated function acquires the lock, calls the
      original function and releases the lock again, even if the original
      function raises

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      if isinstance(mylock, basestring):
        # The lock is looked up by attribute name at call time, so the
        # decorator can be applied before any instance exists
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return sync_function
  return wrap
76

    
77

    
78
class _SingleNotifyPipeConditionWaiter(object):
79
  """Helper class for SingleNotifyPipeCondition
80

81
  """
82
  __slots__ = [
83
    "_fd",
84
    "_poller",
85
    ]
86

    
87
  def __init__(self, poller, fd):
88
    """Constructor for _SingleNotifyPipeConditionWaiter
89

90
    @type poller: select.poll
91
    @param poller: Poller object
92
    @type fd: int
93
    @param fd: File descriptor to wait for
94

95
    """
96
    object.__init__(self)
97
    self._poller = poller
98
    self._fd = fd
99

    
100
  def __call__(self, timeout):
101
    """Wait for something to happen on the pipe.
102

103
    @type timeout: float or None
104
    @param timeout: Timeout for waiting (can be None)
105

106
    """
107
    running_timeout = utils.RunningTimeout(timeout, True)
108

    
109
    while True:
110
      remaining_time = running_timeout.Remaining()
111

    
112
      if remaining_time is not None:
113
        if remaining_time < 0.0:
114
          break
115

    
116
        # Our calculation uses seconds, poll() wants milliseconds
117
        remaining_time *= 1000
118

    
119
      try:
120
        result = self._poller.poll(remaining_time)
121
      except EnvironmentError, err:
122
        if err.errno != errno.EINTR:
123
          raise
124
        result = None
125

    
126
      # Check whether we were notified
127
      if result and result[0][0] == self._fd:
128
        break
129

    
130

    
131
class _BaseCondition(object):
  """Base class containing common code for conditions.

  Some of this code is taken from python's threading module.

  The constructor exports the wrapped lock's C{acquire} and C{release}
  methods directly. For C{_release_save}, C{_acquire_restore} and
  C{_is_owned} the lock's own implementation is used if it provides one;
  otherwise the C{_base_*} fallbacks of this class are used.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    # Prefer the lock's own helpers, fall back to the generic ones
    try:
      self._release_save = lock._release_save
    except AttributeError:
      self._release_save = self._base_release_save
    try:
      self._acquire_restore = lock._acquire_restore
    except AttributeError:
      self._acquire_restore = self._base_acquire_restore
    try:
      self._is_owned = lock._is_owned
    except AttributeError:
      self._is_owned = self._base_is_owned

    self._lock = lock

    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Check whether the lock is currently held.

    Done by trying a non-blocking acquire: if that succeeds the lock was
    free, so it is released again and C{False} is returned. This check can
    only tell that the lock is held, not by which thread.

    @rtype: bool

    """
    if self._lock.acquire(0):
      self._lock.release()
      return False
    return True

  def _base_release_save(self):
    """Release the lock; there is no state to save (returns None).

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Re-acquire the lock; the saved state argument is ignored.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    @raise RuntimeError: if C{_is_owned} reports the lock as not owned

    """
    if not self._is_owned():
      raise RuntimeError("cannot work with un-aquired lock")
195

    
196

    
197
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    The pipe and the poller are not created here, but by the first waiter.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._nwaiters = 0
    self._notified = False
    self._read_fd = None
    self._write_fd = None
    self._poller = None

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} was already called

    """
    if self._notified:
      raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    Also drops the poller, so that a later waiter would create a new pipe.

    """
    if self._read_fd is not None:
      os.close(self._read_fd)
      self._read_fd = None

    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
    self._poller = None

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be owned when calling this function. It is released while
    waiting and re-acquired before returning. The function returns without
    a value both after a notification and after the timeout expired.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the lock isn't owned or the condition was
        already notified

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter creates the pipe shared by all waiters
        (self._read_fd, self._write_fd) = os.pipe()
        self._poller = select.poll()
        self._poller.register(self._read_fd, select.POLLHUP)

      wait_fn = self._waiter_class(self._poller, self._read_fd)
      state = self._release_save()
      try:
        # Wait for notification
        wait_fn(timeout)
      finally:
        # Re-acquire lock
        self._acquire_restore(state)
    finally:
      # The last waiter to leave closes the file descriptors
      self._nwaiters -= 1
      if self._nwaiters == 0:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    If nobody ever waited there is no pipe and only the "notified" flag is
    set.

    @raise RuntimeError: if the lock isn't owned or the condition was
        already notified

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True
    if self._write_fd is not None:
      os.close(self._write_fd)
      self._write_fd = None
289

    
290

    
291
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional features it's able to report whether
  there are any waiting threads.

  Internally each notification "round" uses its own single-notify condition,
  which is replaced by a new one every time L{notifyAll} is called.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    The calling thread is recorded as waiter for the duration of the wait.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)
    @raise RuntimeError: if the lock isn't owned

    """
    self._check_owned()

    # Keep local reference to the pipe. It could be replaced by another thread
    # notifying while we're waiting.
    cond = self._single_condition

    self._waiters.add(threading.currentThread())
    try:
      cond.wait(timeout)
    finally:
      # The set of waiters must only be modified while holding the lock
      self._check_owned()
      self._waiters.remove(threading.currentThread())

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    Threads starting to wait after this call use a fresh single-notify
    condition and are not affected by this notification.

    @raise RuntimeError: if the lock isn't owned

    """
    self._check_owned()
    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    Note that this is the internal set itself, not a copy; it must not be
    used once the lock has been released.

    @rtype: set
    @raise RuntimeError: if the lock isn't owned

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool
    @raise RuntimeError: if the lock isn't owned

    """
    self._check_owned()

    return bool(self._waiters)

  def __repr__(self):
    return ("<%s.%s waiters=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self._waiters, id(self)))
364

    
365

    
366
class _PipeConditionWithMode(PipeCondition):
  """Pipe condition remembering the mode of the acquire waiting on it.

  @ivar shared: value given at construction time; treated as a boolean
      telling whether the waiters want the lock in shared mode

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: whether the waiters want to acquire in shared mode

    """
    self.shared = shared
    PipeCondition.__init__(self, lock)
377

    
378

    
379
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register

    """
    object.__init__(self)

    self.name = name

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    return ("<%s.%s name=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: list with a single tuple C{(name, mode, owner_names, pending)};
        entries which were not requested (or don't apply) are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    @param shared: see L{_is_owned}

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def _is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  is_owned = _is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    What is counted are the queued conditions; a condition used for shared
    acquires counts once, no matter how many threads wait on it.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if all queue-related data structures are empty

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as owner; whether the lock is available
    must have been checked by the caller.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires that there's no exclusive owner; an
    exclusive acquire requires that there's no owner at all.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: tuple C{(priority, prioqueue)} of the first non-empty queue, or
        C{(None, None)} if there are no pending acquires

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    Must be called with the internal lock held (see L{acquire} and
    L{delete}).

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was acquired, False if it wasn't (which only
        happens when a timeout was given)
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      # TODO: Decrease timeout with spurious notifications
      while not (self.__is_on_top(wait_condition) and
                 self.__can_acquire(shared)):
        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()

        # A lot of code assumes blocking acquires always succeed. Loop
        # internally for that case.
        if timeout is not None:
          break

      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
        self.__do_acquire(shared)
        return True
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return False

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: True if the lock was acquired, False if the timeout expired
    @raise errors.LockError: if the lock is or gets deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    The lock must be owned by the calling thread; if it is already held in
    shared mode nothing is changed.

    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      # Nothing to do if the lock is already acquired in shared mode
      if self.__is_exclusive():
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
      else:
        self.__shr.remove(threading.currentThread())

      # Notify topmost condition in queue
      (priority, prioqueue) = self.__find_first_pending_queue()
      if prioqueue:
        cond = prioqueue[0]
        cond.notifyAll()
        if cond.shared:
          # Prevent further shared acquires from sneaking in while waiters are
          # notified
          self.__pending_shared.pop(priority, None)

    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: True if the lock was deleted, False if it couldn't be acquired
        before the timeout expired
    @raise errors.LockError: if the lock is already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        # Ownership may only be checked after a successful acquire; on a
        # timeout the lock isn't owned and False must be returned instead
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Release the lock and return the mode it was held in.

    @return: whether the lock was held in shared mode; to be passed to
        L{_acquire_restore}

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Re-acquire the lock in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)
870

    
871

    
872
# Whenever we want to acquire a full LockSet we pass None as the value
# to acquire.  Hide this behind this nicely named constant.
ALL_SET = None
875

    
876

    
877
class _AcquireTimeout(Exception):
  """Internal exception to abort an acquire on a timeout.

  """
881

    
882

    
883
class LockSet:
884
  """Implements a set of locks.
885

886
  This abstraction implements a set of shared locks for the same resource type,
887
  distinguished by name. The user can lock a subset of the resources and the
888
  LockSet will take care of acquiring the locks always in the same order, thus
889
  preventing deadlock.
890

891
  All the locks needed in the same set must be acquired together, though.
892

893
  @type name: string
894
  @ivar name: the name of the lockset
895

896
  """
897
  def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lockset; used as prefix for the names of the
        locks it creates
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Lock monitor, also handed to locks created later on
    self.__monitor = monitor

    # Maps each thread to the set of lock names it owns. For performance a
    # thread accesses its own key without taking a global lock on this
    # structure, hence *no* other kind of access is allowed (e.g. no looping
    # over the keys). Go through the *_owned helpers, or at the very least
    # never touch anything but __owners[threading.currentThread()].
    self.__owners = {}

    # Set-level lock, used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # Relationship name -> lock. The order-of-locking is implied by the
    # alphabetical order of names.
    self.__lockdict = dict((mname, SharedLock(self._GetLockName(mname),
                                              monitor=monitor))
                           for mname in members)
931

    
932
  def _GetLockName(self, mname):
933
    """Returns the name for a member lock.
934

935
    """
936
    return "%s/%s" % (self.name, mname)
937

    
938
  def _get_lock(self):
    """Returns the lockset-internal lock.

    @return: the set-level lock object created in the constructor

    """
    return self.__lock
943

    
944
  def _get_lockdict(self):
    """Returns the lockset-internal lock dictionary.

    Accessing this structure is only safe in single-thread usage or when the
    lockset-internal lock is held.

    @return: the internal name -> lock dictionary itself (not a copy)

    """
    return self.__lockdict
952

    
953
  def _is_owned(self):
954
    """Is the current thread a current level owner?"""
955
    return threading.currentThread() in self.__owners
956

    
957
  def _add_owned(self, name=None):
958
    """Note the current thread owns the given lock"""
959
    if name is None:
960
      if not self._is_owned():
961
        self.__owners[threading.currentThread()] = set()
962
    else:
963
      if self._is_owned():
964
        self.__owners[threading.currentThread()].add(name)
965
      else:
966
        self.__owners[threading.currentThread()] = set([name])
967

    
968
  def _del_owned(self, name=None):
    """Note the current thread no longer owns the given lock.

    @param name: name of the lock to drop from the thread's ownership record;
        with None no name is removed and only the cleanup of an empty record
        is attempted (the set-level lock must not be held in that case)

    """
    setlock_held = self.__lock._is_owned()

    assert not (name is None and setlock_held), \
           "Cannot hold internal lock when deleting owner status"

    me = threading.currentThread()

    if name is not None:
      self.__owners[me].remove(name)

    # Keep the record while we still hold the set-lock or any member lock
    if setlock_held or self.__owners[me]:
      return

    del self.__owners[me]
981

    
982
  def _list_owned(self):
983
    """Get the set of resource names owned by the current thread"""
984
    if self._is_owned():
985
      return self.__owners[threading.currentThread()].copy()
986
    else:
987
      return set()
988

    
989
  def _release_and_delete_owned(self):
    """Release and forget every resource owned by the current thread.

    Member locks recorded as owned are released if they are actually held
    and are then dropped from the thread's ownership record.

    """
    for owned_name in self._list_owned():
      member = self.__lockdict[owned_name]
      if member._is_owned():
        member.release()
      self._del_owned(name=owned_name)
996

    
997
  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    @return: the keys of the internal name -> lock dictionary

    """
    return self.__lockdict.keys()
1005

    
1006
  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    @return: a new set with the names of all locks currently in the lockset

    """
    # If we don't already own the set-level lock we get it (shared) and note
    # that we need to release it afterwards.
    release_lock = not self.__lock._is_owned()
    if release_lock:
      self.__lock.acquire(shared=1)
    try:
      # Build the copy while still holding the lock: __names() must not be
      # iterated once the lock has been released.
      return set(self.__names())
    finally:
      if release_lock:
        self.__lock.release()
1024

    
1025
  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); None (L{ALL_SET})
        acquires the set-level lock and every lock currently in the set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks; None selects the module
        default priority
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired. (When the whole set is requested,
        locks that disappeared are skipped instead.)

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
                                  " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # We need to keep track of how long we spent waiting for a lock. The
    # timeout passed to this function is over all lock acquires.
    running_timeout = utils.RunningTimeout(timeout, False)

    try:
      if names is not None:
        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        return self.__acquire_inner(names, False, shared, priority,
                                    running_timeout.Remaining, test_notify)

      else:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, though, so we'll get the list lock exclusively as well in
        # order to be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=running_timeout.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      running_timeout.Remaining, test_notify)
        except:
          # Any failure after the set-lock was taken ends up here, including
          # a timeout or a LockError raised while acquiring the member locks.
          # Give the set-lock back first (the ownership record can only be
          # dropped once it's no longer held), then re-raise.
          self.__lock.release()
          self._del_owned()
          raise

    except _AcquireTimeout:
      # A timeout is reported to the caller as None, not as an exception
      return None
1100

    
1101
  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired; if so,
        names that are missing or deleted meanwhile are silently skipped
    @param shared: Whether to acquire in shared mode
    @param timeout_fn: Function returning remaining timeout
    @param priority: Priority for acquiring locks
    @param test_notify: Special callback function for unittesting

    @return: set with the names of the locks acquired
    @raise errors.LockError: if a requested lock doesn't exist (and
        C{want_all} is false) or a blocking acquire reports failure
    @raise _AcquireTimeout: if a lock couldn't be acquired within the
        remaining timeout

    """
    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there afterwards, but this makes it a lot faster
    # should just one of them already be wrong. Using a sorted sequence to
    # prevent deadlocks.
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock %s in set %s (it may have"
                               " been removed)" % (lname, self.name))

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want.  In order to get them we loop on this (private) list and
      # acquire() them.  We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          # The callback is given the name of the lock being acquired
          test_notify_fn = lambda: test_notify(lname)
        else:
          test_notify_fn = None

        # Remaining time is re-evaluated for every lock
        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     priority=priority,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock %s in set %s (it may"
                                 " have been removed)" % (lname, self.name))

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock._is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks, so that a failed call leaves nothing held
      self._release_and_delete_owned()
      raise

    return acquired
1193

    
1194
  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: names of the locks to downgrade; None means every lock
        owned by the current thread in this set
    @return: always True

    """
    assert self._is_owned(), ("downgrade on lockset %s while not owning any"
                              " lock" % self.name)

    # A single resource name is accepted as well as a list
    if isinstance(names, basestring):
      names = [names]

    held = self._list_owned()

    if names is not None:
      names = set(names)
      assert held.issuperset(names), \
        ("downgrade() on unheld resources %s (set %s)" %
         (names.difference(held), self.name))
    else:
      names = held

    for lname in names:
      self.__lockdict[lname].downgrade()

    # If we own the lockset exclusively and no member lock is held in
    # exclusive mode anymore, the set-level lock is downgraded as well
    if (self.__lock._is_owned(shared=0) and
        not compat.any(member._is_owned(shared=0)
                       for member in self.__lockdict.values())):
      self.__lock.downgrade()
      assert self.__lock._is_owned(shared=1)

    return True
1229

    
1230
  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    The locks must be held by the caller, in either shared or exclusive
    mode. If the set-level lock is held it is released too, whichever names
    are given.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self._is_owned(), ("release() on lock set %s while not owner" %
                              self.name)

    # A single resource name is accepted as well as a list
    if isinstance(names, basestring):
      names = [names]

    if names is not None:
      names = set(names)
      assert self._list_owned().issuperset(names), (
               "release() on unheld resources %s (set %s)" %
               (names.difference(self._list_owned()), self.name))
    else:
      names = self._list_owned()

    # The "all elements" lock goes first, if we have it; from then on add()
    # can work again
    set_lock = self.__lock
    if set_lock._is_owned():
      set_lock.release()
      self._del_owned()

    for lname in names:
      # Fine as long as a lock doesn't leave __lockdict without being
      # exclusively held
      self.__lockdict[lname].release()
      self._del_owned(name=lname)
1267

    
1268
  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    The caller must either own nothing in this set or own the set-level lock
    exclusively.

    @type names: list of strings (or string)
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?

    @return: always True
    @raise errors.LockError: if any of the names is already in the set

    """
    # Check we don't already own locks at this level
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock._is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (invalid_names, self.name))

      for lockname in names:
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

        if acquired:
          # No need for priority or timeout here as this lock has just been
          # created
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception.  Of course something is going to be really wrong,
            # after this.  On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        # Only now does the lock become visible to other users of the set
        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True
1332

    
1333
  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings (or string)
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self._is_owned() or self._list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that we either already hold
      # everything we want to delete, or we hold nothing.
      try:
        self.__lockdict[lname].delete()
        removed.append(lname)
      except (KeyError, errors.LockError):
        # The name is unknown or somebody else deleted the lock first; it is
        # simply left out of the result.
        # This cannot happen if we were already holding it, verify:
        assert not self._is_owned(), ("remove failed while holding lockset %s"
                                      % self.name)
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self._is_owned():
          self._del_owned(name=lname)

    return removed
1387

    
1388

    
1389
# Locking levels, must be acquired in increasing order.
# Current rules are:
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or in exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
#   - at levels LEVEL_INSTANCE, LEVEL_NODEGROUP and LEVEL_NODE reside the
#   instance, node group and node locks. If you need more than one lock at
#   one of these levels, acquire them at the same time.
# Each level's value must equal its index in LEVELS;
# GanetiLockManager._upper_owned() relies on this.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3

LEVELS = [LEVEL_CLUSTER,
          LEVEL_INSTANCE,
          LEVEL_NODEGROUP,
          LEVEL_NODE]

# Lock levels which are modifiable, i.e. accepted by GanetiLockManager.add()
# and GanetiLockManager.remove()
LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]

# Human-readable level names, used in messages
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  }

# Constant for the big ganeti lock; the only lock name at LEVEL_CLUSTER
BGL = 'BGL'
1420

    
1421

    
1422
class GanetiLockManager:
1423
  """The Ganeti Locking Library
1424

1425
  The purpose of this small library is to manage locking for ganeti clusters
1426
  in a central place, while at the same time doing dynamic checks against
1427
  possible deadlocks. It will also make it easier to transition to a different
1428
  lock type should we migrate away from python threads.
1429

1430
  """
1431
  _instance = None
1432

    
1433
  def __init__(self, nodes, nodegroups, instances):
    """Constructs a new GanetiLockManager object.

    Only one GanetiLockManager may exist at any time; creating a second one
    trips an assertion.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    monitor = LockMonitor()
    self._monitor = monitor

    # The keyring maps each locking level to the lockset holding its locks;
    # all locksets share the same monitor
    keyring = {}
    keyring[LEVEL_CLUSTER] = LockSet([BGL], "BGL", monitor=monitor)
    keyring[LEVEL_NODE] = LockSet(nodes, "nodes", monitor=monitor)
    keyring[LEVEL_NODEGROUP] = LockSet(nodegroups, "nodegroups",
                                       monitor=monitor)
    keyring[LEVEL_INSTANCE] = LockSet(instances, "instances",
                                      monitor=monitor)
    self.__keyring = keyring
1460

    
1461
  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    @param provider: object passed unchanged to the monitor
    @return: whatever L{LockMonitor.RegisterLock} returns

    """
    return self._monitor.RegisterLock(provider)
1468

    
1469
  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    @param fields: passed unchanged to the monitor
    @return: whatever L{LockMonitor.QueryLocks} returns

    """
    return self._monitor.QueryLocks(fields)
1476

    
1477
  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    See L{LockMonitor.OldStyleQueryLocks}.

    @param fields: passed unchanged to the monitor
    @return: whatever L{LockMonitor.OldStyleQueryLocks} returns

    """
    return self._monitor.OldStyleQueryLocks(fields)
1484

    
1485
  def _names(self, level):
    """Return the names of the locks existing at the given level.

    Meant for debugging/testing purposes.

    @param level: the level whose list of locks to get
    @return: the result of L{LockSet._names} for that level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    lockset = self.__keyring[level]
    return lockset._names()
1495

    
1496
  def _is_owned(self, level):
    """Check whether we are owning locks at the given level

    @param level: the level to check
    @return: the result of L{LockSet._is_owned} for that level's lockset

    """
    return self.__keyring[level]._is_owned()

  # Alias of the same function without the leading underscore
  is_owned = _is_owned
1503

    
1504
  def _list_owned(self, level):
    """Get the set of owned locks at the given level

    @param level: the level to check
    @return: the result of L{LockSet._list_owned} for that level's lockset

    """
    return self.__keyring[level]._list_owned()

  # Alias of the same function without the leading underscore
  list_owned = _list_owned
1511

    
1512
  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the reference level
    @return: true if something is owned at any level after C{level} in
        L{LEVELS}

    """
    # Slicing by level value only works if LEVELS[i] = i, which we check for
    # in the test cases.
    higher_levels = LEVELS[level + 1:]
    return compat.any(self._is_owned(lvl) for lvl in higher_levels)
1519

    
1520
  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    @return: whether L{BGL} is among the names owned at L{LEVEL_CLUSTER}

    """
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1527

    
1528
  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Tells whether acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the level being acted on
    @param names: the names being acted on; None stands for all of them
    @return: True only for the cluster level when C{names} is None or
        contains the BGL

    """
    if level != LEVEL_CLUSTER:
      return False
    return names is None or BGL in names
1537

    
1538
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: the result of L{LockSet.acquire} for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # We must either be acquiring the Big Ganeti Lock or own it already.
    # Some "legacy" opcodes need to be sure they are run non-concurrently, so
    # even migrated code has to at least share the BGL to be compatible with
    # them. Owning the BGL exclusively makes acquiring any other lock
    # pointless, unless perhaps the current opcode is half way through its
    # migration.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
            "You must own the Big Ganeti Lock before acquiring any other")

    # Nothing may be owned at an upper level (owning something at this very
    # level is rejected by the lockset itself)
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority)
1573

    
1574
  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must be held in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)
    @return: the result of L{LockSet.downgrade} for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)
1589

    
1590
  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    The locks must be held by the caller, in either shared or exclusive
    mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: the result of L{LockSet.release} for the level's lockset

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    # The BGL may only go once nothing is held at the levels above it
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
            "Cannot release the Big Ganeti Lock while holding something"
            " at upper levels (%r)" %
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
                              for i in self.__keyring.keys()]), ))

    # The lockset complains if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)
1613

    
1614
  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    The BGL must be owned and nothing may be owned at a greater level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: the result of L{LockSet.add} for the level's lockset

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
           " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1633

    
1634
  def remove(self, level, names):
    """Remove locks from the specified level.

    The BGL must be owned. In addition you must either own locks at the
    given level or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: the result of L{LockSet.remove} for the level's lockset

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
           " operations")
    # Either the level is owned or nothing is owned from here up. The cases
    # of not owning all the needed resources, or of owning them in shared
    # mode, are left to LockSet.remove().
    assert self._is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1657

    
1658

    
1659
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)}; C{item} is a four-element tuple whose
    first element is the name to sort by, C{idx} is the position of C{item}
    in the output of its provider and C{num} is the registration number of
    that provider
  @rtype: tuple
  @return: C{(utils.NiceSortKey(name), num, idx)}

  """
  # Unpack inside the body: tuple parameters in a function signature are only
  # accepted by Python 2, while this form behaves the same on every version.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1670

    
1671

    
1672
class LockMonitor(object):
  """Registry of lock information providers, used to answer lock queries.

  Providers are kept in a weak-key dictionary together with a registration
  number taken from a monotonically increasing counter; that number is what
  keeps the query output in a stable order.

  """
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Counter for stable sorting
    self._counter = itertools.count(0)

    # Tracked locks. Weak references are used to avoid issues with circular
    # references and deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    Runs with the monitor's internal lock held (see the decorator).

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
      requested information but, as it turns out, weak references to bound
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
      workarounds, but none of the ones I found works properly in combination
      with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # There used to be a check for duplicate names here. As it turned out, when
    # a lock is re-created with the same name in a very short timeframe, the
    # previous instance might not yet be removed from the weakref dictionary.
    # By keeping track of the order of incoming registrations, a stable sort
    # ordering can still be guaranteed.

    # The next() builtin is used instead of the iterator's ".next()" method,
    # which only exists on Python 2
    self._locks[provider] = next(self._counter)

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: Requested information items, passed unchanged to the
      C{GetLockInfo} method of every registered provider
    @rtype: list of tuples
    @return: One C{(info, idx, num)} tuple per entry returned by a provider;
      C{idx} is the position of the entry in that provider's output and
      C{num} is the provider's registration number

    """
    # Must hold lock while getting consistent list of tracked items. The
    # explicit list() takes the snapshot while the lock is still held, even
    # where items() returns a lazy iterator instead of a list.
    self._lock.acquire(shared=1)
    try:
      items = list(self._locks.items())
    finally:
      self._lock.release()

    # The providers themselves are queried without the monitor lock held
    return [(info, idx, num)
            for (provider, num) in items
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @rtype: tuple
    @return: C{(query object, query data)} as consumed by L{QueryLocks} and
      L{OldStyleQueryLocks}

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Get all data with internal lock held and then sort by name and incoming
    # order
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
                      key=_MonitorSortKey)

    # Extract lock information and build query data; a real list is built so
    # the result does not depend on map() returning a list
    return (qobj,
            query.LockQueryData([compat.fst(entry) for entry in lockinfo]))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: Result of C{query.GetQueryResponse} for the collected data

    """
    (qobj, ctx) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(qobj, ctx)

  def OldStyleQueryLocks(self, fields):
    """Queries information from all locks, returning old-style data.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: Result of the query object's C{OldStyleQuery} method

    """
    (qobj, ctx) = self._Query(fields)

    return qobj.OldStyleQuery(ctx)