Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 87ed6b79

History | View | Annotate | Download (60.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36
import time
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
# Textual lock modes as reported by L{SharedLock.GetLockInfo}
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

# Priority used when the caller doesn't specify one
_DEFAULT_PRIORITY = 0

#: Shortest timeout (in seconds) for which queueing a pending acquisition is
#: still attempted; shorter timeouts make the acquire give up immediately
_LOCK_ACQUIRE_MIN_TIMEOUT = 1.0 / 1000

# Acquisition modes used internally by L{LockSet}
_LS_ACQUIRE_EXACT = 1
_LS_ACQUIRE_ALL = 2
_LS_ACQUIRE_OPPORTUNISTIC = 3

_LS_ACQUIRE_MODES = compat.UniqueFrozenset([
  _LS_ACQUIRE_EXACT,
  _LS_ACQUIRE_ALL,
  _LS_ACQUIRE_OPPORTUNISTIC,
  ])
64

    
65

    
66
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics). The lock is always released again, even if the decorated
  function raises. The returned wrapper carries the name, docstring and module
  of the decorated function.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire
  @type shared: integer (0/1) used as a boolean
  @param shared: passed as C{shared} argument to the lock's C{acquire} method

  """
  # "basestring" is only available on Python 2, use "str" everywhere else
  try:
    string_types = basestring
  except NameError:
    string_types = str

  # Whether "mylock" is an attribute name can't change after decoration, hence
  # it is only checked once instead of on every call
  lock_is_attr_name = isinstance(mylock, string_types)

  def wrap(fn):
    def sync_function(*args, **kwargs):
      if lock_is_attr_name:
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()

    # Make the wrapper look like the wrapped function (for tracebacks, logging
    # and documentation tools). Done by hand to not depend on "functools".
    for attr in ("__module__", "__name__", "__doc__"):
      try:
        setattr(sync_function, attr, getattr(fn, attr))
      except AttributeError:
        # Callable doesn't provide this attribute, keep the wrapper's own
        pass

    return sync_function
  return wrap
92

    
93

    
94
class _SingleNotifyPipeConditionWaiter(object):
  """Helper class for SingleNotifyPipeCondition

  An instance wraps a single file descriptor and, when called, blocks until
  C{poll()} reports an event on that descriptor or the timeout runs out.

  """
  __slots__ = [
    "_fd",
    ]

  def __init__(self, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    Returns (without a value) once an event was reported for the file
    descriptor or the timeout has expired. A C{poll()} call interrupted by a
    signal (C{EINTR}) is retried with the remaining time; any other error is
    propagated to the caller.

    @type timeout: float or None
    @param timeout: Timeout for waiting (can be None)

    """
    # Imported locally as the module doesn't import "sys" at the top
    import sys

    running_timeout = utils.RunningTimeout(timeout, True)
    poller = select.poll()
    poller.register(self._fd, select.POLLHUP)

    while True:
      remaining_time = running_timeout.Remaining()

      if remaining_time is not None:
        if remaining_time < 0.0:
          break

        # Our calculation uses seconds, poll() wants milliseconds
        remaining_time *= 1000

      try:
        result = poller.poll(remaining_time)
      except (EnvironmentError, select.error):
        # On Python 2 "select.error" is not derived from EnvironmentError and
        # has no "errno" attribute; the error code is its first argument.
        # sys.exc_info is used as it works with all Python versions.
        err = sys.exc_info()[1]
        err_code = getattr(err, "errno", None)
        if err_code is None and err.args:
          err_code = err.args[0]
        if err_code != errno.EINTR:
          raise
        result = None

      # Check whether we were notified
      if result and result[0][0] == self._fd:
        break
143

    
144

    
145
class _BaseCondition(object):
  """Base class containing common code for conditions.

  Some of this code is taken from python's threading module.

  The wrapped lock's C{acquire} and C{release} methods are exported directly
  on the condition. If the lock provides C{_release_save},
  C{_acquire_restore} or C{is_owned}, those are used; otherwise simple
  fallbacks built on C{acquire}/C{release} take their place.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    # Prefer the lock's own helpers and only fall back to the generic
    # implementations when the lock doesn't have them
    self._release_save = getattr(lock, "_release_save",
                                 self._base_release_save)
    self._acquire_restore = getattr(lock, "_acquire_restore",
                                    self._base_acquire_restore)
    self._is_owned = getattr(lock, "is_owned", self._base_is_owned)

    self._lock = lock

    # Export the lock's acquire() and release() methods
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Check whether the lock is currently held.

    Tries a non-blocking acquire; if that succeeds nobody was holding the
    lock (it is released again right away).

    """
    got_lock = self._lock.acquire(0)
    if not got_lock:
      return True

    self._lock.release()
    return False

  def _base_release_save(self):
    """Fallback for locks without C{_release_save}: just release the lock.

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Fallback for locks without C{_acquire_restore}: just acquire the lock.

    The saved state passed in is ignored.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    @raise RuntimeError: if the ownership check fails

    """
    if self._is_owned():
      return

    raise RuntimeError("cannot work with un-aquired lock")
