Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 6c0a75db

History | View | Annotate | Download (56.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36
import time
37

    
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import compat
41
from ganeti import query
42

    
43

    
44
# Textual lock modes, as reported by SharedLock.GetLockInfo
_EXCLUSIVE_TEXT = "exclusive"
45
_SHARED_TEXT = "shared"
46
_DELETED_TEXT = "deleted"
47

    
48
# Priority used by SharedLock.acquire/delete when called with priority=None
_DEFAULT_PRIORITY = 0
49

    
50
#: Minimum timeout required to consider scheduling a pending acquisition
51
#: (seconds)
52
_LOCK_ACQUIRE_MIN_TIMEOUT = (1.0 / 1000)
53

    
54

    
55
def ssynchronized(mylock, shared=0):
  """Decorator running a function while a shared lock is held.

  The decorated function is executed with the lock acquired either
  exclusively (the default) or in shared mode, and the lock is released again
  no matter how the function exits. The lock has to be a SharedLock or an
  object with the same C{acquire(shared=...)}/C{release()} interface.

  @type mylock: lockable object or string
  @param mylock: the lock to acquire, or the name of the attribute holding the
      lock on the instance the decorated method is called on
  @param shared: value passed as C{shared} to the lock's C{acquire}

  """
  def wrap(fn):
    def sync_function(*args, **kwargs):
      if not isinstance(mylock, basestring):
        held = mylock
      else:
        assert args, "cannot ssynchronize on non-class method: self not found"
        # The first positional argument is the instance ("self")
        held = getattr(args[0], mylock)

      held.acquire(shared=shared)
      try:
        result = fn(*args, **kwargs)
      finally:
        held.release()

      return result

    return sync_function

  return wrap
81

    
82

    
83
class _SingleNotifyPipeConditionWaiter(object):
  """Helper class for SingleNotifyPipeCondition

  Instances are callables which block until the poller reports the watched
  file descriptor or the timeout expires.

  """
  __slots__ = [
    "_fd",
    "_poller",
    ]

  def __init__(self, poller, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type poller: select.poll
    @param poller: Poller object
    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._poller = poller
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    Returns once the first event reported by the poller is for the watched
    file descriptor, or once the running timeout reports a negative remaining
    time. A poll interrupted by a signal (C{EINTR}) is not an error; the loop
    simply polls again with whatever time is left.

    @type timeout: float or None
    @param timeout: Timeout for waiting (can be None)

    """
    # Local import: sys.exc_info() is the only way to get at the exception
    # object which works unchanged on every Python version
    import sys

    running_timeout = utils.RunningTimeout(timeout, True)

    while True:
      remaining_time = running_timeout.Remaining()

      if remaining_time is not None:
        if remaining_time < 0.0:
          break

        # Our calculation uses seconds, poll() wants milliseconds
        remaining_time *= 1000

      try:
        result = self._poller.poll(remaining_time)
      except (EnvironmentError, select.error):
        # Before Python 3.3 "select.error" is not derived from
        # EnvironmentError and keeps the error number in args[0] instead of
        # an "errno" attribute. Both flavours must be handled, otherwise an
        # interrupted poll() would propagate instead of being retried.
        err = sys.exc_info()[1]
        code = getattr(err, "errno", None)
        if code is None and err.args:
          code = err.args[0]
        if code != errno.EINTR:
          raise
        result = None

      # Check whether we were notified
      if result and result[0][0] == self._fd:
        break
134

    
135

    
136
class _BaseCondition(object):
137
  """Base class containing common code for conditions.
138

139
  Some of this code is taken from python's threading module.
140

141
  """
142
  __slots__ = [
143
    "_lock",
144
    "acquire",
145
    "release",
146
    "_is_owned",
147
    "_acquire_restore",
148
    "_release_save",
149
    ]
150

    
151
  def __init__(self, lock):
152
    """Constructor for _BaseCondition.
153

154
    @type lock: threading.Lock
155
    @param lock: condition base lock
156

157
    """
158
    object.__init__(self)
159

    
160
    try:
161
      self._release_save = lock._release_save
162
    except AttributeError:
163
      self._release_save = self._base_release_save
164
    try:
165
      self._acquire_restore = lock._acquire_restore
166
    except AttributeError:
167
      self._acquire_restore = self._base_acquire_restore
168
    try:
169
      self._is_owned = lock.is_owned
170
    except AttributeError:
171
      self._is_owned = self._base_is_owned
172

    
173
    self._lock = lock
174

    
175
    # Export the lock's acquire() and release() methods
176
    self.acquire = lock.acquire
177
    self.release = lock.release
178

    
179
  def _base_is_owned(self):
180
    """Check whether lock is owned by current thread.
181

182
    """
183
    if self._lock.acquire(0):
184
      self._lock.release()
185
      return False
186
    return True
187

    
188
  def _base_release_save(self):
189
    self._lock.release()
190

    
191
  def _base_acquire_restore(self, _):
192
    self._lock.acquire()
193

    
194
  def _check_owned(self):
195
    """Raise an exception if the current thread doesn't own the lock.
196

197
    """
198
    if not self._is_owned():
199
      raise RuntimeError("cannot work with un-aquired lock")
200

    
201

    
202
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can be notified exactly once.

  A pipe and poll are used internally, which allows waiting for a
  notification with a timeout without having to resort to polling. The class
  is nearly compatible with Python's threading.Condition, except that:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_poller",
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    The pipe and the poller are only created by the first call to L{wait}.

    """
    _BaseCondition.__init__(self, lock)
    self._poller = None
    self._read_fd = None
    self._write_fd = None
    self._notified = False
    self._nwaiters = 0

  def _check_unnotified(self):
    """Throws an exception if already notified.

    @raise RuntimeError: if L{notifyAll} was called before

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    # Reading side first, then the writing side
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

    self._poller = None

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be owned by the caller; it is released while waiting and
    re-acquired before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._poller is None:
        # First waiter, set up pipe and poller
        (read_fd, write_fd) = os.pipe()
        self._read_fd = read_fd
        self._write_fd = write_fd
        self._poller = select.poll()
        self._poller.register(read_fd, select.POLLHUP)

      waiter = self._waiter_class(self._poller, self._read_fd)
      saved_state = self._release_save()
      try:
        # Block until notified or until the timeout expires
        waiter(timeout)
      finally:
        # The lock must be held again in any case
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      if not self._nwaiters:
        # Last waiter is gone, file descriptors are no longer needed
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is not None:
      os.close(write_fd)
      self._write_fd = None
294

    
295

    
296
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Pipes and poll are used internally, so waiting for a notification with a
  timeout doesn't require polling. The class is nearly compatible with
  Python's threading.Condition, but only notifyAll and non-recursive locks
  are supported. In addition it can report which threads are waiting.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # A notifying thread replaces the single-use condition while we're
    # waiting, hence a local reference to the current one is needed
    current = self._single_condition

    me = threading.currentThread()
    self._waiters.add(me)
    try:
      current.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    The single-use condition is notified and then replaced by a fresh one for
    subsequent waiters.

    """
    self._check_owned()

    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the set of all waiting threads.

    This is the internal set, not a copy.

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    """
    self._check_owned()

    return bool(self._waiters)

  def __repr__(self):
    cls = self.__class__

    return ("<%s.%s waiters=%s at %#x>" %
            (cls.__module__, cls.__name__, self._waiters, id(self)))
369

    
370

    
371
class _PipeConditionWithMode(PipeCondition):
  """Pipe condition which remembers the acquire mode it is used for.

  """
  __slots__ = [
    "shared",
    ]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: mode of the acquires waiting on this condition, stored
        unchanged in the C{shared} attribute

    """
    PipeCondition.__init__(self, lock)
    self.shared = shared
382

    
383

    
384
class SharedLock(object):
  """Lock supporting shared and exclusive ownership.

  Any number of threads can hold the lock at the same time in shared mode
  (C{acquire(shared=1)}), whereas exclusive mode (C{acquire(shared=0)})
  admits a single owner only.

  Notes on data structures: C{__pending} is a priority queue (heapq) of
  C{(priority, prioqueue)} tuples covering all pending acquires. Every
  C{prioqueue} is a plain list with the conditions to be notified, in order
  of arrival, once the lock can be acquired. Pending shared acquires on the
  same priority use a single condition which, if it exists, is kept in
  C{__pending_shared}. C{__pending_by_prio} maps priorities to their
  per-priority queue for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register
    @param _time_fn: function returning the current time; only meant to be
        replaced by unittests

    """
    object.__init__(self)

    self.name = name

    # Replaceable for unittests
    self.__time_fn = _time_fn

    # Lock protecting all internal state
    self.__lock = threading.Lock()

    # Owners: a set of sharers or a single exclusive holder
    self.__shr = set()
    self.__exc = None

    # Set once the lock has been deleted
    self.__deleted = False

    # Bookkeeping for waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    if not monitor:
      return

    # Make lock known to the lock monitor
    logging.debug("Adding lock %s to monitor", name)
    monitor.RegisterLock(self)

  def __repr__(self):
    cls = self.__class__

    return ("<%s.%s name=%s at %#x>" %
            (cls.__module__, cls.__name__, self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @return: list containing one tuple of lock name, mode, owner names and
        pending acquires; fields which weren't requested are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: only objects created in this function may be returned, as
      # handing out references to modifiable internal objects would invite
      # race conditions.
      mode = None
      if query.LQ_MODE in requested:
        if self.__deleted:
          assert not (self.__exc or self.__shr)
          mode = _DELETED_TEXT
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Names of the current owner(s)
      owner_names = None
      if query.LQ_OWNER in requested:
        if not self.__exc:
          holders = self.__shr
        else:
          holders = [self.__exc]

        if holders:
          assert not self.__deleted
          owner_names = [thread.getName() for thread in holders]

      # Pending acquires
      pending = None
      if query.LQ_PENDING in requested:
        pending = []

        # For simplicity the heap is sorted rather than copied and consumed
        # using the heapq functions
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            pendmode = _EXCLUSIVE_TEXT
            if cond.shared:
              pendmode = _SHARED_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            waiting_names = [thread.getName()
                             for thread in cond.get_waiting()]
            pending.append((pendmode, waiting_names))

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if not self.__deleted:
      return

    raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    me = threading.currentThread()
    return me in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    me = threading.currentThread()
    return me == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    Private variant of L{is_owned}; the internal lock must already be held
    by the caller.

    """
    if shared < 0:
      # Any kind of ownership will do
      return self.__is_sharer() or self.__is_exclusive()

    if shared:
      return self.__is_sharer()

    return self.__is_exclusive()

  def is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      owned = self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

    return owned

  #: Necessary to remain compatible with threading.Condition, which tries to
  #: retrieve a locks' "_is_owned" attribute
  _is_owned = is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      total = 0
      for (_, prioqueue) in self.__pending:
        total += len(prioqueue)
    finally:
      self.__lock.release()

    return total

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool

    """
    self.__lock.acquire()
    try:
      # This call must come first as __find_first_pending_queue modifies
      # __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return (not prioqueue and
              not self.__pending and
              not self.__pending_by_prio and
              not self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Records the current thread as sharer or as exclusive owner.

    """
    me = threading.currentThread()

    if shared:
      self.__shr.add(me)
    else:
      self.__exc = me

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    Shared acquires only require the absence of an exclusive owner, while
    exclusive acquires additionally require that there are no sharers.

    """
    no_exclusive_owner = self.__exc is None

    if shared:
      return no_exclusive_owner

    return no_exclusive_owner and len(self.__shr) == 0

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: tuple of priority and per-priority queue, or C{(None, None)} if
        nothing is pending

    """
    pending = self.__pending

    while pending:
      (priority, prioqueue) = pending[0]

      if not prioqueue:
        # Nothing left on this priority, drop its queue
        heapq.heappop(pending)
        del self.__pending_by_prio[priority]
        assert priority not in self.__pending_shared
        continue

      return (priority, prioqueue)

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    first_queue = self.__find_first_pending_queue()[1]

    return cond == first_queue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: whether the lock was acquired

    """
    self.__check_deleted()

    # Acquiring a lock which is already owned is not possible
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Get rid of empty queue entries
    self.__find_first_pending_queue()

    # Without pending acquires and without conflicting owners the lock can
    # be taken immediately
    if not self.__pending and self.__can_acquire(shared):
      self.__do_acquire(shared)
      return True

    # Queueing a pending acquisition is quite expensive, therefore give up
    # right away if the given timeout is considered too short
    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
      return False

    queue = self.__pending_by_prio.get(priority)

    cond = None
    if shared:
      # Shared acquires on one priority re-use the same condition
      cond = self.__pending_shared.get(priority)
      assert (cond is None or (cond.shared and cond in queue))

    if cond is None:
      if queue is None:
        assert priority not in self.__pending_by_prio

        queue = []
        heapq.heappush(self.__pending, (priority, queue))
        self.__pending_by_prio[priority] = queue

      cond = self.__condition_class(self.__lock, shared)
      queue.append(cond)

      if shared:
        # Remember the condition for further shared acquires on this
        # priority; cheaper than searching the list of pending acquires
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = cond

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Loop until this acquire is the topmost one in the queue and the lock
      # is available; the "else" branch only runs when the loop condition
      # turned false, i.e. not after leaving through "break"
      while not (self.__is_on_top(cond) and self.__can_acquire(shared)):
        # A lot of code assumes blocking acquires always succeed, therefore
        # giving up is only possible if a timeout was specified
        if (timeout is not None and
            utils.TimeoutExpired(wait_start, timeout, _time_fn=self.__time_fn)):
          break

        # Wait for notification
        cond.wait(timeout)
        self.__check_deleted()
      else:
        self.__do_acquire(shared)
        acquired = True
    finally:
      # Without any waiters left the condition is removed from the queue
      if not cond.has_waiting():
        queue.remove(cond)
        if cond.shared:
          # Might have been removed already while releasing (e.g. on lock
          # deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @return: whether the lock was acquired

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # The internal lock is held now, tell the unittest callback
      if __debug__ and callable(test_notify):
        test_notify()

      result = self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

    return result

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.
    Nothing is changed if the lock is already held in shared mode.

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires may only jump ahead on an
        # actual transition from exclusive to shared. Otherwise the owner of
        # a shared lock could call this function over and over again to push
        # incoming shared acquires.
        (priority, prioqueue) = self.__find_first_pending_queue()

        if prioqueue:
          # Look for a pending shared acquire on this priority
          cond = self.__pending_shared.pop(priority, None)
        else:
          cond = None

        if cond:
          assert cond.shared
          assert cond in prioqueue

          # Move shared acquire to the top of the queue
          if len(prioqueue) > 1:
            prioqueue.remove(cond)
            prioqueue.insert(0, cond)

          cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # The kind of release is detected automatically
      if self.__is_exclusive():
        self.__exc = None
        owners_left = False
      else:
        self.__shr.remove(threading.currentThread())
        owners_left = bool(self.__shr)

      # Only once the last owner is gone (relevant for shared locks) the
      # topmost condition in the queue is notified
      if not owners_left:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    """
    (priority, prioqueue) = self.__find_first_pending_queue()
    if not prioqueue:
      return

    top = prioqueue[0]
    top.notifyAll()

    if top.shared:
      # No further shared acquires may sneak in while the waiters are being
      # notified
      self.__pending_shared.pop(priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    Takes the internal lock before notifying.

    """
    self.__lock.acquire()
    try:
      result = self.__notify_topmost()
    finally:
      self.__lock.release()

    return result

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: whether the lock was acquired and thus deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # Holding the lock exclusively already is fine; otherwise it must be
      # acquired first
      acquired = (self.__is_exclusive() or
                  self.__acquire_unlocked(0, timeout, priority))

      if not acquired:
        return acquired

      assert self.__is_exclusive() and not self.__is_sharer(), \
        "Lock wasn't acquired in exclusive mode"

      self.__deleted = True
      self.__exc = None

      assert not (self.__exc or self.__shr), "Found owner during deletion"

      # Wake up all pending acquires, they'll throw an error
      for (_, prioqueue) in self.__pending:
        for pending_cond in prioqueue:
          pending_cond.notifyAll()

      assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Release the lock and return whether it was held in shared mode.

    """
    was_shared = self.__is_sharer()
    self.release()
    return was_shared

  def _acquire_restore(self, shared):
    """Acquire the lock again in the mode returned by L{_release_save}.

    """
    self.acquire(shared=shared)
909

    
910

    
911
# Whenever we want to acquire a full LockSet we pass None as the value
912
# to acquire.  Hide this behind this nicely named constant.
913
ALL_SET = None
914

    
915

    
916
class _AcquireTimeout(Exception):
  """Internal exception used to abort an acquire when its timeout expires.

  """
920

    
921

    
922
class LockSet:
  """Implements a set of locks.

  This abstraction implements a set of shared locks for the same resource type,
  distinguished by name. The user can lock a subset of the resources and the
  LockSet will take care of acquiring the locks always in the same order, thus
  preventing deadlock.

  All the locks needed in the same set must be acquired together, though.

  @type name: string
  @ivar name: the name of the lockset

  """
  def __init__(self, members, name, monitor=None):
    """Constructs a new LockSet.

    @type members: list of strings
    @param members: initial members of the set
    @type name: string
    @param name: the name of the lockset; used as prefix for the names of
        the member locks (see L{_GetLockName})
    @type monitor: L{LockMonitor}
    @param monitor: Lock monitor with which to register member locks

    """
    assert members is not None, "members parameter is not a list"
    self.name = name

    # Lock monitor
    self.__monitor = monitor

    # Used internally to guarantee coherency
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)

    # The lockdict indexes the relationship name -> lock
    # The order-of-locking is implied by the alphabetical order of names
    self.__lockdict = {}

    for mname in members:
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
                                          monitor=monitor)

    # The owner dict contains the set of locks each thread owns. For
    # performance each thread can access its own key without a global lock on
    # this structure. It is paramount though that *no* other type of access is
    # done to this structure (eg. no looping over its keys). *_owner helper
    # function are defined to guarantee access is correct, but in general never
    # do anything different than __owners[threading.currentThread()], or there
    # will be trouble.
    self.__owners = {}

  def _GetLockName(self, mname):
    """Returns the name for a member lock.

    @param mname: name of the member within this set
    @return: the member name prefixed with the name of the lockset

    """
    return "%s/%s" % (self.name, mname)

  def _get_lock(self):
    """Returns the lockset-internal lock.

    """
    return self.__lock

  def _get_lockdict(self):
    """Returns the lockset-internal lock dictionary.

    Accessing this structure is only safe in single-thread usage or when the
    lockset-internal lock is held.

    """
    return self.__lockdict

  def is_owned(self):
    """Is the current thread a current level owner?

    @note: Use L{check_owned} to check if a specific lock is held

    """
    return threading.currentThread() in self.__owners

  def check_owned(self, names, shared=-1):
    """Check if locks are owned in a specific mode.

    @type names: sequence or string
    @param names: Lock names (or a single lock name)
    @param shared: See L{SharedLock.is_owned}
    @rtype: bool
    @raise errors.LockError: if the current thread owns something in this
        set and one of the given names is not a member of the set
    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
      L{list_owned} to get the names of all owned locks

    """
    if isinstance(names, basestring):
      names = [names]

    # Avoid check if no locks are owned anyway
    if names and self.is_owned():
      candidates = []

      # Gather references to all locks (in case they're deleted in the meantime)
      for lname in names:
        try:
          lock = self.__lockdict[lname]
        except KeyError:
          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
                                 " have been removed)" % (lname, self.name))
        else:
          candidates.append(lock)

      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
    else:
      return False

  def _add_owned(self, name=None):
    """Note the current thread owns the given lock.

    @param name: name of the newly owned lock; if C{None} the current thread
        is only registered as an owner, with an empty set of lock names
        unless it already has an entry

    """
    if name is None:
      if not self.is_owned():
        self.__owners[threading.currentThread()] = set()
    else:
      if self.is_owned():
        self.__owners[threading.currentThread()].add(name)
      else:
        self.__owners[threading.currentThread()] = set([name])

  def _del_owned(self, name=None):
    """Note the current thread no longer owns the given lock.

    The entry of the current thread is dropped altogether once it owns
    neither a named lock nor the lockset-internal lock.

    @param name: name of the lock which is not owned anymore; if C{None}
        only the cleanup of the thread's entry is attempted, which requires
        the lockset-internal lock to have been released already

    """
    assert not (name is None and self.__lock.is_owned()), \
           "Cannot hold internal lock when deleting owner status"

    if name is not None:
      self.__owners[threading.currentThread()].remove(name)

    # Only remove the key if we don't hold the set-lock as well
    if not (self.__lock.is_owned() or
            self.__owners[threading.currentThread()]):
      del self.__owners[threading.currentThread()]

  def list_owned(self):
    """Get the set of resource names owned by the current thread.

    @return: a copy of the set of owned names (empty if nothing is owned)

    """
    if self.is_owned():
      return self.__owners[threading.currentThread()].copy()
    else:
      return set()

  def _release_and_delete_owned(self):
    """Release and delete all resources owned by the current thread.

    """
    # list_owned() returns a copy, hence it's safe to modify the owner
    # information while looping
    for lname in self.list_owned():
      lock = self.__lockdict[lname]
      if lock.is_owned():
        lock.release()
      self._del_owned(name=lname)

  def __names(self):
    """Return the current set of names.

    Only call this function while holding __lock and don't iterate on the
    result after releasing the lock.

    """
    return self.__lockdict.keys()

  def _names(self):
    """Return a copy of the current set of elements.

    Used only for debugging purposes.

    """
    # If we don't already own the set-level lock acquired
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock.is_owned():
      release_lock = True
      self.__lock.acquire(shared=1)
    try:
      # Copy while still holding the lock; the result of __names() must not
      # be iterated after the lock has been released
      result = set(self.__names())
    finally:
      if release_lock:
        self.__lock.release()
    return result

  def acquire(self, names, timeout=None, shared=0, priority=None,
              test_notify=None):
    """Acquire a set of resource locks.

    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names); C{None} acquires the
        whole set
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring locks
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting

    @return: Set of all locks successfully acquired or None in case of timeout

    @raise errors.LockError: when any lock we try to acquire has
        been deleted before we succeed. In this case none of the
        locks requested will be acquired.

    """
    assert timeout is None or timeout >= 0.0

    # Check we don't already own locks at this level
    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
                                 " (lockset %s)" % self.name)

    if priority is None:
      priority = _DEFAULT_PRIORITY

    # We need to keep track of how long we spent waiting for a lock. The
    # timeout passed to this function is over all lock acquires.
    running_timeout = utils.RunningTimeout(timeout, False)

    try:
      if names is not None:
        # Support passing in a single resource to acquire rather than many
        if isinstance(names, basestring):
          names = [names]

        return self.__acquire_inner(names, False, shared, priority,
                                    running_timeout.Remaining, test_notify)

      else:
        # If no names are given acquire the whole set by not letting new names
        # being added before we release, and getting the current list of names.
        # Some of them may then be deleted later, but we'll cope with this.
        #
        # We'd like to acquire this lock in a shared way, as it's nice if
        # everybody else can use the instances at the same time. If we are
        # acquiring them exclusively though they won't be able to do this
        # anyway, though, so we'll get the list lock exclusively as well in
        # order to be able to do add() on the set while owning it.
        if not self.__lock.acquire(shared=shared, priority=priority,
                                   timeout=running_timeout.Remaining()):
          raise _AcquireTimeout()
        try:
          # note we own the set-lock
          self._add_owned()

          return self.__acquire_inner(self.__names(), True, shared, priority,
                                      running_timeout.Remaining, test_notify)
        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong, after this.
          self.__lock.release()
          self._del_owned()
          raise

    except _AcquireTimeout:
      return None

  def __acquire_inner(self, names, want_all, shared, priority,
                      timeout_fn, test_notify):
    """Inner logic for acquiring a number of locks.

    @param names: Names of the locks to be acquired
    @param want_all: Whether all locks in the set should be acquired
    @param shared: Whether to acquire in shared mode
    @param priority: Priority for acquiring locks
    @param timeout_fn: Function returning remaining timeout
    @param test_notify: Special callback function for unittesting
    @return: set with the names of the acquired locks
    @raise errors.LockError: if a requested lock doesn't exist (anymore) and
        not the whole set is being acquired
    @raise _AcquireTimeout: if a lock couldn't be acquired within the
        remaining timeout

    """
    acquire_list = []

    # First we look the locks up on __lockdict. We have no way of being sure
    # they will still be there after, but this makes it a lot faster should
    # just one of them be the already wrong. Using a sorted sequence to prevent
    # deadlocks.
    for lname in sorted(utils.UniqueSequence(names)):
      try:
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
      except KeyError:
        if want_all:
          # We are acquiring all the set, it doesn't matter if this particular
          # element is not there anymore.
          continue

        raise errors.LockError("Non-existing lock %s in set %s (it may have"
                               " been removed)" % (lname, self.name))

      acquire_list.append((lname, lock))

    # This will hold the locknames we effectively acquired.
    acquired = set()

    try:
      # Now acquire_list contains a sorted list of resources and locks we
      # want.  In order to get them we loop on this (private) list and
      # acquire() them.  We gave no real guarantee they will still exist till
      # this is done but .acquire() itself is safe and will alert us if the
      # lock gets deleted.
      for (lname, lock) in acquire_list:
        if __debug__ and callable(test_notify):
          test_notify_fn = lambda: test_notify(lname)
        else:
          test_notify_fn = None

        timeout = timeout_fn()

        try:
          # raises LockError if the lock was deleted
          acq_success = lock.acquire(shared=shared, timeout=timeout,
                                     priority=priority,
                                     test_notify=test_notify_fn)
        except errors.LockError:
          if want_all:
            # We are acquiring all the set, it doesn't matter if this
            # particular element is not there anymore.
            continue

          raise errors.LockError("Non-existing lock %s in set %s (it may"
                                 " have been removed)" % (lname, self.name))

        if not acq_success:
          # Couldn't get lock or timeout occurred
          if timeout is None:
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
            # blocking.
            raise errors.LockError("Failed to get lock %s (set %s)" %
                                   (lname, self.name))

          raise _AcquireTimeout()

        try:
          # now the lock cannot be deleted, we have it!
          self._add_owned(name=lname)
          acquired.add(lname)

        except:
          # We shouldn't have problems adding the lock to the owners list, but
          # if we did we'll try to release this lock and re-raise exception.
          # Of course something is going to be really wrong after this.
          if lock.is_owned():
            lock.release()
          raise

    except:
      # Release all owned locks
      self._release_and_delete_owned()
      raise

    return acquired

  def downgrade(self, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    The locks must have been acquired in exclusive mode.

    @type names: list of strings (or string), or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks owned in this set)
    @return: always C{True}

    """
    assert self.is_owned(), ("downgrade on lockset %s while not owning any"
                             " lock" % self.name)

    # Support passing in a single resource to downgrade rather than many
    if isinstance(names, basestring):
      names = [names]

    owned = self.list_owned()

    if names is None:
      names = owned
    else:
      names = set(names)
      assert owned.issuperset(names), \
        ("downgrade() on unheld resources %s (set %s)" %
         (names.difference(owned), self.name))

    for lockname in names:
      self.__lockdict[lockname].downgrade()

    # Do we own the lockset in exclusive mode?
    if self.__lock.is_owned(shared=0):
      # Have all locks been downgraded?
      if not compat.any(lock.is_owned(shared=0)
                        for lock in self.__lockdict.values()):
        self.__lock.downgrade()
        assert self.__lock.is_owned(shared=1)

    return True

  def release(self, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive mode,
    before releasing them.

    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level).

    """
    assert self.is_owned(), ("release() on lock set %s while not owner" %
                             self.name)

    # Support passing in a single resource to release rather than many
    if isinstance(names, basestring):
      names = [names]

    if names is None:
      names = self.list_owned()
    else:
      names = set(names)
      assert self.list_owned().issuperset(names), (
               "release() on unheld resources %s (set %s)" %
               (names.difference(self.list_owned()), self.name))

    # First of all let's release the "all elements" lock, if set.
    # After this 'add' can work again
    if self.__lock.is_owned():
      self.__lock.release()
      self._del_owned()

    for lockname in names:
      # If we are sure the lock doesn't leave __lockdict without being
      # exclusively held we can do this...
      self.__lockdict[lockname].release()
      self._del_owned(name=lockname)

  def add(self, names, acquired=0, shared=0):
    """Add a new set of elements to the set

    @type names: list of strings
    @param names: names of the new elements to add
    @type acquired: integer (0/1) used as a boolean
    @param acquired: pre-acquire the new resource?
    @type shared: integer (0/1) used as a boolean
    @param shared: is the pre-acquisition shared?
    @return: always C{True}
    @raise errors.LockError: if any of the names already exists in the set

    """
    # Check we don't already own locks at this level
    assert not self.is_owned() or self.__lock.is_owned(shared=0), \
      ("Cannot add locks if the set %s is only partially owned, or shared" %
       self.name)

    # Support passing in a single resource to add rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we don't already own the set-level lock acquired in an exclusive way
    # we'll get it and note we need to release it later.
    release_lock = False
    if not self.__lock.is_owned():
      release_lock = True
      self.__lock.acquire()

    try:
      invalid_names = set(self.__names()).intersection(names)
      if invalid_names:
        # This must be an explicit raise, not an assert, because assert is
        # turned off when using optimization, and this can happen because of
        # concurrency even if the user doesn't want it.
        raise errors.LockError("duplicate add(%s) on lockset %s" %
                               (invalid_names, self.name))

      for lockname in names:
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)

        if acquired:
          # No need for priority or timeout here as this lock has just been
          # created
          lock.acquire(shared=shared)
          # now the lock cannot be deleted, we have it!
          try:
            self._add_owned(name=lockname)
          except:
            # We shouldn't have problems adding the lock to the owners list,
            # but if we did we'll try to release this lock and re-raise
            # exception.  Of course something is going to be really wrong,
            # after this.  On the other hand the lock hasn't been added to the
            # __lockdict yet so no other threads should be pending on it. This
            # release is just a safety measure.
            lock.release()
            raise

        self.__lockdict[lockname] = lock

    finally:
      # Only release __lock if we were not holding it previously.
      if release_lock:
        self.__lock.release()

    return True

  def remove(self, names):
    """Remove elements from the lock set.

    You can either not hold anything in the lockset or already hold a superset
    of the elements you want to delete, exclusively.

    @type names: list of strings
    @param names: names of the resource to remove.

    @return: a list of locks which we removed; the list is always
        equal to the names list if we were holding all the locks
        exclusively

    """
    # Support passing in a single resource to remove rather than many
    if isinstance(names, basestring):
      names = [names]

    # If we own any subset of this lock it must be a superset of what we want
    # to delete. The ownership must also be exclusive, but that will be checked
    # by the lock itself.
    assert not self.is_owned() or self.list_owned().issuperset(names), (
      "remove() on acquired lockset %s while not owning all elements" %
      self.name)

    removed = []

    for lname in names:
      # Calling delete() acquires the lock exclusively if we don't already own
      # it, and causes all pending and subsequent lock acquires to fail. It's
      # fine to call it out of order because delete() also implies release(),
      # and the assertion above guarantees that we either already hold
      # everything we want to delete, or we hold none.
      try:
        self.__lockdict[lname].delete()
        removed.append(lname)
      except (KeyError, errors.LockError):
        # This cannot happen if we were already holding it, verify:
        assert not self.is_owned(), ("remove failed while holding lockset %s" %
                                     self.name)
      else:
        # If no LockError was raised we are the ones who deleted the lock.
        # This means we can safely remove it from lockdict, as any further or
        # pending delete() or acquire() will fail (and nobody can have the lock
        # since before our call to delete()).
        #
        # This is done in an else clause because if the exception was thrown
        # it's the job of the one who actually deleted it.
        del self.__lockdict[lname]
        # And let's remove it from our private list if we owned it.
        if self.is_owned():
          self._del_owned(name=lname)

    return removed
1462

    
1463

    
1464
# Locking levels, must be acquired in increasing order.
# Current rules are:
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or in exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
#   If you need more than one node, or more than one instance, acquire them at
#   the same time.
LEVEL_CLUSTER = 0
LEVEL_INSTANCE = 1
LEVEL_NODEGROUP = 2
LEVEL_NODE = 3
#: Level for node resources, used for operations with possibly high impact on
#: the node's disks.
LEVEL_NODE_RES = 4
LEVEL_NETWORK = 5

# All levels, in the order in which they must be acquired
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]

# Lock levels which are modifiable
LEVELS_MOD = frozenset([
  LEVEL_NODE_RES,
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

#: Lock level names (make sure to use singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Constant for the big ganeti lock
BGL = "BGL"
1512

    
1513

    
1514
class GanetiLockManager:
  """The Ganeti Locking Library

  The purpose of this small library is to manage locking for ganeti clusters
  in a central place, while at the same time doing dynamic checks against
  possible deadlocks. It will also make it easier to transition to a different
  lock type should we migrate away from python threads.

  """
  # The one and only instance of this class, see __init__
  _instance = None

  def __init__(self, nodes, nodegroups, instances, networks):
    """Constructs a new GanetiLockManager object.

    There should be only a GanetiLockManager object at any time, so this
    function raises an error if this is not the case.

    @param nodes: list of node names
    @param nodegroups: list of nodegroup uuids
    @param instances: list of instance names
    @param networks: list of network lock names (the initial members of the
        network lockset)

    """
    assert self.__class__._instance is None, \
           "double GanetiLockManager instance"

    self.__class__._instance = self

    self._monitor = LockMonitor()

    # The keyring contains all the locks, at their level and in the correct
    # locking order.
    self.__keyring = {
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
      LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
      LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
      LEVEL_INSTANCE: LockSet(instances, "instance", monitor=self._monitor),
      LEVEL_NETWORK: LockSet(networks, "network", monitor=self._monitor),
      }

    # The lockset names must agree with the official level names
    assert compat.all(ls.name == LEVEL_NAMES[level]
                      for (level, ls) in self.__keyring.items())

  def AddToLockMonitor(self, provider):
    """Registers a new lock with the monitor.

    See L{LockMonitor.RegisterLock}.

    """
    return self._monitor.RegisterLock(provider)

  def QueryLocks(self, fields):
    """Queries information from all locks.

    See L{LockMonitor.QueryLocks}.

    """
    return self._monitor.QueryLocks(fields)

  def _names(self, level):
    """List the lock names at the given level.

    This can be used for debugging/testing purposes.

    @param level: the level whose list of locks to get

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    return self.__keyring[level]._names()

  def is_owned(self, level):
    """Check whether we are owning locks at the given level

    """
    return self.__keyring[level].is_owned()

  def list_owned(self, level):
    """Get the set of owned locks at the given level

    """
    return self.__keyring[level].list_owned()

  def check_owned(self, level, names, shared=-1):
    """Check if locks at a certain level are owned in a specific mode.

    @see: L{LockSet.check_owned}

    """
    return self.__keyring[level].check_owned(names, shared=shared)

  def _upper_owned(self, level):
    """Check whether we own any lock at a level greater than the given one.

    @param level: the level above which ownership is checked
    @rtype: bool
    @return: True if a lock is owned at any level greater than C{level}

    """
    # This way of checking only works if LEVELS[i] = i, which we check for in
    # the test cases.
    return compat.any(self.is_owned(upper) for upper in LEVELS[level + 1:])

  def _BGL_owned(self): # pylint: disable=C0103
    """Check if the current thread owns the BGL.

    Both an exclusive or a shared acquisition work.

    """
    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()

  @staticmethod
  def _contains_BGL(level, names): # pylint: disable=C0103
    """Check if the level contains the BGL.

    Check if acting on the given level and set of names will change
    the status of the Big Ganeti Lock.

    """
    return level == LEVEL_CLUSTER and (names is None or BGL in names)

  def acquire(self, level, names, timeout=None, shared=0, priority=None):
    """Acquire a set of resource locks, at the same level.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be acquired
    @type names: list of strings (or string)
    @param names: the names of the locks which shall be acquired
        (special lock names, or instance/node names)
    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default
        an exclusive lock will be acquired
    @type timeout: float
    @param timeout: Maximum time to acquire all locks
    @type priority: integer
    @param priority: Priority for acquiring lock
    @return: see L{LockSet.acquire}

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    # Check that we are either acquiring the Big Ganeti Lock or we already own
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
    # so even if we've migrated we need to at least share the BGL to be
    # compatible with them. Of course if we own the BGL exclusively there's no
    # point in acquiring any other lock, unless perhaps we are half way through
    # the migration of the current opcode.
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
      "You must own the Big Ganeti Lock before acquiring any other")

    # Check we don't own locks at upper levels (already owning locks at this
    # very level is rejected by LockSet.acquire itself).
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
                                          " while owning some at a greater one")

    # Acquire the locks in the set.
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
                                         priority=priority)

  def downgrade(self, level, names=None):
    """Downgrade a set of resource locks from exclusive to shared mode.

    You must have acquired the locks in exclusive mode.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be downgraded
    @type names: list of strings, or None
    @param names: the names of the locks which shall be downgraded
        (defaults to all the locks acquired at the level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level

    return self.__keyring[level].downgrade(names=names)

  def release(self, level, names=None):
    """Release a set of resource locks, at the same level.

    You must have acquired the locks, either in shared or in exclusive
    mode, before releasing them.

    @type level: member of locking.LEVELS
    @param level: the level at which the locks shall be released
    @type names: list of strings, or None
    @param names: the names of the locks which shall be released
        (defaults to all the locks acquired at that level)

    """
    assert level in LEVELS, "Invalid locking level %s" % level
    assert (not self._contains_BGL(level, names) or
            not self._upper_owned(LEVEL_CLUSTER)), (
              "Cannot release the Big Ganeti Lock while holding something"
              " at upper levels (%r)" %
              (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
                                for i in self.__keyring.keys()]), ))

    # Release will complain if we don't own the locks already
    return self.__keyring[level].release(names)

  def add(self, level, names, acquired=0, shared=0):
    """Add locks at the specified level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be added
    @type names: list of strings
    @param names: names of the locks to acquire
    @type acquired: integer (0/1) used as a boolean
    @param acquired: whether to acquire the newly added locks
    @type shared: integer (0/1) used as a boolean
    @param shared: whether the acquisition will be shared

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    assert not self._upper_owned(level), ("Cannot add locks at a level"
                                          " while owning some at a greater one")
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)

  def remove(self, level, names):
    """Remove locks from the specified level.

    You must either already own the locks you are trying to remove
    exclusively or not own any lock at an upper level.

    @type level: member of locking.LEVELS_MOD
    @param level: the level at which the locks shall be removed
    @type names: list of strings
    @param names: the names of the locks which shall be removed
        (special lock names, or instance/node names)

    """
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
    assert self._BGL_owned(), ("You must own the BGL before performing other"
                               " operations")
    # Check we either own the level or don't own anything from here
    # up. LockSet.remove() will check the case in which we don't own
    # all the needed resources, or we have a shared ownership.
    assert self.is_owned(level) or not self._upper_owned(level), (
           "Cannot remove locks at a level while not owning it or"
           " owning some at a greater one")
    return self.__keyring[level].remove(names)
1749

    
1750

    
1751
def _MonitorSortKey(entry):
  """Sorting key function.

  Sort by name, registration order and then order of information. This provides
  a stable sort order over different providers, even if they return the same
  name.

  @type entry: tuple
  @param entry: C{(item, idx, num)} where C{item} is a four-element tuple
    whose first element is the lock name, C{idx} is the position of the
    information within its provider's output and C{num} is the provider's
    registration number
  @rtype: tuple
  @return: C{(utils.NiceSortKey(name), num, idx)}

  """
  # Unpack inside the body instead of in the signature: tuple parameters were
  # removed from the language by PEP 3113. Callers still pass a single
  # positional tuple, so the interface is unchanged.
  (item, idx, num) = entry
  (name, _, _, _) = item

  return (utils.NiceSortKey(name), num, idx)
1762

    
1763

    
1764
class LockMonitor(object):
  """Keeps track of lock information providers and answers queries on them.

  Providers are registered through L{RegisterLock} and are only weakly
  referenced. L{QueryLocks} collects the information of all providers still
  alive, sorts it and turns it into a query response.

  """
  # Name of the attribute holding the lock handed to L{ssynchronized}
  _LOCK_ATTR = "_lock"

  def __init__(self):
    """Initializes this class.

    """
    self._lock = SharedLock("LockMonitor")

    # Counter for stable sorting
    self._counter = itertools.count(0)

    # Tracked locks. Weak references are used to avoid issues with circular
    # references and deletion.
    self._locks = weakref.WeakKeyDictionary()

  @ssynchronized(_LOCK_ATTR)
  def RegisterLock(self, provider):
    """Registers a new lock.

    @param provider: Object with a callable method named C{GetLockInfo}, taking
      a single C{set} containing the requested information items
    @note: It would be nicer to only receive the function generating the
      requested information but, as it turns out, weak references to bound
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
      workarounds, but none of the ones I found works properly in combination
      with a standard C{WeakKeyDictionary}

    """
    assert provider not in self._locks, "Duplicate registration"

    # There used to be a check for duplicate names here. As it turned out, when
    # a lock is re-created with the same name in a very short timeframe, the
    # previous instance might not yet be removed from the weakref dictionary.
    # By keeping track of the order of incoming registrations, a stable sort
    # ordering can still be guaranteed.

    # The builtin next() works on Python 2.6+ as well as Python 3, whereas the
    # iterator's ".next()" method only exists on Python 2
    self._locks[provider] = next(self._counter)

  def _GetLockInfo(self, requested):
    """Get information from all locks.

    @param requested: Requested information items; passed unchanged to the
      C{GetLockInfo} method of every registered provider
    @rtype: list of tuples
    @return: One C{(info, idx, num)} tuple per information entry, where
      C{idx} is the entry's position in its provider's output and C{num} is
      the provider's registration number

    """
    # Must hold lock while getting consistent list of tracked items
    self._lock.acquire(shared=1)
    try:
      # Take a real snapshot while the lock is held; where items() is lazy
      # (Python 3) the dictionary would otherwise only be walked after the
      # lock has been released
      items = list(self._locks.items())
    finally:
      self._lock.release()

    return [(info, idx, num)
            for (provider, num) in items
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]

  def _Query(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @rtype: tuple
    @return: C{(qobj, data)}; the query object built for C{fields} and the
      C{query.LockQueryData} wrapping the sorted lock information

    """
    qobj = query.Query(query.LOCK_FIELDS, fields)

    # Get all data with internal lock held and then sort by name and incoming
    # order
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
                      key=_MonitorSortKey)

    # Extract lock information and build query data. A list is built
    # explicitly so the data can be iterated more than once regardless of
    # whether map() would have been lazy.
    return (qobj, query.LockQueryData([info for (info, _, _) in lockinfo]))

  def QueryLocks(self, fields):
    """Queries information from all locks.

    @type fields: list of strings
    @param fields: List of fields to return
    @return: Result of C{query.GetQueryResponse} for the collected data

    """
    (qobj, ctx) = self._Query(fields)

    # Prepare query response
    return query.GetQueryResponse(qobj, ctx)