Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ f2f57b6e

History | View | Annotate | Download (55.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import errno
32
import weakref
33
import logging
34
import heapq
35
import itertools
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import compat
40
from ganeti import query
41

    
42

    
43
_EXCLUSIVE_TEXT = "exclusive"
44
_SHARED_TEXT = "shared"
45
_DELETED_TEXT = "deleted"
46

    
47
_DEFAULT_PRIORITY = 0
48

    
49

    
50
def ssynchronized(mylock, shared=0):
51
  """Shared Synchronization decorator.
52

53
  Calls the function holding the given lock, either in exclusive or shared
54
  mode. It requires the passed lock to be a SharedLock (or support its
55
  semantics).
56

57
  @type mylock: lockable object or string
58
  @param mylock: lock to acquire or class member name of the lock to acquire
59

60
  """
61
  def wrap(fn):
62
    def sync_function(*args, **kwargs):
63
      if isinstance(mylock, basestring):
64
        assert args, "cannot ssynchronize on non-class method: self not found"
65
        # args[0] is "self"
66
        lock = getattr(args[0], mylock)
67
      else:
68
        lock = mylock
69
      lock.acquire(shared=shared)
70
      try:
71
        return fn(*args, **kwargs)
72
      finally:
73
        lock.release()
74
    return sync_function
75
  return wrap
76

    
77

    
78
class _SingleNotifyPipeConditionWaiter(object):
79
  """Helper class for SingleNotifyPipeCondition
80

81
  """
82
  __slots__ = [
83
    "_fd",
84
    "_poller",
85
    ]
86

    
87
  def __init__(self, poller, fd):
88
    """Constructor for _SingleNotifyPipeConditionWaiter
89

90
    @type poller: select.poll
91
    @param poller: Poller object
92
    @type fd: int
93
    @param fd: File descriptor to wait for
94

95
    """
96
    object.__init__(self)
97
    self._poller = poller
98
    self._fd = fd
99

    
100
  def __call__(self, timeout):
101
    """Wait for something to happen on the pipe.
102

103
    @type timeout: float or None
104
    @param timeout: Timeout for waiting (can be None)
105

106
    """
107
    running_timeout = utils.RunningTimeout(timeout, True)
108

    
109
    while True:
110
      remaining_time = running_timeout.Remaining()
111

    
112
      if remaining_time is not None:
113
        if remaining_time < 0.0:
114
          break
115

    
116
        # Our calculation uses seconds, poll() wants milliseconds
117
        remaining_time *= 1000
118

    
119
      try:
120
        result = self._poller.poll(remaining_time)
121
      except EnvironmentError, err:
122
        if err.errno != errno.EINTR:
123
          raise
124
        result = None
125

    
126
      # Check whether we were notified
127
      if result and result[0][0] == self._fd:
128
        break
129

    
130

    
131
class _BaseCondition(object):
132
  """Base class containing common code for conditions.
133

134
  Some of this code is taken from python's threading module.
135

136
  """
137
  __slots__ = [
138
    "_lock",
139
    "acquire",
140
    "release",
141
    "_is_owned",
142
    "_acquire_restore",
143
    "_release_save",
144
    ]
145

    
146
  def __init__(self, lock):
147
    """Constructor for _BaseCondition.
148

149
    @type lock: threading.Lock
150
    @param lock: condition base lock
151

152
    """
153
    object.__init__(self)
154

    
155
    try:
156
      self._release_save = lock._release_save
157
    except AttributeError:
158
      self._release_save = self._base_release_save
159
    try:
160
      self._acquire_restore = lock._acquire_restore
161
    except AttributeError:
162
      self._acquire_restore = self._base_acquire_restore
163
    try:
164
      self._is_owned = lock.is_owned
165
    except AttributeError:
166
      self._is_owned = self._base_is_owned
167

    
168
    self._lock = lock
169

    
170
    # Export the lock's acquire() and release() methods
171
    self.acquire = lock.acquire
172
    self.release = lock.release
173

    
174
  def _base_is_owned(self):
175
    """Check whether lock is owned by current thread.
176

177
    """
178
    if self._lock.acquire(0):
179
      self._lock.release()
180
      return False
181
    return True
182

    
183
  def _base_release_save(self):
184
    self._lock.release()
185

    
186
  def _base_acquire_restore(self, _):
187
    self._lock.acquire()
188

    
189
  def _check_owned(self):
190
    """Raise an exception if the current thread doesn't own the lock.
191

192
    """
193
    if not self._is_owned():
194
      raise RuntimeError("cannot work with un-aquired lock")
195

    
196

    
197
class SingleNotifyPipeCondition(_BaseCondition):
198
  """Condition which can only be notified once.
199

200
  This condition class uses pipes and poll, internally, to be able to wait for
201
  notification with a timeout, without resorting to polling. It is almost
202
  compatible with Python's threading.Condition, with the following differences:
203
    - notifyAll can only be called once, and no wait can happen after that
204
    - notify is not supported, only notifyAll
205

206
  """
207

    
208
  __slots__ = [
209
    "_poller",
210
    "_read_fd",
211
    "_write_fd",
212
    "_nwaiters",
213
    "_notified",
214
    ]
215

    
216
  _waiter_class = _SingleNotifyPipeConditionWaiter
217

    
218
  def __init__(self, lock):
219
    """Constructor for SingleNotifyPipeCondition
220

221
    """
222
    _BaseCondition.__init__(self, lock)
223
    self._nwaiters = 0
224
    self._notified = False
225
    self._read_fd = None
226
    self._write_fd = None
227
    self._poller = None
228

    
229
  def _check_unnotified(self):
230
    """Throws an exception if already notified.
231

232
    """
233
    if self._notified:
234
      raise RuntimeError("cannot use already notified condition")
235

    
236
  def _Cleanup(self):
237
    """Cleanup open file descriptors, if any.
238

239
    """
240
    if self._read_fd is not None:
241
      os.close(self._read_fd)
242
      self._read_fd = None
243

    
244
    if self._write_fd is not None:
245
      os.close(self._write_fd)
246
      self._write_fd = None
247
    self._poller = None
248

    
249
  def wait(self, timeout):
250
    """Wait for a notification.
251

252
    @type timeout: float or None
253
    @param timeout: Waiting timeout (can be None)
254

255
    """
256
    self._check_owned()
257
    self._check_unnotified()
258

    
259
    self._nwaiters += 1
260
    try:
261
      if self._poller is None:
262
        (self._read_fd, self._write_fd) = os.pipe()
263
        self._poller = select.poll()
264
        self._poller.register(self._read_fd, select.POLLHUP)
265

    
266
      wait_fn = self._waiter_class(self._poller, self._read_fd)
267
      state = self._release_save()
268
      try:
269
        # Wait for notification
270
        wait_fn(timeout)
271
      finally:
272
        # Re-acquire lock
273
        self._acquire_restore(state)
274
    finally:
275
      self._nwaiters -= 1
276
      if self._nwaiters == 0:
277
        self._Cleanup()
278

    
279
  def notifyAll(self): # pylint: disable=C0103
280
    """Close the writing side of the pipe to notify all waiters.
281

282
    """
283
    self._check_owned()
284
    self._check_unnotified()
285
    self._notified = True
286
    if self._write_fd is not None:
287
      os.close(self._write_fd)
288
      self._write_fd = None
289

    
290

    
291
class PipeCondition(_BaseCondition):
292
  """Group-only non-polling condition with counters.
293

294
  This condition class uses pipes and poll, internally, to be able to wait for
295
  notification with a timeout, without resorting to polling. It is almost
296
  compatible with Python's threading.Condition, but only supports notifyAll and
297
  non-recursive locks. As an additional features it's able to report whether
298
  there are any waiting threads.
299

300
  """
301
  __slots__ = [
302
    "_waiters",
303
    "_single_condition",
304
    ]
305

    
306
  _single_condition_class = SingleNotifyPipeCondition
307

    
308
  def __init__(self, lock):
309
    """Initializes this class.
310

311
    """
312
    _BaseCondition.__init__(self, lock)
313
    self._waiters = set()
314
    self._single_condition = self._single_condition_class(self._lock)
315

    
316
  def wait(self, timeout):
317
    """Wait for a notification.
318

319
    @type timeout: float or None
320
    @param timeout: Waiting timeout (can be None)
321

322
    """
323
    self._check_owned()
324

    
325
    # Keep local reference to the pipe. It could be replaced by another thread
326
    # notifying while we're waiting.
327
    cond = self._single_condition
328

    
329
    self._waiters.add(threading.currentThread())
330
    try:
331
      cond.wait(timeout)
332
    finally:
333
      self._check_owned()
334
      self._waiters.remove(threading.currentThread())
335

    
336
  def notifyAll(self): # pylint: disable=C0103
337
    """Notify all currently waiting threads.
338

339
    """
340
    self._check_owned()
341
    self._single_condition.notifyAll()
342
    self._single_condition = self._single_condition_class(self._lock)
343

    
344
  def get_waiting(self):
345
    """Returns a list of all waiting threads.
346

347
    """
348
    self._check_owned()
349

    
350
    return self._waiters
351

    
352
  def has_waiting(self):
353
    """Returns whether there are active waiters.
354

355
    """
356
    self._check_owned()
357

    
358
    return bool(self._waiters)
359

    
360

    
361
class _PipeConditionWithMode(PipeCondition):
362
  __slots__ = [
363
    "shared",
364
    ]
365

    
366
  def __init__(self, lock, shared):
367
    """Initializes this class.
368

369
    """
370
    self.shared = shared
371
    PipeCondition.__init__(self, lock)
372

    
373

    
374
class SharedLock(object):
375
  """Implements a shared lock.
376

377
  Multiple threads can acquire the lock in a shared way by calling
378
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
379
  threads can call C{acquire(shared=0)}.
380

381
  Notes on data structures: C{__pending} contains a priority queue (heapq) of
382
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
383
  ...]}. Each per-priority queue contains a normal in-order list of conditions
384
  to be notified when the lock can be acquired. Shared locks are grouped
385
  together by priority and the condition for them is stored in
386
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
387
  references for the per-priority queues indexed by priority for faster access.
388

389
  @type name: string
390
  @ivar name: the name of the lock
391

392
  """
393
  __slots__ = [
394
    "__weakref__",
395
    "__deleted",
396
    "__exc",
397
    "__lock",
398
    "__pending",
399
    "__pending_by_prio",
400
    "__pending_shared",
401
    "__shr",
402
    "name",
403
    ]
404

    
405
  __condition_class = _PipeConditionWithMode
406

    
407
  def __init__(self, name, monitor=None):
408
    """Construct a new SharedLock.
409

410
    @param name: the name of the lock
411
    @type monitor: L{LockMonitor}
412
    @param monitor: Lock monitor with which to register
413

414
    """
415
    object.__init__(self)
416

    
417
    self.name = name
418

    
419
    # Internal lock
420
    self.__lock = threading.Lock()
421

    
422
    # Queue containing waiting acquires
423
    self.__pending = []
424
    self.__pending_by_prio = {}
425
    self.__pending_shared = {}
426

    
427
    # Current lock holders
428
    self.__shr = set()
429
    self.__exc = None
430

    
431
    # is this lock in the deleted state?
432
    self.__deleted = False
433

    
434
    # Register with lock monitor
435
    if monitor:
436
      logging.debug("Adding lock %s to monitor", name)
437
      monitor.RegisterLock(self)
438

    
439
  def GetLockInfo(self, requested):
440
    """Retrieves information for querying locks.
441

442
    @type requested: set
443
    @param requested: Requested information, see C{query.LQ_*}
444

445
    """
446
    self.__lock.acquire()
447
    try:
448
      # Note: to avoid unintentional race conditions, no references to
449
      # modifiable objects should be returned unless they were created in this
450
      # function.
451
      mode = None
452
      owner_names = None
453

    
454
      if query.LQ_MODE in requested:
455
        if self.__deleted:
456
          mode = _DELETED_TEXT
457
          assert not (self.__exc or self.__shr)
458
        elif self.__exc:
459
          mode = _EXCLUSIVE_TEXT
460
        elif self.__shr:
461
          mode = _SHARED_TEXT
462

    
463
      # Current owner(s) are wanted
464
      if query.LQ_OWNER in requested:
465
        if self.__exc:
466
          owner = [self.__exc]
467
        else:
468
          owner = self.__shr
469

    
470
        if owner:
471
          assert not self.__deleted
472
          owner_names = [i.getName() for i in owner]
473

    
474
      # Pending acquires are wanted
475
      if query.LQ_PENDING in requested:
476
        pending = []
477

    
478
        # Sorting instead of copying and using heaq functions for simplicity
479
        for (_, prioqueue) in sorted(self.__pending):
480
          for cond in prioqueue:
481
            if cond.shared:
482
              pendmode = _SHARED_TEXT
483
            else:
484
              pendmode = _EXCLUSIVE_TEXT
485

    
486
            # List of names will be sorted in L{query._GetLockPending}
487
            pending.append((pendmode, [i.getName()
488
                                       for i in cond.get_waiting()]))
489
      else:
490
        pending = None
491

    
492
      return [(self.name, mode, owner_names, pending)]
493
    finally:
494
      self.__lock.release()
495

    
496
  def __check_deleted(self):
497
    """Raises an exception if the lock has been deleted.
498

499
    """
500
    if self.__deleted:
501
      raise errors.LockError("Deleted lock %s" % self.name)
502

    
503
  def __is_sharer(self):
504
    """Is the current thread sharing the lock at this time?
505

506
    """
507
    return threading.currentThread() in self.__shr
508

    
509
  def __is_exclusive(self):
510
    """Is the current thread holding the lock exclusively at this time?
511

512
    """
513
    return threading.currentThread() == self.__exc
514

    
515
  def __is_owned(self, shared=-1):
516
    """Is the current thread somehow owning the lock at this time?
517

518
    This is a private version of the function, which presumes you're holding
519
    the internal lock.
520

521
    """
522
    if shared < 0:
523
      return self.__is_sharer() or self.__is_exclusive()
524
    elif shared:
525
      return self.__is_sharer()
526
    else:
527
      return self.__is_exclusive()
528

    
529
  def is_owned(self, shared=-1):
530
    """Is the current thread somehow owning the lock at this time?
531

532
    @param shared:
533
        - < 0: check for any type of ownership (default)
534
        - 0: check for exclusive ownership
535
        - > 0: check for shared ownership
536

537
    """
538
    self.__lock.acquire()
539
    try:
540
      return self.__is_owned(shared=shared)
541
    finally:
542
      self.__lock.release()
543

    
544
  #: Necessary to remain compatible with threading.Condition, which tries to
545
  #: retrieve a locks' "_is_owned" attribute
546
  _is_owned = is_owned
547

    
548
  def _count_pending(self):
549
    """Returns the number of pending acquires.
550

551
    @rtype: int
552

553
    """
554
    self.__lock.acquire()
555
    try:
556
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
557
    finally:
558
      self.__lock.release()
559

    
560
  def _check_empty(self):
561
    """Checks whether there are any pending acquires.
562

563
    @rtype: bool
564

565
    """
566
    self.__lock.acquire()
567
    try:
568
      # Order is important: __find_first_pending_queue modifies __pending
569
      (_, prioqueue) = self.__find_first_pending_queue()
570

    
571
      return not (prioqueue or
572
                  self.__pending or
573
                  self.__pending_by_prio or
574
                  self.__pending_shared)
575
    finally:
576
      self.__lock.release()
577

    
578
  def __do_acquire(self, shared):
579
    """Actually acquire the lock.
580

581
    """
582
    if shared:
583
      self.__shr.add(threading.currentThread())
584
    else:
585
      self.__exc = threading.currentThread()
586

    
587
  def __can_acquire(self, shared):
588
    """Determine whether lock can be acquired.
589

590
    """
591
    if shared:
592
      return self.__exc is None
593
    else:
594
      return len(self.__shr) == 0 and self.__exc is None
595

    
596
  def __find_first_pending_queue(self):
597
    """Tries to find the topmost queued entry with pending acquires.
598

599
    Removes empty entries while going through the list.
600

601
    """
602
    while self.__pending:
603
      (priority, prioqueue) = self.__pending[0]
604

    
605
      if prioqueue:
606
        return (priority, prioqueue)
607

    
608
      # Remove empty queue
609
      heapq.heappop(self.__pending)
610
      del self.__pending_by_prio[priority]
611
      assert priority not in self.__pending_shared
612

    
613
    return (None, None)
614

    
615
  def __is_on_top(self, cond):
616
    """Checks whether the passed condition is on top of the queue.
617

618
    The caller must make sure the queue isn't empty.
619

620
    """
621
    (_, prioqueue) = self.__find_first_pending_queue()
622

    
623
    return cond == prioqueue[0]
624

    
625
  def __acquire_unlocked(self, shared, timeout, priority):
626
    """Acquire a shared lock.
627

628
    @param shared: whether to acquire in shared mode; by default an
629
        exclusive lock will be acquired
630
    @param timeout: maximum waiting time before giving up
631
    @type priority: integer
632
    @param priority: Priority for acquiring lock
633

634
    """
635
    self.__check_deleted()
636

    
637
    # We cannot acquire the lock if we already have it
638
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
639
                                   " %s" % self.name)