209

    
210

    
211
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, with the following differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  The pipe is created by the first waiter and closed again once the last
  waiter has left; notifying closes the writing end.

  """

  __slots__ = [
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock, see L{_BaseCondition}

    """
    _BaseCondition.__init__(self, lock)
    # The pipe is only created once somebody actually waits
    self._read_fd = None
    self._write_fd = None
    self._notified = False
    self._nwaiters = 0

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} was already called

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    # Reading end first, then writing end
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held by the caller; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      # First waiter creates the pipe
      if self._read_fd is None:
        (self._read_fd, self._write_fd) = os.pipe()

      waiter = self._waiter_class(self._read_fd)
      saved_state = self._release_save()
      try:
        # Block until notified or timed out
        waiter(timeout)
      finally:
        # Get the lock back, no matter how waiting ended
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      # Last waiter closes the pipe
      if self._nwaiters == 0:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    Can only be called once and requires the lock to be held.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
298

    
299

    
300
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  This condition class uses pipes and poll, internally, to be able to wait for
  notification with a timeout, without resorting to polling. It is almost
  compatible with Python's threading.Condition, but only supports notifyAll and
  non-recursive locks. As an additional features it's able to report whether
  there are any waiting threads.

  Internally every notification uses up one single-notify condition, which is
  replaced by a fresh one afterwards.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock, see L{_BaseCondition}

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held by the caller. The calling thread is listed as
    waiter for the duration of the call.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # The attribute may be swapped for a new condition by a notifying thread
    # while we're blocked, so hold on to the one we started with
    current_cond = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      current_cond.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    Threads starting to wait afterwards use a new internal condition.

    """
    self._check_owned()

    used_cond = self._single_condition
    used_cond.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    This is the internal set itself, not a copy.

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return bool(self._waiters)

  def __repr__(self):
    cls = self.__class__
    return ("<%s.%s waiters=%s at %#x>" %
            (cls.__module__, cls.__name__, self._waiters, id(self)))
373

    
374

    
375
class _PipeConditionWithMode(PipeCondition):
  """L{PipeCondition} which additionally remembers an acquisition mode.

  @ivar shared: the mode passed to the constructor

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: mode to be stored in the C{shared} attribute

    """
    PipeCondition.__init__(self, lock)
    self.shared = shared
386

    
387

    
388
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register
    @param _time_fn: Function returning the current time (used for
      unittesting)

    """
    object.__init__(self)

    self.name = name

    # Replaceable clock, used for unittesting
    self.__time_fn = _time_fn

    # Lock protecting all internal state
    self.__lock = threading.Lock()

    # Waiting acquires: heap of (priority, queue), the same queues indexed by
    # priority and the re-usable shared condition per priority
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current holders: set of sharing threads, single exclusive thread
    self.__shr = set()
    self.__exc = None

    # Set once the lock has been deleted
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    cls = self.__class__
    return ("<%s.%s name=%s at %#x>" %
            (cls.__module__, cls.__name__, self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: List with a single tuple of lock name, mode, names of owning
      threads and pending acquires; values not requested (or not applicable)
      are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None
      pending = None

      # Lock mode is wanted
      if query.LQ_MODE in requested:
        if self.__deleted:
          assert not (self.__exc or self.__shr)
          mode = _DELETED_TEXT
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          holders = [self.__exc]
        else:
          holders = self.__shr

        if holders:
          assert not self.__deleted
          owner_names = [thread.getName() for thread in holders]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heaq functions for simplicity
        for (_, queue) in sorted(self.__pending):
          for cond in queue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            waiter_names = [thread.getName()
                            for thread in cond.get_waiting()]
            pending.append((pendmode, waiter_names))

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if not self.__deleted:
      return

    raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    me = threading.currentThread()
    return me in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    me = threading.currentThread()
    return me == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    """
    if shared < 0:
      # Any kind of ownership will do
      return self.__is_sharer() or self.__is_exclusive()

    if shared:
      return self.__is_sharer()

    return self.__is_exclusive()

  def is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  #: Necessary to remain compatible with threading.Condition, which tries to
  #: retrieve a locks' "_is_owned" attribute
  _is_owned = is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    Counts the conditions in all per-priority queues.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(queue) for (_, queue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    Only returns C{True} if all internal queue structures are empty.

    @rtype: bool

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, first_queue) = self.__find_first_pending_queue()

      if first_queue or self.__pending:
        return False

      return not (self.__pending_by_prio or self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Records the current thread as sharer or as exclusive owner.

    """
    me = threading.currentThread()
    if shared:
      self.__shr.add(me)
    else:
      self.__exc = me

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires that there's no exclusive owner, an
    exclusive acquire additionally requires that nobody shares the lock.

    """
    if self.__exc is not None:
      return False

    if shared:
      return True

    return len(self.__shr) == 0

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: Tuple of priority and queue, C{(None, None)} if nothing is
      pending

    """
    while self.__pending:
      (top_priority, top_queue) = self.__pending[0]

      if top_queue:
        return (top_priority, top_queue)

      # Nothing left in this queue, drop it from heap and index
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[top_priority]
      assert top_priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    top_queue = self.__find_first_pending_queue()[1]

    return cond == top_queue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: Whether the lock was acquired
    @raise errors.LockError: if the lock is or gets deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Fast path: nobody is waiting and nobody holds the lock in a conflicting
    # mode
    if not self.__pending and self.__can_acquire(shared):
      self.__do_acquire(shared)
      return True

    # The lock couldn't be acquired right away, so if a timeout is given and is
    # considered too short, return right away as scheduling a pending
    # acquisition is quite expensive
    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
      return False

    prioqueue = self.__pending_by_prio.get(priority)

    wait_condition = None
    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        # First acquire on this priority, set up its queue
        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          acquired = True
          break

        # A lot of code assumes blocking acquires always succeed, therefore we
        # can never return False for a blocking acquire
        if (timeout is not None and
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
          break

        # Wait for notification
        wait_condition.wait(timeout)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: Whether the lock was acquired

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.
    Nothing is changed if the lock is already held in shared mode.

    @return: Always C{True}

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Switch the current thread from exclusive owner to sharer
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (top_priority, top_queue) = self.__find_first_pending_queue()
        if top_queue:
          # Is there a pending shared acquire on this priority?
          shared_cond = self.__pending_shared.pop(top_priority, None)
          if shared_cond:
            assert shared_cond.shared
            assert shared_cond in top_queue

            # Ensure shared acquire is on top of queue
            if len(top_queue) > 1:
              top_queue.remove(shared_cond)
              top_queue.insert(0, shared_cond)

            # Notify
            shared_cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
        no_owner_left = True
      else:
        self.__shr.remove(threading.currentThread())
        no_owner_left = not self.__shr

      # Notify topmost condition in queue if there are no owners left (for
      # shared locks)
      if no_owner_left:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    """
    (top_priority, top_queue) = self.__find_first_pending_queue()
    if not top_queue:
      return

    top_cond = top_queue[0]
    top_cond.notifyAll()
    if top_cond.shared:
      # Prevent further shared acquires from sneaking in while waiters are
      # notified
      self.__pending_shared.pop(top_priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    Takes the internal lock itself.

    """
    self.__lock.acquire()
    try:
      return self.__notify_topmost()
    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: Whether the lock was acquired (and therefore deleted)

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if not acquired:
        # Timed out, lock stays as it is
        return acquired

      assert self.__is_exclusive() and not self.__is_sharer(), \
        "Lock wasn't acquired in exclusive mode"

      self.__deleted = True
      self.__exc = None

      assert not (self.__exc or self.__shr), "Found owner during deletion"

      # Notify all acquires. They'll throw an error.
      for (_, queue) in self.__pending:
        for cond in queue:
          cond.notifyAll()

      assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns whether it was held in shared mode.

    Counterpart of L{_acquire_restore}.

    """
    was_shared = self.__is_sharer()
    self.release()
    return was_shared

  def _acquire_restore(self, shared):
    """Re-acquires the lock in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)
913

    
914

    
915
# Passing None as the names to acquire means "the whole LockSet". This
# constant gives that special value a readable name.
ALL_SET = None

LOCKSET_NAME = "[lockset]"
920

    
921

    
922
def _TimeoutZero():
  """Timeout function which always returns zero.

  @rtype: int
  @return: C{0}

  """
  return 0
927

    
928

    
929
def _GetLsAcquireModeAndTimeouts(want_all, timeout, opportunistic):
  """Determines modes and timeouts for L{LockSet.acquire}.

  @type want_all: boolean
  @param want_all: Whether all locks in set should be acquired
  @param timeout: Timeout in seconds or C{None}
  @param opportunistic: Whether locks should be acquired opportunistically
  @rtype: tuple
  @return: Tuple containing mode to be passed to L{LockSet.__acquire_inner}
    (one of L{_LS_ACQUIRE_MODES}), a function to calculate timeout for
    acquiring the lockset-internal lock (might be C{None}) and a function to
    calculate the timeout for acquiring individual locks

  """
  # Opportunistic acquisition of named locks doesn't need a running timeout
  if opportunistic and not want_all:
    assert timeout is None, "Got timeout for an opportunistic acquisition"
    return (_LS_ACQUIRE_OPPORTUNISTIC, None, _TimeoutZero)

  # The timeout given by the caller covers all lock acquisitions together, so
  # the time already spent waiting needs to be tracked
  running_timeout = utils.RunningTimeout(timeout, False)

  if opportunistic:
    # Only reached when all locks are wanted: the lockset-internal lock uses
    # the running timeout, individual locks are not waited for
    return (_LS_ACQUIRE_OPPORTUNISTIC, running_timeout.Remaining,
            _TimeoutZero)

  if want_all:
    return (_LS_ACQUIRE_ALL, running_timeout.Remaining,
            running_timeout.Remaining)

  return (_LS_ACQUIRE_EXACT, None, running_timeout.Remaining)
966

    
967

    
968
class _AcquireTimeout(Exception):
  """Internal exception used to abort an acquire when a timeout occurs.

  """
972

    
973

    
974
class LockSet:
  """Implements a set of locks.

  This abstraction implements a set of shared locks for the same resource type,
  distinguished by name. The user can lock a subset of the resources and the
  LockSet will take care of acquiring the locks always in the same order, thus
  preventing deadlock.

  All the locks needed in the same set must be acquired together, though.

  @type name: string
  @ivar name: the name of the lockset

  """
  def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: name of the lockset; used as prefix for the names of all
      member locks (see L{_GetLockName})
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Lock monitor
    self.__monitor = monitor

    # Used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName(LOCKSET_NAME), monitor=monitor)

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    for mname in members:
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
                                          monitor=monitor)

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

  def _GetLockName(self, mname):
    """Returns the name for a member lock.

    @param mname: name of the member within this set
    @rtype: string
    @return: the lockset name and the member name, separated by a slash

    """
    return "%s/%s" % (self.name, mname)

  def _get_lock(self):
    """Returns the lockset-internal lock.

    """
    return self.__lock

  def _get_lockdict(self):
    """Returns the lockset-internal lock dictionary.

    Accessing this structure is only safe in single-thread usage or when the
    lockset-internal lock is held.

    """
    return self.__lockdict

  def is_owned(self):
    """Is the current thread a current level owner?

    @note: Use L{check_owned} to check if a specific lock is held

    """
    return threading.currentThread() in self.__owners

  def check_owned(self, names, shared=-1):
    """Check if locks are owned in a specific mode.

    @type names: sequence or string
    @param names: Lock names (or a single lock name)
    @param shared: See L{SharedLock.is_owned}
    @rtype: bool
    @raise errors.LockError: if one of the names is not part of the set
    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
      L{list_owned} to get the names of all owned locks

    """
    if isinstance(names, basestring):
      names = [names]

    # Avoid check if no locks are owned anyway
    if not (names and self.is_owned()):
      return False

    candidates = []

    # Gather references to all locks (in case they're deleted in the meantime)
    for lname in names:
      try:
        lock = self.__lockdict[lname]
      except KeyError:
        raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
                               " have been removed)" % (lname, self.name))
      else:
        candidates.append(lock)

    return compat.all(lock.is_owned(shared=shared) for lock in candidates)

  def owning_all(self):
    """Checks whether current thread owns internal lock.

    Holding the internal lock is equivalent with holding all locks in the set
    (the opposite does not necessarily hold as it can not be easily
    determined). L{add} and L{remove} require the internal lock.

    @rtype: boolean

    """
    return self.__lock.is_owned()

  def _add_owned(self, name=None):
    """Note the current thread owns the given lock.

    @param name: name of the newly owned lock; with C{None} only an (empty)
      owner entry is created for the current thread if there is none yet

    """
    if name is None:
      if not self.is_owned():
        self.__owners[threading.currentThread()] = set()
    else:
      if self.is_owned():
        self.__owners[threading.currentThread()].add(name)
      else:
        self.__owners[threading.currentThread()] = set([name])

  def _del_owned(self, name=None):
    """Note the current thread no longer owns the given lock.

    The owner entry of the current thread is dropped once it has become empty
    and the lockset-internal lock isn't held either.

    @param name: name of the lock which is no longer owned; C{None} is only
      allowed after the lockset-internal lock has been released

    """
    assert not (name is None and self.__lock.is_owned()), \
           "Cannot hold internal lock when deleting owner status"

    if name is not None:
      self.__owners[threading.currentThread()].remove(name)

    # Only remove the key if we don't hold the set-lock as well
    if not (self.__lock.is_owned() or
            self.__owners[threading.currentThread()]):
      del self.__owners[threading.currentThread()]

  def list_owned(self):
    """Get the set of resource names owned by the current thread.

    @rtype: set
    @return: a copy of the owned names; empty if nothing is owned

    """
    if self.is_owned():
      return self.__owners[threading.currentThread()].copy()
    else:
      return set()

  def _release_and_delete_owned(self):
    """Release and delete all resources owned by the current thread.

    """
    # list_owned returns a copy, hence the owner entry can be modified while
    # iterating
    for lname in self.list_owned():
      lock = self.__lockdict[lname]
      if lock.is_owned():
        lock.release()
      self._del_owned(name=lname)

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return self.__lockdict.keys()

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    @rtype: set

    """
    # If we don't already own the set-level lock acquired
    # we'll get it and note we need to release it later.
    release_lock = not self.__lock.is_owned()
    if release_lock:
      self.__lock.acquire(shared=1)
    try:
      # Build the copy while the lock is still held, as demanded by __names
      return set(self.__names())
    finally:
      if release_lock:
        self.__lock.release()

  def acquire(self, names, timeout=None, shared=0, priority=None,
              opportunistic=False, test_notify=None):
    """Acquire a set of resource locks.

    @note: When acquiring locks opportunistically, any number of locks might
      actually be acquired, even zero.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} acquires the
        whole set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks; for opportunistic
      acquisitions, a timeout can only be given when C{names} is C{None}, in
      which case it is exclusively used for acquiring the L{LockSet}-internal
      lock; opportunistic acquisitions don't use a timeout for acquiring
      individual locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type opportunistic: boolean
    @param opportunistic: Acquire locks opportunistically; use the return value
      to determine which locks were actually acquired
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
                                 " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    try:
      if names is not None:
        assert timeout is None or not opportunistic, \
          ("Opportunistic acquisitions can only use a timeout if no"
           " names are given; see docstring for details")

        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        (mode, _, timeout_fn) = \
          _GetLsAcquireModeAndTimeouts(False, timeout, opportunistic)

        return self.__acquire_inner(names, mode, shared, priority,
                                    timeout_fn, test_notify)

      else:
        (mode, ls_timeout_fn, timeout_fn) = \
          _GetLsAcquireModeAndTimeouts(True, timeout, opportunistic)

        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, though, so we'll get the list lock exclusively as well in
        # order to be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=ls_timeout_fn()):
          raise _AcquireTimeout()

        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), mode, shared,
                                      priority, timeout_fn, test_notify)
        except:
          # Whatever went wrong (timeout, deleted lock, unexpected error), the
          # member locks have already been given back by __acquire_inner; the
          # set-lock and the owner entry must be given up here before passing
          # the exception on.
          self.__lock.release()
          # Guard against a missing owner entry so that the original
          # exception isn't replaced by a KeyError
          if self.is_owned():
            self._del_owned()
          raise

    except _AcquireTimeout:
      return None

  def __acquire_inner(self, names, mode, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    Acquisition modes:

      - C{_LS_ACQUIRE_ALL}: C{names} contains names of all locks in set, but
        deleted locks can be ignored as the whole set is being acquired with
        its internal lock held
      - C{_LS_ACQUIRE_EXACT}: The names listed in C{names} must be acquired;
        timeouts and deleted locks are fatal
      - C{_LS_ACQUIRE_OPPORTUNISTIC}: C{names} lists names of locks (potentially
        all within the set) which should be acquired opportunistically, that is
        failures are ignored

    @param names: Names of the locks to be acquired
    @param mode: Lock acquisition mode (one of L{_LS_ACQUIRE_MODES})
    @param shared: Whether to acquire in shared mode
    @param priority: Priority for acquiring locks
    @param timeout_fn: Function returning remaining timeout; it is called
      once per lock
    @param test_notify: Special callback function for unittesting
    @rtype: set
    @return: Names of the locks which were acquired
    @raise errors.LockError: in mode C{_LS_ACQUIRE_EXACT} if a lock doesn't
      exist or was deleted
    @raise _AcquireTimeout: if a lock couldn't be acquired in time (except
      for opportunistic acquisitions)

    """
    assert mode in _LS_ACQUIRE_MODES

    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be the already wrong. Using a sorted sequence to prevent
    # deadlocks.
    for lname in sorted(frozenset(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        # We are acquiring the whole set, it doesn't matter if this particular
        # element is not there anymore. If, however, only certain names should
        # be acquired, not finding a lock is an error.
        if mode == _LS_ACQUIRE_EXACT:
          raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                                 " been removed)" % (lname, self.name))
      else:
        acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want.  In order to get them we loop on this (private) list and
      # acquire() them.  We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          test_notify_fn = lambda: test_notify(lname)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     priority=priority,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if mode in (_LS_ACQUIRE_ALL, _LS_ACQUIRE_OPPORTUNISTIC):
            # We are acquiring the whole set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Lock '%s' not found in set '%s' (it may have"
                                 " been removed)" % (lname, self.name))

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if mode == _LS_ACQUIRE_OPPORTUNISTIC:
            # Ignore timeouts on opportunistic acquisitions
            continue

          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock.is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired

  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be downgraded (defaults
      to all the locks owned by the current thread)
    @rtype: boolean
    @return: Always C{True}

    """
    assert self.is_owned(), ("downgrade on lockset %s while not owning any"
                             " lock" % self.name)

    # Support passing in a single resource to downgrade rather than many
    if isinstance(names, basestring):
      names = [names]

    owned = self.list_owned()

    if names is None:
      names = owned
    else:
      names = set(names)
      assert owned.issuperset(names), \
        ("downgrade() on unheld resources %s (set %s)" %
         (names.difference(owned), self.name))

    for lockname in names:
      self.__lockdict[lockname].downgrade()

    # Do we own the lockset in exclusive mode?
    if self.__lock.is_owned(shared=0):
      # Have all locks been downgraded?
      if not compat.any(lock.is_owned(shared=0)
                        for lock in self.__lockdict.values()):
        self.__lock.downgrade()
        assert self.__lock.is_owned(shared=1)

    return True

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self.is_owned(), ("release() on lock set %s while not owner" %
                             self.name)

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    owned = self.list_owned()

    if names is None:
      names = owned
    else:
      names = set(names)
      assert owned.issuperset(names), (
               "release() on unheld resources %s (set %s)" %
               (names.difference(owned), self.name))

    # First of all let's release the "all elements" lock, if set.
    # After this 'add' can work again
    if self.__lock.is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @rtype: boolean
    @return: Always C{True}
    @raise errors.LockError: if one of the names is already part of the set

    """
    # Check we don't already own locks at this level
    assert not self.is_owned() or self.__lock.is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock.is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (invalid_names, self.name))

      for lockname in names:
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

        if acquired:
          # No need for priority or timeout here as this lock has just been
          # created
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception.  Of course something is going to be really wrong,
            # after this.  On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True

  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self.is_owned() or self.list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that if we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
        removed.append(lname)
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self.is_owned(), ("remove failed while holding lockset %s" %
                                     self.name)
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self.is_owned():
          self._del_owned(name=lname)

    return removed
1560

    
1561

    
1562
# Locking levels, must be acquired in increasing order. Current rules are:
# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
#   you need more than one node, or more than one instance, acquire them at the
#   same time.
# - LEVEL_NODE_RES is for node resources and should be used by operations with
#   possibly high impact on the node's disks.
# - LEVEL_NODE_ALLOC blocks instance allocations for the whole cluster
#   ("NAL" is the only lock at this level). It should be acquired in shared
#   mode when an opcode blocks all or a significant amount of a cluster's
#   locks. Opcodes doing instance allocations should acquire in exclusive mode.
#   Once the set of acquired locks for an opcode has been reduced to the working
#   set, the NAL should be released as well to allow allocations to proceed.
(LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE_ALLOC, LEVEL_NODEGROUP,
 LEVEL_NODE, LEVEL_NODE_RES, LEVEL_NETWORK) = range(0, 7)

# All levels, listed in locking order
LEVELS = [
  LEVEL_CLUSTER, LEVEL_INSTANCE, LEVEL_NODE_ALLOC, LEVEL_NODEGROUP,
  LEVEL_NODE, LEVEL_NODE_RES, LEVEL_NETWORK,
  ]

# Lock levels which are modifiable
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_NODE_RES, LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODE_ALLOC: "node-alloc",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }
1615

    
1616
# Name of the Big Ganeti Lock
BGL = "BGL"

#: Name of the node allocation lock
NAL = "NAL"
1621

    
1622

    
1623
class GanetiLockManager(object):
1624
  """The Ganeti Locking Library