640

    
641
    # Remove empty entries from queue
642
    self.__find_first_pending_queue()
643

    
644
    # Check whether someone else holds the lock or there are pending acquires.
645
    if not self.__pending and self.__can_acquire(shared):
646
      # Apparently not, can acquire lock directly.
647
      self.__do_acquire(shared)
648
      return True
649

    
650
    prioqueue = self.__pending_by_prio.get(priority, None)
651

    
652
    if shared:
653
      # Try to re-use condition for shared acquire
654
      wait_condition = self.__pending_shared.get(priority, None)
655
      assert (wait_condition is None or
656
              (wait_condition.shared and wait_condition in prioqueue))
657
    else:
658
      wait_condition = None
659

    
660
    if wait_condition is None:
661
      if prioqueue is None:
662
        assert priority not in self.__pending_by_prio
663

    
664
        prioqueue = []
665
        heapq.heappush(self.__pending, (priority, prioqueue))
666
        self.__pending_by_prio[priority] = prioqueue
667

    
668
      wait_condition = self.__condition_class(self.__lock, shared)
669
      prioqueue.append(wait_condition)
670

    
671
      if shared:
672
        # Keep reference for further shared acquires on same priority. This is
673
        # better than trying to find it in the list of pending acquires.
674
        assert priority not in self.__pending_shared