1625

1626
  The purpose of this small library is to manage locking for ganeti clusters
1627
  in a central place, while at the same time doing dynamic checks against
1628
  possible deadlocks. It will also make it easier to transition to a different
1629
  lock type should we migrate away from python threads.
1630

1631
  """
1632
  _instance = None
1633

    
1634
  def __init__(self, node_uuids, nodegroups, instance_names, networks):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param node_uuids: list of node UUIDs
    @param nodegroups: list of nodegroup uuids
    @param instance_names: list of instance names
    @param networks: list of initial members of the network lock set

    """
    cls = self.__class__

    assert cls._instance is None, "double GanetiLockManager instance"

    cls._instance = self

    # All lock sets below register their locks with this monitor
    monitor = LockMonitor()
    self._monitor = monitor

    # The keyring contains all the locks, at their level and in the correct
    # locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=monitor),
      LEVEL_NODE: LockSet(node_uuids, "node", monitor=monitor),
      LEVEL_NODE_RES: LockSet(node_uuids, "node-res", monitor=monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=monitor),
      LEVEL_INSTANCE: LockSet(instance_names, "instance", monitor=monitor),
      LEVEL_NETWORK: LockSet(networks, "network", monitor=monitor),
      LEVEL_NODE_ALLOC: LockSet([NAL], "node-alloc", monitor=monitor),
      }

    # Every lock set must be named like its level
    assert compat.all(lockset.name == LEVEL_NAMES[lvl]
                      for (lvl, lockset) in self.__keyring.items()), \
      "Keyring name mismatch"