675
        self.__pending_shared[priority] = wait_condition
676

    
677
    try:
678
      # Wait until we become the topmost acquire in the queue or the timeout
679
      # expires.
680
      # TODO: Decrease timeout with spurious notifications
681
      while not (self.__is_on_top(wait_condition) and
682
                 self.__can_acquire(shared)):
683
        # Wait for notification
684
        wait_condition.wait(timeout)
685
        self.__check_deleted()
686

    
687
        # A lot of code assumes blocking acquires always succeed. Loop
688
        # internally for that case.
689
        if timeout is not None:
690
          break
691

    
692
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
693
        self.__do_acquire(shared)
694
        return True
695
    finally:
696
      # Remove condition from queue if there are no more waiters
697
      if not wait_condition.has_waiting():
698
        prioqueue.remove(wait_condition)
699
        if wait_condition.shared:
700
          # Remove from list of shared acquires if it wasn't while releasing
701
          # (e.g. on lock deletion)
702
          self.__pending_shared.pop(priority, None)
703

    
704
    return False
705

    
706
  def acquire(self, shared=0, timeout=None, priority=None,
707
              test_notify=None):
708
    """Acquire a shared lock.
709

710
    @type shared: integer (0/1) used as a boolean
711
    @param shared: whether to acquire in shared mode; by default an
712
        exclusive lock will be acquired
713
    @type timeout: float
714
    @param timeout: maximum waiting time before giving up
715
    @type priority: integer
716
    @param priority: Priority for acquiring lock
717
    @type test_notify: callable or None
718
    @param test_notify: Special callback function for unittesting
719

720
    """
721
    if priority is None:
722
      priority = _DEFAULT_PRIORITY
723

    
724
    self.__lock.acquire()
725
    try:
726
      # We already got the lock, notify now
727
      if __debug__ and callable(test_notify):
728
        test_notify()
729

    
730
      return self.__acquire_unlocked(shared, timeout, priority)
731
    finally:
732
      self.__lock.release()
733

    
734
  def downgrade(self):
735
    """Changes the lock mode from exclusive to shared.
736

737
    Pending acquires in shared mode on the same priority will go ahead.
738

739
    """
740
    self.__lock.acquire()
741
    try:
742
      assert self.__is_owned(), "Lock must be owned"
743

    
744
      if self.__is_exclusive():
745
        # Do nothing if the lock is already acquired in shared mode
746
        self.__exc = None
747
        self.__do_acquire(1)
748

    
749
        # Important: pending shared acquires should only jump ahead if there
750
        # was a transition from exclusive to shared, otherwise an owner of a
751
        # shared lock can keep calling this function to push incoming shared
752
        # acquires
753
        (priority, prioqueue) = self.__find_first_pending_queue()
754
        if prioqueue:
755
          # Is there a pending shared acquire on this priority?
756
          cond = self.__pending_shared.pop(priority, None)
757
          if cond:
758
            assert cond.shared
759
            assert cond in prioqueue
760

    
761
            # Ensure shared acquire is on top of queue
762
            if len(prioqueue) > 1:
763
              prioqueue.remove(cond)
764
              prioqueue.insert(0, cond)
765

    
766
            # Notify
767
            cond.notifyAll()
768

    
769
      assert not self.__is_exclusive()
770
      assert self.__is_sharer()
771

    
772
      return True
773
    finally:
774
      self.__lock.release()