1668

    
1669
  def AddToLockMonitor(self, provider):
1670
    """Registers a new lock with the monitor.
1671

1672
    See L{LockMonitor.RegisterLock}.
1673

1674
    """
1675
    return self._monitor.RegisterLock(provider)
1676

    
1677
  def QueryLocks(self, fields):
1678
    """Queries information from all locks.
1679

1680
    See L{LockMonitor.QueryLocks}.
1681

1682
    """
1683
    return self._monitor.QueryLocks(fields)
1684

    
1685
  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get
    @return: the result of L{LockSet._names} for the lock set at C{level}

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset._names()
1695

    
1696
  def is_owned(self, level):
1697
    """Check whether we are owning locks at the given level
1698

1699
    """
1700
    return self.__keyring[level].is_owned()
1701

    
1702
  def list_owned(self, level):
1703
    """Get the set of owned locks at the given level
1704

1705
    """
1706
    return self.__keyring[level].list_owned()
1707

    
1708
  def check_owned(self, level, names, shared=-1):
1709
    """Check if locks at a certain level are owned in a specific mode.
1710

1711
    @see: L{LockSet.check_owned}
1712

1713
    """
1714
    return self.__keyring[level].check_owned(names, shared=shared)
1715

    
1716
  def owning_all(self, level):