775

    
776
  def release(self):
777
    """Release a Shared Lock.
778

779
    You must have acquired the lock, either in shared or in exclusive mode,
780
    before calling this function.
781

782
    """
783
    self.__lock.acquire()
784
    try:
785
      assert self.__is_exclusive() or self.__is_sharer(), \
786
        "Cannot release non-owned lock"
787

    
788
      # Autodetect release type
789
      if self.__is_exclusive():
790
        self.__exc = None
791
      else:
792
        self.__shr.remove(threading.currentThread())
793

    
794
      # Notify topmost condition in queue
795
      (priority, prioqueue) = self.__find_first_pending_queue()
796
      if prioqueue:
797
        cond = prioqueue[0]
798
        cond.notifyAll()
799
        if cond.shared:
800
          # Prevent further shared acquires from sneaking in while waiters are
801
          # notified
802
          self.__pending_shared.pop(priority, None)
803

    
804
    finally:
805
      self.__lock.release()
806

    
807
  def delete(self, timeout=None, priority=None):
808
    """Delete a Shared Lock.
809

810
    This operation will declare the lock for removal. First the lock will be
811
    acquired in exclusive mode if you don't already own it, then the lock
812
    will be put in a state where any future and pending acquire() fail.
813

814
    @type timeout: float
815
    @param timeout: maximum waiting time before giving up
816
    @type priority: integer
817
    @param priority: Priority for acquiring lock
818

819
    """
820
    if priority is None:
821
      priority = _DEFAULT_PRIORITY
822

    
823
    self.__lock.acquire()
824
    try:
825
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
826

    
827
      self.__check_deleted()
828

    
829
      # The caller is allowed to hold the lock exclusively already.
830
      acquired = self.__is_exclusive()
831

    
832
      if not acquired:
833
        acquired = self.__acquire_unlocked(0, timeout, priority)
834

    
835
        assert self.__is_exclusive() and not self.__is_sharer(), \
836
          "Lock wasn't acquired in exclusive mode"
837

    
838
      if acquired:
839
        self.__deleted = True
840
        self.__exc = None
841

    
842
        assert not (self.__exc or self.__shr), "Found owner during deletion"
843

    
844
        # Notify all acquires. They'll throw an error.
845
        for (_, prioqueue) in self.__pending:
846
          for cond in prioqueue:
847
            cond.notifyAll()
848

    
849
        assert self.__deleted
850

    
851
      return acquired
852
    finally:
853
      self.__lock.release()
854

    
855
  def _release_save(self):
856
    shared = self.__is_sharer()
857
    self.release()
858
    return shared
859

    
860
  def _acquire_restore(self, shared):
861
    self.acquire(shared=shared)
862

    
863

    
864
# Whenever we want to acquire a full LockSet we pass None as the value
865
# to acquire.  Hide this behind this nicely named constant.
866
ALL_SET = None
867

    
868

    
869
class _AcquireTimeout(Exception):
870
  """Internal exception to abort an acquire on a timeout.
871

872
  """
873

    
874

    
875
class LockSet:
876
  """Implements a set of locks.
877

878
  This abstraction implements a set of shared locks for the same resource type,
879
  distinguished by name. The user can lock a subset of the resources and the
880
  LockSet will take care of acquiring the locks always in the same order, thus
881
  preventing deadlock.
882

883
  All the locks needed in the same set must be acquired together, though.
884

885
  @type name: string
886
  @ivar name: the name of the lockset
887

888
  """
889
  def __init__(self, members, name, monitor=None):
890
    """Constructs a new LockSet.
891

892
    @type members: list of strings
893
    @param members: initial members of the set
894
    @type monitor: L{LockMonitor}
895
    @param monitor: Lock monitor with which to register member locks
896

897
    """
898
    assert members is not None, "members parameter is not a list"
899
    self.name = name
900

    
901
    # Lock monitor
902
    self.__monitor = monitor
903

    
904
    # Used internally to guarantee coherency
905
    self.__lock = SharedLock(self._GetLockName("[lockset]"), monitor=monitor)
906

    
907
    # The lockdict indexes the relationship name -> lock
908
    # The order-of-locking is implied by the alphabetical order of names
909
    self.__lockdict = {}
910

    
911
    for mname in members:
912
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
913
                                          monitor=monitor)
914

    
915
    # The owner dict contains the set of locks each thread owns. For
916
    # performance each thread can access its own key without a global lock on
917
    # this structure. It is paramount though that *no* other type of access is
918
    # done to this structure (eg. no looping over its keys). *_owner helper
919
    # function are defined to guarantee access is correct, but in general never
920
    # do anything different than __owners[threading.currentThread()], or there
921
    # will be trouble.
922
    self.__owners = {}
923

    
924
  def _GetLockName(self, mname):
925
    """Returns the name for a member lock.
926

927
    """
928
    return "%s/%s" % (self.name, mname)
929

    
930
  def _get_lock(self):
931
    """Returns the lockset-internal lock.
932

933
    """
934
    return self.__lock
935

    
936
  def _get_lockdict(self):
937
    """Returns the lockset-internal lock dictionary.
938

939
    Accessing this structure is only safe in single-thread usage or when the
940
    lockset-internal lock is held.
941

942
    """
943
    return self.__lockdict
944

    
945
  def is_owned(self):
946
    """Is the current thread a current level owner?
947

948
    @note: Use L{check_owned} to check if a specific lock is held
949

950
    """
951
    return threading.currentThread() in self.__owners
952

    
953
  def check_owned(self, names, shared=-1):
954
    """Check if locks are owned in a specific mode.
955

956
    @type names: sequence or string
957
    @param names: Lock names (or a single lock name)
958
    @param shared: See L{SharedLock.is_owned}
959
    @rtype: bool
960
    @note: Use L{is_owned} to check if the current thread holds I{any} lock and
961
      L{list_owned} to get the names of all owned locks
962

963
    """
964
    if isinstance(names, basestring):
965
      names = [names]
966

    
967
    # Avoid check if no locks are owned anyway
968
    if names and self.is_owned():
969
      candidates = []
970

    
971
      # Gather references to all locks (in case they're deleted in the meantime)
972
      for lname in names:
973
        try:
974
          lock = self.__lockdict[lname]
975
        except KeyError:
976
          raise errors.LockError("Non-existing lock '%s' in set '%s' (it may"
977
                                 " have been removed)" % (lname, self.name))
978
        else:
979
          candidates.append(lock)
980

    
981
      return compat.all(lock.is_owned(shared=shared) for lock in candidates)
982
    else:
983
      return False
984

    
985
  def _add_owned(self, name=None):
986
    """Note the current thread owns the given lock"""
987
    if name is None:
988
      if not self.is_owned():
989
        self.__owners[threading.currentThread()] = set()
990
    else:
991
      if self.is_owned():
992
        self.__owners[threading.currentThread()].add(name)
993
      else:
994
        self.__owners[threading.currentThread()] = set([name])
995

    
996
  def _del_owned(self, name=None):
997
    """Note the current thread owns the given lock"""
998

    
999
    assert not (name is None and self.__lock.is_owned()), \
1000
           "Cannot hold internal lock when deleting owner status"
1001

    
1002
    if name is not None:
1003
      self.__owners[threading.currentThread()].remove(name)
1004

    
1005
    # Only remove the key if we don't hold the set-lock as well
1006
    if (not self.__lock.is_owned() and
1007
        not self.__owners[threading.currentThread()]):