1717
    """Checks whether current thread owns all locks at a certain level.
1718

1719
    @see: L{LockSet.owning_all}
1720

1721
    """
1722
    return self.__keyring[level].owning_all()
1723

    
1724
  def _upper_owned(self, level):
    """Checks whether we own any lock at a level greater than the given one.

    @param level: the level above which ownership is checked
    @return: true value if locks are owned at any higher level

    """
    # Slicing LEVELS by the level's value only works if LEVELS[i] = i, which
    # we check for in the test cases.
    higher_levels = LEVELS[level + 1:]
    return compat.any((self.is_owned(lvl) for lvl in higher_levels))
1731

    
1732
  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    @rtype: boolean

    """
    cluster_owned = self.__keyring[LEVEL_CLUSTER].list_owned()
    return BGL in cluster_owned
1739

    
1740
  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    @param level: the locking level being acted on
    @param names: the lock names being acted on; C{None} stands for all
      locks at the level
    @rtype: boolean

    """
    # The BGL only exists at the cluster level
    if level != LEVEL_CLUSTER:
      return False

    return names is None or BGL in names
1749

    
1750
  def acquire(self, level, names, timeout=None, shared=0, priority=None,
              opportunistic=False):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type opportunistic: boolean
    @param opportunistic: Acquire locks opportunistically; use the return value
      to determine which locks were actually acquired
    @return: the result of L{LockSet.acquire} for the lock set at C{level}

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at upper levels.
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    lockset = self.__keyring[level]
    return lockset.acquire(names, shared=shared, timeout=timeout,
                           priority=priority, opportunistic=opportunistic)