1008
      del self.__owners[threading.currentThread()]
1009

    
1010
  def list_owned(self):
1011
    """Get the set of resource names owned by the current thread"""
1012
    if self.is_owned():
1013
      return self.__owners[threading.currentThread()].copy()
1014
    else:
1015
      return set()
1016

    
1017
  def _release_and_delete_owned(self):
1018
    """Release and delete all resources owned by the current thread"""
1019
    for lname in self.list_owned():
1020
      lock = self.__lockdict[lname]
1021
      if lock.is_owned():
1022
        lock.release()
1023
      self._del_owned(name=lname)
1024

    
1025
  def __names(self):
1026
    """Return the current set of names.
1027

1028
    Only call this function while holding __lock and don't iterate on the
1029
    result after releasing the lock.
1030

1031
    """
1032
    return self.__lockdict.keys()
1033

    
1034
  def _names(self):
1035
    """Return a copy of the current set of elements.
1036

1037
    Used only for debugging purposes.
1038

1039
    """
1040
    # If we don't already own the set-level lock acquired
1041
    # we'll get it and note we need to release it later.
1042
    release_lock = False
1043
    if not self.__lock.is_owned():
1044
      release_lock = True
1045
      self.__lock.acquire(shared=1)
1046
    try:
1047
      result = self.__names()
1048
    finally:
1049
      if release_lock:
1050
        self.__lock.release()
1051
    return set(result)
1052

    
1053
  def acquire(self, names, timeout=None, shared=0, priority=None,
1054
              test_notify=None):
1055
    """Acquire a set of resource locks.
1056

1057
    @type names: list of strings (or string)
1058
    @param names: the names of the locks which shall be acquired
1059
        (special lock names, or instance/node names)
1060
    @type shared: integer (0/1) used as a boolean
1061
    @param shared: whether to acquire in shared mode; by default an
1062
        exclusive lock will be acquired
1063
    @type timeout: float or None
1064
    @param timeout: Maximum time to acquire all locks
1065
    @type priority: integer
1066
    @param priority: Priority for acquiring locks
1067
    @type test_notify: callable or None
1068
    @param test_notify: Special callback function for unittesting
1069

1070
    @return: Set of all locks successfully acquired or None in case of timeout
1071

1072
    @raise errors.LockError: when any lock we try to acquire has
1073
        been deleted before we succeed. In this case none of the
1074
        locks requested will be acquired.
1075

1076
    """
1077
    assert timeout is None or timeout >= 0.0
1078

    
1079
    # Check we don't already own locks at this level
1080
    assert not self.is_owned(), ("Cannot acquire locks in the same set twice"
1081
                                 " (lockset %s)" % self.name)
1082

    
1083
    if priority is None:
1084
      priority = _DEFAULT_PRIORITY
1085

    
1086
    # We need to keep track of how long we spent waiting for a lock. The
1087
    # timeout passed to this function is over all lock acquires.
1088
    running_timeout = utils.RunningTimeout(timeout, False)
1089

    
1090
    try:
1091
      if names is not None:
1092
        # Support passing in a single resource to acquire rather than many
1093
        if isinstance(names, basestring):
1094
          names = [names]
1095

    
1096
        return self.__acquire_inner(names, False, shared, priority,
1097
                                    running_timeout.Remaining, test_notify)
1098

    
1099
      else:
1100
        # If no names are given acquire the whole set by not letting new names
1101
        # being added before we release, and getting the current list of names.
1102
        # Some of them may then be deleted later, but we'll cope with this.
1103
        #
1104
        # We'd like to acquire this lock in a shared way, as it's nice if
1105
        # everybody else can use the instances at the same time. If we are
1106
        # acquiring them exclusively though they won't be able to do this
1107
        # anyway, though, so we'll get the list lock exclusively as well in
1108
        # order to be able to do add() on the set while owning it.
1109
        if not self.__lock.acquire(shared=shared, priority=priority,
1110
                                   timeout=running_timeout.Remaining()):
1111
          raise _AcquireTimeout()
1112
        try:
1113
          # note we own the set-lock
1114
          self._add_owned()
1115

    
1116
          return self.__acquire_inner(self.__names(), True, shared, priority,
1117
                                      running_timeout.Remaining, test_notify)
1118
        except:
1119
          # We shouldn't have problems adding the lock to the owners list, but
1120
          # if we did we'll try to release this lock and re-raise exception.
1121
          # Of course something is going to be really wrong, after this.
1122
          self.__lock.release()
1123
          self._del_owned()
1124
          raise
1125

    
1126
    except _AcquireTimeout:
1127
      return None
1128

    
1129
  def __acquire_inner(self, names, want_all, shared, priority,
1130
                      timeout_fn, test_notify):
1131
    """Inner logic for acquiring a number of locks.
1132

1133
    @param names: Names of the locks to be acquired
1134
    @param want_all: Whether all locks in the set should be acquired
1135
    @param shared: Whether to acquire in shared mode
1136
    @param timeout_fn: Function returning remaining timeout
1137
    @param priority: Priority for acquiring locks
1138
    @param test_notify: Special callback function for unittesting
1139

1140
    """
1141
    acquire_list = []
1142

    
1143
    # First we look the locks up on __lockdict. We have no way of being sure
1144
    # they will still be there after, but this makes it a lot faster should
1145
    # just one of them be the already wrong. Using a sorted sequence to prevent
1146
    # deadlocks.
1147
    for lname in sorted(utils.UniqueSequence(names)):
1148
      try:
1149
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
1150
      except KeyError:
1151
        if want_all:
1152
          # We are acquiring all the set, it doesn't matter if this particular
1153
          # element is not there anymore.
1154
          continue
1155

    
1156
        raise errors.LockError("Non-existing lock %s in set %s (it may have"
1157
                               " been removed)" % (lname, self.name))
1158

    
1159
      acquire_list.append((lname, lock))
1160

    
1161
    # This will hold the locknames we effectively acquired.
1162
    acquired = set()
1163

    
1164
    try:
1165
      # Now acquire_list contains a sorted list of resources and locks we
1166
      # want.  In order to get them we loop on this (private) list and
1167
      # acquire() them.  We gave no real guarantee they will still exist till
1168
      # this is done but .acquire() itself is safe and will alert us if the
1169
      # lock gets deleted.
1170
      for (lname, lock) in acquire_list:
1171
        if __debug__ and callable(test_notify):
1172
          test_notify_fn = lambda: test_notify(lname)
1173
        else:
1174
          test_notify_fn = None
1175

    
1176
        timeout = timeout_fn()
1177

    
1178
        try:
1179
          # raises LockError if the lock was deleted
1180
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1181
                                     priority=priority,
1182
                                     test_notify=test_notify_fn)
1183
        except errors.LockError:
1184
          if want_all:
1185
            # We are acquiring all the set, it doesn't matter if this
1186
            # particular element is not there anymore.
1187
            continue
1188

    
1189
          raise errors.LockError("Non-existing lock %s in set %s (it may"
1190
                                 " have been removed)" % (lname, self.name))
1191

    
1192
        if not acq_success:
1193
          # Couldn't get lock or timeout occurred
1194
          if timeout is None:
1195
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1196
            # blocking.
1197
            raise errors.LockError("Failed to get lock %s (set %s)" %
1198
                                   (lname, self.name))
1199

    
1200
          raise _AcquireTimeout()
1201

    
1202
        try:
1203
          # now the lock cannot be deleted, we have it!
1204
          self._add_owned(name=lname)
1205
          acquired.add(lname)
1206

    
1207
        except:
1208
          # We shouldn't have problems adding the lock to the owners list, but
1209
          # if we did we'll try to release this lock and re-raise exception.
1210
          # Of course something is going to be really wrong after this.
1211
          if lock.is_owned():
1212
            lock.release()
1213
          raise
1214

    
1215
    except:
1216
      # Release all owned locks
1217
      self._release_and_delete_owned()
1218
      raise
1219

    
1220
    return acquired
1221

    
1222
  def downgrade(self, names=None):
1223
    """Downgrade a set of resource locks from exclusive to shared mode.
1224

1225
    The locks must have been acquired in exclusive mode.
1226

1227
    """
1228
    assert self.is_owned(), ("downgrade on lockset %s while not owning any"
1229
                             " lock" % self.name)
1230

    
1231
    # Support passing in a single resource to downgrade rather than many
1232
    if isinstance(names, basestring):
1233
      names = [names]
1234

    
1235
    owned = self.list_owned()
1236

    
1237
    if names is None:
1238
      names = owned
1239
    else:
1240
      names = set(names)
1241
      assert owned.issuperset(names), \
1242
        ("downgrade() on unheld resources %s (set %s)" %
1243
         (names.difference(owned), self.name))
1244

    
1245
    for lockname in names:
1246
      self.__lockdict[lockname].downgrade()
1247

    
1248
    # Do we own the lockset in exclusive mode?
1249
    if self.__lock.is_owned(shared=0):
1250
      # Have all locks been downgraded?
1251
      if not compat.any(lock.is_owned(shared=0)
1252
                        for lock in self.__lockdict.values()):
1253
        self.__lock.downgrade()
1254
        assert self.__lock.is_owned(shared=1)
1255

    
1256
    return True
1257

    
1258
  def release(self, names=None):
1259
    """Release a set of resource locks, at the same level.
1260

1261
    You must have acquired the locks, either in shared or in exclusive mode,
1262
    before releasing them.
1263

1264
    @type names: list of strings, or None
1265
    @param names: the names of the locks which shall be released
1266
        (defaults to all the locks acquired at that level).
1267

1268
    """
1269
    assert self.is_owned(), ("release() on lock set %s while not owner" %
1270
                             self.name)
1271

    
1272
    # Support passing in a single resource to release rather than many
1273
    if isinstance(names, basestring):
1274
      names = [names]
1275

    
1276
    if names is None:
1277
      names = self.list_owned()
1278
    else:
1279
      names = set(names)
1280
      assert self.list_owned().issuperset(names), (
1281
               "release() on unheld resources %s (set %s)" %
1282
               (names.difference(self.list_owned()), self.name))
1283

    
1284
    # First of all let's release the "all elements" lock, if set.
1285
    # After this 'add' can work again
1286
    if self.__lock.is_owned():
1287
      self.__lock.release()
1288
      self._del_owned()
1289

    
1290
    for lockname in names:
1291
      # If we are sure the lock doesn't leave __lockdict without being
1292
      # exclusively held we can do this...
1293
      self.__lockdict[lockname].release()
1294
      self._del_owned(name=lockname)
1295

    
1296
  def add(self, names, acquired=0, shared=0):
1297
    """Add a new set of elements to the set
1298

1299
    @type names: list of strings
1300
    @param names: names of the new elements to add
1301
    @type acquired: integer (0/1) used as a boolean
1302
    @param acquired: pre-acquire the new resource?
1303
    @type shared: integer (0/1) used as a boolean
1304
    @param shared: is the pre-acquisition shared?
1305

1306
    """
1307
    # Check we don't already own locks at this level
1308
    assert not self.is_owned() or self.__lock.is_owned(shared=0), \
1309
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1310
       self.name)
1311

    
1312
    # Support passing in a single resource to add rather than many
1313
    if isinstance(names, basestring):
1314
      names = [names]
1315

    
1316
    # If we don't already own the set-level lock acquired in an exclusive way
1317
    # we'll get it and note we need to release it later.
1318
    release_lock = False
1319
    if not self.__lock.is_owned():
1320
      release_lock = True
1321
      self.__lock.acquire()
1322

    
1323
    try:
1324
      invalid_names = set(self.__names()).intersection(names)
1325
      if invalid_names:
1326
        # This must be an explicit raise, not an assert, because assert is
1327
        # turned off when using optimization, and this can happen because of
1328
        # concurrency even if the user doesn't want it.
1329
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1330
                               (invalid_names, self.name))
1331

    
1332
      for lockname in names:
1333
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1334

    
1335
        if acquired:
1336
          # No need for priority or timeout here as this lock has just been
1337
          # created
1338
          lock.acquire(shared=shared)
1339
          # now the lock cannot be deleted, we have it!
1340
          try:
1341
            self._add_owned(name=lockname)
1342
          except:
1343
            # We shouldn't have problems adding the lock to the owners list,
1344
            # but if we did we'll try to release this lock and re-raise
1345
            # exception.  Of course something is going to be really wrong,
1346
            # after this.  On the other hand the lock hasn't been added to the
1347
            # __lockdict yet so no other threads should be pending on it. This
1348
            # release is just a safety measure.
1349
            lock.release()
1350
            raise
1351

    
1352
        self.__lockdict[lockname] = lock
1353

    
1354
    finally:
1355
      # Only release __lock if we were not holding it previously.
1356
      if release_lock:
1357
        self.__lock.release()
1358

    
1359
    return True
1360

    
1361
  def remove(self, names):
1362
    """Remove elements from the lock set.
1363

1364
    You can either not hold anything in the lockset or already hold a superset
1365
    of the elements you want to delete, exclusively.
1366

1367
    @type names: list of strings
1368
    @param names: names of the resource to remove.
1369

1370
    @return: a list of locks which we removed; the list is always
1371
        equal to the names list if we were holding all the locks
1372
        exclusively
1373

1374
    """
1375
    # Support passing in a single resource to remove rather than many
1376
    if isinstance(names, basestring):
1377
      names = [names]
1378

    
1379
    # If we own any subset of this lock it must be a superset of what we want
1380
    # to delete. The ownership must also be exclusive, but that will be checked
1381
    # by the lock itself.
1382
    assert not self.is_owned() or self.list_owned().issuperset(names), (
1383
      "remove() on acquired lockset %s while not owning all elements" %
1384
      self.name)
1385

    
1386
    removed = []
1387

    
1388
    for lname in names:
1389
      # Calling delete() acquires the lock exclusively if we don't already own
1390
      # it, and causes all pending and subsequent lock acquires to fail. It's
1391
      # fine to call it out of order because delete() also implies release(),
1392
      # and the assertion above guarantees that if we either already hold
1393
      # everything we want to delete, or we hold none.
1394
      try:
1395
        self.__lockdict[lname].delete()
1396
        removed.append(lname)
1397
      except (KeyError, errors.LockError):
1398
        # This cannot happen if we were already holding it, verify:
1399
        assert not self.is_owned(), ("remove failed while holding lockset %s" %
1400
                                     self.name)
1401
      else:
1402
        # If no LockError was raised we are the ones who deleted the lock.
1403
        # This means we can safely remove it from lockdict, as any further or
1404
        # pending delete() or acquire() will fail (and nobody can have the lock