1790

    
1791
  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)
    @return: the result of the C{downgrade} call on the lock set of C{level}

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    lockset = self.__keyring[level]
    return lockset.downgrade(names=names)
1806

    
1807
  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)
    @return: the result of the C{release} call on the lock set of C{level}

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Giving up the BGL is refused while anything above the cluster level is
    # still owned; the failure message lists what is owned at every level.
    assert not (self._contains_BGL(level, names) and
                self._upper_owned(LEVEL_CLUSTER)), (
      "Cannot release the Big Ganeti Lock while holding something"
      " at upper levels (%r)" %
      (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[lvl], self.list_owned(lvl))
                        for lvl in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    lockset = self.__keyring[level]
    return lockset.release(names)
1830

    
1831
  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared
    @return: the result of the C{add} call on the lock set of C{level}

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level

    # The BGL has to be owned for this operation
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")

    # Nothing may be owned at a level greater than the one being modified
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.add(names, acquired=acquired, shared=shared)
1850

    
1851
  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)
    @return: the result of the C{remove} call on the lock set of C{level}

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level

    # The BGL has to be owned for this operation
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")

    # Either something is owned at this level, or nothing is owned from here
    # up. The cases of not owning all the needed resources, or of owning them
    # only in shared mode, are left to LockSet.remove().
    assert self.is_owned(level) or not self._upper_owned(level), (
      "Cannot remove locks at a level while not owning it or"
      " owning some at a greater one")

    lockset = self.__keyring[level]
    return lockset.remove(names)
1874

    
1875

    
1876
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: a C{(item, idx, num)} triple; C{item} is a 4-tuple whose first
      element is the name used for sorting
  @return: the tuple C{(utils.NiceSortKey(name), num, idx)}

  """
  # The triple is unpacked here instead of in the signature: tuple parameters
  # were removed from the language by PEP 3113 and are a syntax error on
  # Python 3, while this form is accepted by every Python version. Callers
  # still pass exactly one positional argument.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1887

    
1888

    
1889
class LockMonitor(object):
  """Registry of lock information providers which can be queried.

  Providers are registered through L{RegisterLock}; L{QueryLocks} gathers the
  information they return, sorts it and builds a query response from it.

  """
  # Name of the attribute holding the internal lock; handed to the
  # ssynchronized decorator below
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Source of registration sequence numbers, used for stable sorting
    self._counter = itertools.count(0)

    # Tracked locks, mapping each provider to its registration number. Weak
    # references are used to avoid issues with circular references and
    # deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
      requested information but, as it turns out, weak references to bound
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
      workarounds, but none of the ones I found works properly in combination
      with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # Duplicate names are deliberately not rejected: when a lock is re-created
    # with the same name in a very short timeframe, the previous instance
    # might not yet be removed from the weakref dictionary. Recording the
    # order of incoming registrations still guarantees a stable sort
    # ordering.
    registration_number = self._counter.next()
    self._locks[provider] = registration_number

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: passed unchanged to each provider's C{GetLockInfo}
    @return: list of C{(info, idx, num)} tuples, where C{idx} is the position
      of C{info} in its provider's result and C{num} is the provider's
      registration number

    """
    # The internal lock must be held (shared mode suffices) to get a
    # consistent list of tracked items
    self._lock.acquire(shared=1)
    try:
      tracked = self._locks.items()
    finally:
      self._lock.release()

    # The providers themselves are queried after the lock has been released
    collected = []
    for (provider, num) in tracked:
      for (idx, info) in enumerate(provider.GetLockInfo(requested)):
        collected.append((info, idx, num))

    return collected

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: tuple of the query object and the lock query data built from the
      sorted lock information

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Gather all data and then sort it by name and incoming order
    unsorted_info = self._GetLockInfo(qobj.RequestedData())
    ordered_info = sorted(unsorted_info, key=_MonitorSortKey)

    # Keep only the lock information itself and build the query data
    lock_data = [compat.fst(entry) for entry in ordered_info]

    return (qobj, query.LockQueryData(lock_data))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: the query response built from the query object and its data

    """
    (query_obj, query_ctx) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(query_obj, query_ctx)