1405
        # since before our call to delete()).
1406
        #
1407
        # This is done in an else clause because if the exception was thrown
1408
        # it's the job of the one who actually deleted it.
1409
        del self.__lockdict[lname]
1410
        # And let's remove it from our private list if we owned it.
1411
        if self.is_owned():
1412
          self._del_owned(name=lname)
1413

    
1414
    return removed
1415

    
1416

    
1417
# Locking levels, must be acquired in increasing order.
1418
# Current rules are:
1419
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1420
#   acquired before performing any operation, either in shared or in exclusive
1421
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1422
#   avoided.
1423
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1424
#   If you need more than one node, or more than one instance, acquire them at
1425
#   the same time.
1426
LEVEL_CLUSTER = 0
1427
LEVEL_INSTANCE = 1
1428
LEVEL_NODEGROUP = 2
1429
LEVEL_NODE = 3
1430
LEVEL_NODE_RES = 4
1431

    
1432
LEVELS = [
1433
  LEVEL_CLUSTER,
1434
  LEVEL_INSTANCE,
1435
  LEVEL_NODEGROUP,
1436
  LEVEL_NODE,
1437
  LEVEL_NODE_RES,
1438
  ]
1439

    
1440
# Lock levels which are modifiable
1441
LEVELS_MOD = frozenset([
1442
  LEVEL_NODE_RES,
1443
  LEVEL_NODE,
1444
  LEVEL_NODEGROUP,
1445
  LEVEL_INSTANCE,
1446
  ])
1447

    
1448
#: Lock level names (make sure to use singular form)
1449
LEVEL_NAMES = {
1450
  LEVEL_CLUSTER: "cluster",
1451
  LEVEL_INSTANCE: "instance",
1452
  LEVEL_NODEGROUP: "nodegroup",
1453
  LEVEL_NODE: "node",
1454
  LEVEL_NODE_RES: "node-res",
1455
  }
1456

    
1457
# Constant for the big ganeti lock
1458
BGL = 'BGL'
1459

    
1460

    
1461
class GanetiLockManager:
1462
  """The Ganeti Locking Library
1463

1464
  The purpose of this small library is to manage locking for ganeti clusters
1465
  in a central place, while at the same time doing dynamic checks against
1466
  possible deadlocks. It will also make it easier to transition to a different
1467
  lock type should we migrate away from python threads.
1468

1469
  """
1470
  _instance = None
1471

    
1472
  def __init__(self, nodes, nodegroups, instances):
1473
    """Constructs a new GanetiLockManager object.
1474

1475
    There should be only a GanetiLockManager object at any time, so this
1476
    function raises an error if this is not the case.
1477

1478
    @param nodes: list of node names
1479
    @param nodegroups: list of nodegroup uuids
1480
    @param instances: list of instance names
1481

1482
    """
1483
    assert self.__class__._instance is None, \
1484
           "double GanetiLockManager instance"
1485

    
1486
    self.__class__._instance = self
1487

    
1488
    self._monitor = LockMonitor()
1489

    
1490
    # The keyring contains all the locks, at their level and in the correct
1491
    # locking order.
1492
    self.__keyring = {
1493
      LEVEL_CLUSTER: LockSet([BGL], "cluster", monitor=self._monitor),
1494
      LEVEL_NODE: LockSet(nodes, "node", monitor=self._monitor),
1495
      LEVEL_NODE_RES: LockSet(nodes, "node-res", monitor=self._monitor),
1496
      LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroup", monitor=self._monitor),
1497
      LEVEL_INSTANCE: LockSet(instances, "instance",
1498
                              monitor=self._monitor),
1499
      }
1500

    
1501
    assert compat.all(ls.name == LEVEL_NAMES[level]
1502
                      for (level, ls) in self.__keyring.items())
1503

    
1504
  def AddToLockMonitor(self, provider):
1505
    """Registers a new lock with the monitor.
1506

1507
    See L{LockMonitor.RegisterLock}.
1508

1509
    """
1510
    return self._monitor.RegisterLock(provider)
1511

    
1512
  def QueryLocks(self, fields):
1513
    """Queries information from all locks.
1514

1515
    See L{LockMonitor.QueryLocks}.
1516

1517
    """
1518
    return self._monitor.QueryLocks(fields)
1519

    
1520
  def OldStyleQueryLocks(self, fields):
1521
    """Queries information from all locks, returning old-style data.
1522

1523
    See L{LockMonitor.OldStyleQueryLocks}.
1524

1525
    """
1526
    return self._monitor.OldStyleQueryLocks(fields)
1527

    
1528
  def _names(self, level):
1529
    """List the lock names at the given level.
1530

1531
    This can be used for debugging/testing purposes.
1532

1533
    @param level: the level whose list of locks to get
1534

1535
    """
1536
    assert level in LEVELS, "Invalid locking level %s" % level
1537
    return self.__keyring[level]._names()
1538

    
1539
  def is_owned(self, level):
1540
    """Check whether we are owning locks at the given level
1541

1542
    """
1543
    return self.__keyring[level].is_owned()
1544

    
1545
  def list_owned(self, level):
1546
    """Get the set of owned locks at the given level
1547

1548
    """
1549
    return self.__keyring[level].list_owned()
1550

    
1551
  def check_owned(self, level, names, shared=-1):
1552
    """Check if locks at a certain level are owned in a specific mode.
1553

1554
    @see: L{LockSet.check_owned}
1555

1556
    """
1557
    return self.__keyring[level].check_owned(names, shared=shared)
1558

    
1559
  def _upper_owned(self, level):
1560
    """Check that we don't own any lock at a level greater than the given one.
1561

1562
    """
1563
    # This way of checking only works if LEVELS[i] = i, which we check for in
1564
    # the test cases.
1565
    return compat.any((self.is_owned(l) for l in LEVELS[level + 1:]))
1566

    
1567
  def _BGL_owned(self): # pylint: disable=C0103
1568
    """Check if the current thread owns the BGL.
1569

1570
    Both an exclusive or a shared acquisition work.
1571

1572
    """
1573
    return BGL in self.__keyring[LEVEL_CLUSTER].list_owned()
1574

    
1575
  @staticmethod
1576
  def _contains_BGL(level, names): # pylint: disable=C0103
1577
    """Check if the level contains the BGL.
1578

1579
    Check if acting on the given level and set of names will change
1580
    the status of the Big Ganeti Lock.
1581

1582
    """
1583
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1584

    
1585
  def acquire(self, level, names, timeout=None, shared=0, priority=None):
1586
    """Acquire a set of resource locks, at the same level.
1587

1588
    @type level: member of locking.LEVELS
1589
    @param level: the level at which the locks shall be acquired
1590
    @type names: list of strings (or string)
1591
    @param names: the names of the locks which shall be acquired
1592
        (special lock names, or instance/node names)
1593
    @type shared: integer (0/1) used as a boolean
1594
    @param shared: whether to acquire in shared mode; by default
1595
        an exclusive lock will be acquired
1596
    @type timeout: float
1597
    @param timeout: Maximum time to acquire all locks
1598
    @type priority: integer
1599
    @param priority: Priority for acquiring lock
1600

1601
    """
1602
    assert level in LEVELS, "Invalid locking level %s" % level
1603

    
1604
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1605
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1606
    # so even if we've migrated we need to at least share the BGL to be
1607
    # compatible with them. Of course if we own the BGL exclusively there's no
1608
    # point in acquiring any other lock, unless perhaps we are half way through
1609
    # the migration of the current opcode.
1610
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1611
            "You must own the Big Ganeti Lock before acquiring any other")
1612

    
1613
    # Check we don't own locks at the same or upper levels.
1614
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1615
           " while owning some at a greater one")
1616

    
1617
    # Acquire the locks in the set.
1618
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1619
                                         priority=priority)
1620

    
1621
  def downgrade(self, level, names=None):
1622
    """Downgrade a set of resource locks from exclusive to shared mode.
1623

1624
    You must have acquired the locks in exclusive mode.
1625

1626
    @type level: member of locking.LEVELS
1627
    @param level: the level at which the locks shall be downgraded
1628
    @type names: list of strings, or None
1629
    @param names: the names of the locks which shall be downgraded
1630
        (defaults to all the locks acquired at the level)
1631

1632
    """
1633
    assert level in LEVELS, "Invalid locking level %s" % level
1634

    
1635
    return self.__keyring[level].downgrade(names=names)
1636

    
1637
  def release(self, level, names=None):
1638
    """Release a set of resource locks, at the same level.
1639

1640
    You must have acquired the locks, either in shared or in exclusive
1641
    mode, before releasing them.
1642

1643
    @type level: member of locking.LEVELS
1644
    @param level: the level at which the locks shall be released
1645
    @type names: list of strings, or None
1646
    @param names: the names of the locks which shall be released
1647
        (defaults to all the locks acquired at that level)
1648

1649
    """
1650
    assert level in LEVELS, "Invalid locking level %s" % level
1651
    assert (not self._contains_BGL(level, names) or
1652
            not self._upper_owned(LEVEL_CLUSTER)), (
1653
            "Cannot release the Big Ganeti Lock while holding something"
1654
            " at upper levels (%r)" %
1655
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self.list_owned(i))
1656
                              for i in self.__keyring.keys()]), ))
1657

    
1658
    # Release will complain if we don't own the locks already
1659
    return self.__keyring[level].release(names)
1660

    
1661
  def add(self, level, names, acquired=0, shared=0):
1662
    """Add locks at the specified level.
1663

1664
    @type level: member of locking.LEVELS_MOD
1665
    @param level: the level at which the locks shall be added
1666
    @type names: list of strings
1667
    @param names: names of the locks to acquire
1668
    @type acquired: integer (0/1) used as a boolean
1669
    @param acquired: whether to acquire the newly added locks
1670
    @type shared: integer (0/1) used as a boolean
1671
    @param shared: whether the acquisition will be shared
1672

1673
    """
1674
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1675
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1676
           " operations")
1677
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1678
           " while owning some at a greater one")
1679
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1680

    
1681
  def remove(self, level, names):
1682
    """Remove locks from the specified level.
1683

1684
    You must either already own the locks you are trying to remove
1685
    exclusively or not own any lock at an upper level.
1686

1687
    @type level: member of locking.LEVELS_MOD
1688
    @param level: the level at which the locks shall be removed
1689
    @type names: list of strings
1690
    @param names: the names of the locks which shall be removed
1691
        (special lock names, or instance/node names)
1692

1693
    """
1694
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1695
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1696
           " operations")
1697
    # Check we either own the level or don't own anything from here
1698
    # up. LockSet.remove() will check the case in which we don't own
1699
    # all the needed resources, or we have a shared ownership.
1700
    assert self.is_owned(level) or not self._upper_owned(level), (
1701
           "Cannot remove locks at a level while not owning it or"
1702
           " owning some at a greater one")
1703
    return self.__keyring[level].remove(names)
1704

    
1705

    
1706
def _MonitorSortKey((item, idx, num)):
1707
  """Sorting key function.
1708

1709
  Sort by name, registration order and then order of information. This provides
1710
  a stable sort order over different providers, even if they return the same
1711
  name.
1712

1713
  """
1714
  (name, _, _, _) = item
1715

    
1716
  return (utils.NiceSortKey(name), num, idx)
1717

    
1718

    
1719
class LockMonitor(object):
1720
  _LOCK_ATTR = "_lock"
1721

    
1722
  def __init__(self):
1723
    """Initializes this class.
1724

1725
    """
1726
    self._lock = SharedLock("LockMonitor")
1727

    
1728
    # Counter for stable sorting
1729
    self._counter = itertools.count(0)
1730

    
1731
    # Tracked locks. Weak references are used to avoid issues with circular
1732
    # references and deletion.
1733
    self._locks = weakref.WeakKeyDictionary()
1734

    
1735
  @ssynchronized(_LOCK_ATTR)
1736
  def RegisterLock(self, provider):
1737
    """Registers a new lock.
1738

1739
    @param provider: Object with a callable method named C{GetLockInfo}, taking
1740
      a single C{set} containing the requested information items
1741
    @note: It would be nicer to only receive the function generating the
1742
      requested information but, as it turns out, weak references to bound
1743
      methods (e.g. C{self.GetLockInfo}) are tricky; there are several
1744
      workarounds, but none of the ones I found works properly in combination
1745
      with a standard C{WeakKeyDictionary}
1746

1747
    """
1748
    assert provider not in self._locks, "Duplicate registration"
1749

    
1750
    # There used to be a check for duplicate names here. As it turned out, when
1751
    # a lock is re-created with the same name in a very short timeframe, the
1752
    # previous instance might not yet be removed from the weakref dictionary.
1753
    # By keeping track of the order of incoming registrations, a stable sort
1754
    # ordering can still be guaranteed.
1755

    
1756
    self._locks[provider] = self._counter.next()
1757

    
1758
  def _GetLockInfo(self, requested):
1759
    """Get information from all locks.
1760

1761
    """
1762
    # Must hold lock while getting consistent list of tracked items
1763
    self._lock.acquire(shared=1)
1764
    try:
1765
      items = self._locks.items()
1766
    finally:
1767
      self._lock.release()
1768

    
1769
    return [(info, idx, num)
1770
            for (provider, num) in items
1771
            for (idx, info) in enumerate(provider.GetLockInfo(requested))]
1772

    
1773
  def _Query(self, fields):
1774
    """Queries information from all locks.
1775

1776
    @type fields: list of strings
1777
    @param fields: List of fields to return
1778

1779
    """
1780
    qobj = query.Query(query.LOCK_FIELDS, fields)
1781

    
1782
    # Get all data with internal lock held and then sort by name and incoming
1783
    # order
1784
    lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1785
                      key=_MonitorSortKey)
1786

    
1787
    # Extract lock information and build query data
1788
    return (qobj, query.LockQueryData(map(compat.fst, lockinfo)))
1789

    
1790
  def QueryLocks(self, fields):
1791
    """Queries information from all locks.
1792

1793
    @type fields: list of strings
1794
    @param fields: List of fields to return
1795

1796
    """
1797
    (qobj, ctx) = self._Query(fields)
1798

    
1799
    # Prepare query response
1800
    return query.GetQueryResponse(qobj, ctx)
1801

    
1802
  def OldStyleQueryLocks(self, fields):
1803
    """Queries information from all locks, returning old-style data.
1804

1805
    @type fields: list of strings
1806
    @param fields: List of fields to return
1807

1808
    """
1809
    (qobj, ctx) = self._Query(fields)
1810

    
1811
    return qobj.OldStyleQuery(ctx)