Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 19b9ba9a

History | View | Annotate | Download (44.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33
import weakref
34
import logging
35

    
36
from ganeti import errors
37
from ganeti import utils
38
from ganeti import compat
39

    
40

    
41
def ssynchronized(mylock, shared=0):
42
  """Shared Synchronization decorator.
43

44
  Calls the function holding the given lock, either in exclusive or shared
45
  mode. It requires the passed lock to be a SharedLock (or support its
46
  semantics).
47

48
  @type mylock: lockable object or string
49
  @param mylock: lock to acquire or class member name of the lock to acquire
50

51
  """
52
  def wrap(fn):
53
    def sync_function(*args, **kwargs):
54
      if isinstance(mylock, basestring):
55
        assert args, "cannot ssynchronize on non-class method: self not found"
56
        # args[0] is "self"
57
        lock = getattr(args[0], mylock)
58
      else:
59
        lock = mylock
60
      lock.acquire(shared=shared)
61
      try:
62
        return fn(*args, **kwargs)
63
      finally:
64
        lock.release()
65
    return sync_function
66
  return wrap
67

    
68

    
69
class RunningTimeout(object):
70
  """Class to calculate remaining timeout when doing several operations.
71

72
  """
73
  __slots__ = [
74
    "_allow_negative",
75
    "_start_time",
76
    "_time_fn",
77
    "_timeout",
78
    ]
79

    
80
  def __init__(self, timeout, allow_negative, _time_fn=time.time):
81
    """Initializes this class.
82

83
    @type timeout: float
84
    @param timeout: Timeout duration
85
    @type allow_negative: bool
86
    @param allow_negative: Whether to return values below zero
87
    @param _time_fn: Time function for unittests
88

89
    """
90
    object.__init__(self)
91

    
92
    if timeout is not None and timeout < 0.0:
93
      raise ValueError("Timeout must not be negative")
94

    
95
    self._timeout = timeout
96
    self._allow_negative = allow_negative
97
    self._time_fn = _time_fn
98

    
99
    self._start_time = None
100

    
101
  def Remaining(self):
102
    """Returns the remaining timeout.
103

104
    """
105
    if self._timeout is None:
106
      return None
107

    
108
    # Get start time on first calculation
109
    if self._start_time is None:
110
      self._start_time = self._time_fn()
111

    
112
    # Calculate remaining time
113
    remaining_timeout = self._start_time + self._timeout - self._time_fn()
114

    
115
    if not self._allow_negative:
116
      # Ensure timeout is always >= 0
117
      return max(0.0, remaining_timeout)
118

    
119
    return remaining_timeout
120

    
121

    
122
class _SingleNotifyPipeConditionWaiter(object):
123
  """Helper class for SingleNotifyPipeCondition
124

125
  """
126
  __slots__ = [
127
    "_fd",
128
    "_poller",
129
    ]
130

    
131
  def __init__(self, poller, fd):
132
    """Constructor for _SingleNotifyPipeConditionWaiter
133

134
    @type poller: select.poll
135
    @param poller: Poller object
136
    @type fd: int
137
    @param fd: File descriptor to wait for
138

139
    """
140
    object.__init__(self)
141
    self._poller = poller
142
    self._fd = fd
143

    
144
  def __call__(self, timeout):
145
    """Wait for something to happen on the pipe.
146

147
    @type timeout: float or None
148
    @param timeout: Timeout for waiting (can be None)
149

150
    """
151
    running_timeout = RunningTimeout(timeout, True)
152

    
153
    while True:
154
      remaining_time = running_timeout.Remaining()
155

    
156
      if remaining_time is not None:
157
        if remaining_time < 0.0:
158
          break
159

    
160
        # Our calculation uses seconds, poll() wants milliseconds
161
        remaining_time *= 1000
162

    
163
      try:
164
        result = self._poller.poll(remaining_time)
165
      except EnvironmentError, err:
166
        if err.errno != errno.EINTR:
167
          raise
168
        result = None
169

    
170
      # Check whether we were notified
171
      if result and result[0][0] == self._fd:
172
        break
173

    
174

    
175
class _BaseCondition(object):
176
  """Base class containing common code for conditions.
177

178
  Some of this code is taken from python's threading module.
179

180
  """
181
  __slots__ = [
182
    "_lock",
183
    "acquire",
184
    "release",
185
    "_is_owned",
186
    "_acquire_restore",
187
    "_release_save",
188
    ]
189

    
190
  def __init__(self, lock):
191
    """Constructor for _BaseCondition.
192

193
    @type lock: threading.Lock
194
    @param lock: condition base lock
195

196
    """
197
    object.__init__(self)
198

    
199
    try:
200
      self._release_save = lock._release_save
201
    except AttributeError:
202
      self._release_save = self._base_release_save
203
    try:
204
      self._acquire_restore = lock._acquire_restore
205
    except AttributeError:
206
      self._acquire_restore = self._base_acquire_restore
207
    try:
208
      self._is_owned = lock._is_owned
209
    except AttributeError:
210
      self._is_owned = self._base_is_owned
211

    
212
    self._lock = lock
213

    
214
    # Export the lock's acquire() and release() methods
215
    self.acquire = lock.acquire
216
    self.release = lock.release
217

    
218
  def _base_is_owned(self):
219
    """Check whether lock is owned by current thread.
220

221
    """
222
    if self._lock.acquire(0):
223
      self._lock.release()
224
      return False
225
    return True
226

    
227
  def _base_release_save(self):
228
    self._lock.release()
229

    
230
  def _base_acquire_restore(self, _):
231
    self._lock.acquire()
232

    
233
  def _check_owned(self):
234
    """Raise an exception if the current thread doesn't own the lock.
235

236
    """
237
    if not self._is_owned():
238
      raise RuntimeError("cannot work with un-aquired lock")
239

    
240

    
241
class SingleNotifyPipeCondition(_BaseCondition):
242
  """Condition which can only be notified once.
243

244
  This condition class uses pipes and poll, internally, to be able to wait for
245
  notification with a timeout, without resorting to polling. It is almost
246
  compatible with Python's threading.Condition, with the following differences:
247
    - notifyAll can only be called once, and no wait can happen after that
248
    - notify is not supported, only notifyAll
249

250
  """
251

    
252
  __slots__ = [
253
    "_poller",
254
    "_read_fd",
255
    "_write_fd",
256
    "_nwaiters",
257
    "_notified",
258
    ]
259

    
260
  _waiter_class = _SingleNotifyPipeConditionWaiter
261

    
262
  def __init__(self, lock):
263
    """Constructor for SingleNotifyPipeCondition
264

265
    """
266
    _BaseCondition.__init__(self, lock)
267
    self._nwaiters = 0
268
    self._notified = False
269
    self._read_fd = None
270
    self._write_fd = None
271
    self._poller = None
272

    
273
  def _check_unnotified(self):
274
    """Throws an exception if already notified.
275

276
    """
277
    if self._notified:
278
      raise RuntimeError("cannot use already notified condition")
279

    
280
  def _Cleanup(self):
281
    """Cleanup open file descriptors, if any.
282

283
    """
284
    if self._read_fd is not None:
285
      os.close(self._read_fd)
286
      self._read_fd = None
287

    
288
    if self._write_fd is not None:
289
      os.close(self._write_fd)
290
      self._write_fd = None
291
    self._poller = None
292

    
293
  def wait(self, timeout=None):
294
    """Wait for a notification.
295

296
    @type timeout: float or None
297
    @param timeout: Waiting timeout (can be None)
298

299
    """
300
    self._check_owned()
301
    self._check_unnotified()
302

    
303
    self._nwaiters += 1
304
    try:
305
      if self._poller is None:
306
        (self._read_fd, self._write_fd) = os.pipe()
307
        self._poller = select.poll()
308
        self._poller.register(self._read_fd, select.POLLHUP)
309

    
310
      wait_fn = self._waiter_class(self._poller, self._read_fd)
311
      state = self._release_save()
312
      try:
313
        # Wait for notification
314
        wait_fn(timeout)
315
      finally:
316
        # Re-acquire lock
317
        self._acquire_restore(state)
318
    finally:
319
      self._nwaiters -= 1
320
      if self._nwaiters == 0:
321
        self._Cleanup()
322

    
323
  def notifyAll(self): # pylint: disable-msg=C0103
324
    """Close the writing side of the pipe to notify all waiters.
325

326
    """
327
    self._check_owned()
328
    self._check_unnotified()
329
    self._notified = True
330
    if self._write_fd is not None:
331
      os.close(self._write_fd)
332
      self._write_fd = None
333

    
334

    
335
class PipeCondition(_BaseCondition):
336
  """Group-only non-polling condition with counters.
337

338
  This condition class uses pipes and poll, internally, to be able to wait for
339
  notification with a timeout, without resorting to polling. It is almost
340
  compatible with Python's threading.Condition, but only supports notifyAll and
341
  non-recursive locks. As an additional features it's able to report whether
342
  there are any waiting threads.
343

344
  """
345
  __slots__ = [
346
    "_nwaiters",
347
    "_single_condition",
348
    ]
349

    
350
  _single_condition_class = SingleNotifyPipeCondition
351

    
352
  def __init__(self, lock):
353
    """Initializes this class.
354

355
    """
356
    _BaseCondition.__init__(self, lock)
357
    self._nwaiters = 0
358
    self._single_condition = self._single_condition_class(self._lock)
359

    
360
  def wait(self, timeout=None):
361
    """Wait for a notification.
362

363
    @type timeout: float or None
364
    @param timeout: Waiting timeout (can be None)
365

366
    """
367
    self._check_owned()
368

    
369
    # Keep local reference to the pipe. It could be replaced by another thread
370
    # notifying while we're waiting.
371
    my_condition = self._single_condition
372

    
373
    assert self._nwaiters >= 0
374
    self._nwaiters += 1
375
    try:
376
      my_condition.wait(timeout)
377
    finally:
378
      assert self._nwaiters > 0
379
      self._nwaiters -= 1
380

    
381
  def notifyAll(self): # pylint: disable-msg=C0103
382
    """Notify all currently waiting threads.
383

384
    """
385
    self._check_owned()
386
    self._single_condition.notifyAll()
387
    self._single_condition = self._single_condition_class(self._lock)
388

    
389
  def has_waiting(self):
390
    """Returns whether there are active waiters.
391

392
    """
393
    self._check_owned()
394

    
395
    return bool(self._nwaiters)
396

    
397

    
398
class SharedLock(object):
399
  """Implements a shared lock.
400

401
  Multiple threads can acquire the lock in a shared way, calling
402
  acquire_shared().  In order to acquire the lock in an exclusive way threads
403
  can call acquire_exclusive().
404

405
  The lock prevents starvation but does not guarantee that threads will acquire
406
  the shared lock in the order they queued for it, just that they will
407
  eventually do so.
408

409
  @type name: string
410
  @ivar name: the name of the lock
411

412
  """
413
  __slots__ = [
414
    "__weakref__",
415
    "__active_shr_c",
416
    "__inactive_shr_c",
417
    "__deleted",
418
    "__exc",
419
    "__lock",
420
    "__pending",
421
    "__shr",
422
    "name",
423
    ]
424

    
425
  __condition_class = PipeCondition
426

    
427
  def __init__(self, name, monitor=None):
428
    """Construct a new SharedLock.
429

430
    @param name: the name of the lock
431
    @type monitor: L{LockMonitor}
432
    @param monitor: Lock monitor with which to register
433

434
    """
435
    object.__init__(self)
436

    
437
    self.name = name
438

    
439
    # Internal lock
440
    self.__lock = threading.Lock()
441

    
442
    # Queue containing waiting acquires
443
    self.__pending = []
444

    
445
    # Active and inactive conditions for shared locks
446
    self.__active_shr_c = self.__condition_class(self.__lock)
447
    self.__inactive_shr_c = self.__condition_class(self.__lock)
448

    
449
    # Current lock holders
450
    self.__shr = set()
451
    self.__exc = None
452

    
453
    # is this lock in the deleted state?
454
    self.__deleted = False
455

    
456
    # Register with lock monitor
457
    if monitor:
458
      monitor.RegisterLock(self)
459

    
460
  def GetInfo(self, fields):
461
    """Retrieves information for querying locks.
462

463
    @type fields: list of strings
464
    @param fields: List of fields to return
465

466
    """
467
    self.__lock.acquire()
468
    try:
469
      info = []
470

    
471
      # Note: to avoid unintentional race conditions, no references to
472
      # modifiable objects should be returned unless they were created in this
473
      # function.
474
      for fname in fields:
475
        if fname == "name":
476
          info.append(self.name)
477
        elif fname == "mode":
478
          if self.__deleted:
479
            info.append("deleted")
480
            assert not (self.__exc or self.__shr)
481
          elif self.__exc:
482
            info.append("exclusive")
483
          elif self.__shr:
484
            info.append("shared")
485
          else:
486
            info.append(None)
487
        elif fname == "owner":
488
          if self.__exc:
489
            owner = [self.__exc]
490
          else:
491
            owner = self.__shr
492

    
493
          if owner:
494
            assert not self.__deleted
495
            info.append([i.getName() for i in owner])
496
          else:
497
            info.append(None)
498
        else:
499
          raise errors.OpExecError("Invalid query field '%s'" % fname)
500

    
501
      return info
502
    finally:
503
      self.__lock.release()
504

    
505
  def __check_deleted(self):
506
    """Raises an exception if the lock has been deleted.
507

508
    """
509
    if self.__deleted:
510
      raise errors.LockError("Deleted lock %s" % self.name)
511

    
512
  def __is_sharer(self):
513
    """Is the current thread sharing the lock at this time?
514

515
    """
516
    return threading.currentThread() in self.__shr
517

    
518
  def __is_exclusive(self):
519
    """Is the current thread holding the lock exclusively at this time?
520

521
    """
522
    return threading.currentThread() == self.__exc
523

    
524
  def __is_owned(self, shared=-1):
525
    """Is the current thread somehow owning the lock at this time?
526

527
    This is a private version of the function, which presumes you're holding
528
    the internal lock.
529

530
    """
531
    if shared < 0:
532
      return self.__is_sharer() or self.__is_exclusive()
533
    elif shared:
534
      return self.__is_sharer()
535
    else:
536
      return self.__is_exclusive()
537

    
538
  def _is_owned(self, shared=-1):
539
    """Is the current thread somehow owning the lock at this time?
540

541
    @param shared:
542
        - < 0: check for any type of ownership (default)
543
        - 0: check for exclusive ownership
544
        - > 0: check for shared ownership
545

546
    """
547
    self.__lock.acquire()
548
    try:
549
      return self.__is_owned(shared=shared)
550
    finally:
551
      self.__lock.release()
552

    
553
  def _count_pending(self):
554
    """Returns the number of pending acquires.
555

556
    @rtype: int
557

558
    """
559
    self.__lock.acquire()
560
    try:
561
      return len(self.__pending)
562
    finally:
563
      self.__lock.release()
564

    
565
  def __do_acquire(self, shared):
566
    """Actually acquire the lock.
567

568
    """
569
    if shared:
570
      self.__shr.add(threading.currentThread())
571
    else:
572
      self.__exc = threading.currentThread()
573

    
574
  def __can_acquire(self, shared):
575
    """Determine whether lock can be acquired.
576

577
    """
578
    if shared:
579
      return self.__exc is None
580
    else:
581
      return len(self.__shr) == 0 and self.__exc is None
582

    
583
  def __is_on_top(self, cond):
584
    """Checks whether the passed condition is on top of the queue.
585

586
    The caller must make sure the queue isn't empty.
587

588
    """
589
    return self.__pending[0] == cond
590

    
591
  def __acquire_unlocked(self, shared, timeout):
592
    """Acquire a shared lock.
593

594
    @param shared: whether to acquire in shared mode; by default an
595
        exclusive lock will be acquired
596
    @param timeout: maximum waiting time before giving up
597

598
    """
599
    self.__check_deleted()
600

    
601
    # We cannot acquire the lock if we already have it
602
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
603
                                   " %s" % self.name)
604

    
605
    # Check whether someone else holds the lock or there are pending acquires.
606
    if not self.__pending and self.__can_acquire(shared):
607
      # Apparently not, can acquire lock directly.
608
      self.__do_acquire(shared)
609
      return True
610

    
611
    if shared:
612
      wait_condition = self.__active_shr_c
613

    
614
      # Check if we're not yet in the queue
615
      if wait_condition not in self.__pending:
616
        self.__pending.append(wait_condition)
617
    else:
618
      wait_condition = self.__condition_class(self.__lock)
619
      # Always add to queue
620
      self.__pending.append(wait_condition)
621

    
622
    try:
623
      # Wait until we become the topmost acquire in the queue or the timeout
624
      # expires.
625
      while not (self.__is_on_top(wait_condition) and
626
                 self.__can_acquire(shared)):
627
        # Wait for notification
628
        wait_condition.wait(timeout)
629
        self.__check_deleted()
630

    
631
        # A lot of code assumes blocking acquires always succeed. Loop
632
        # internally for that case.
633
        if timeout is not None:
634
          break
635

    
636
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
637
        self.__do_acquire(shared)
638
        return True
639
    finally:
640
      # Remove condition from queue if there are no more waiters
641
      if not wait_condition.has_waiting() and not self.__deleted:
642
        self.__pending.remove(wait_condition)
643

    
644
    return False
645

    
646
  def acquire(self, shared=0, timeout=None, test_notify=None):
647
    """Acquire a shared lock.
648

649
    @type shared: integer (0/1) used as a boolean
650
    @param shared: whether to acquire in shared mode; by default an
651
        exclusive lock will be acquired
652
    @type timeout: float
653
    @param timeout: maximum waiting time before giving up
654
    @type test_notify: callable or None
655
    @param test_notify: Special callback function for unittesting
656

657
    """
658
    self.__lock.acquire()
659
    try:
660
      # We already got the lock, notify now
661
      if __debug__ and callable(test_notify):
662
        test_notify()
663

    
664
      return self.__acquire_unlocked(shared, timeout)
665
    finally:
666
      self.__lock.release()
667

    
668
  def release(self):
669
    """Release a Shared Lock.
670

671
    You must have acquired the lock, either in shared or in exclusive mode,
672
    before calling this function.
673

674
    """
675
    self.__lock.acquire()
676
    try:
677
      assert self.__is_exclusive() or self.__is_sharer(), \
678
        "Cannot release non-owned lock"
679

    
680
      # Autodetect release type
681
      if self.__is_exclusive():
682
        self.__exc = None
683
      else:
684
        self.__shr.remove(threading.currentThread())
685

    
686
      # Notify topmost condition in queue
687
      if self.__pending:
688
        first_condition = self.__pending[0]
689
        first_condition.notifyAll()
690

    
691
        if first_condition == self.__active_shr_c:
692
          self.__active_shr_c = self.__inactive_shr_c
693
          self.__inactive_shr_c = first_condition
694

    
695
    finally:
696
      self.__lock.release()
697

    
698
  def delete(self, timeout=None):
699
    """Delete a Shared Lock.
700

701
    This operation will declare the lock for removal. First the lock will be
702
    acquired in exclusive mode if you don't already own it, then the lock
703
    will be put in a state where any future and pending acquire() fail.
704

705
    @type timeout: float
706
    @param timeout: maximum waiting time before giving up
707

708
    """
709
    self.__lock.acquire()
710
    try:
711
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
712

    
713
      self.__check_deleted()
714

    
715
      # The caller is allowed to hold the lock exclusively already.
716
      acquired = self.__is_exclusive()
717

    
718
      if not acquired:
719
        acquired = self.__acquire_unlocked(0, timeout)
720

    
721
        assert self.__is_exclusive() and not self.__is_sharer(), \
722
          "Lock wasn't acquired in exclusive mode"
723

    
724
      if acquired:
725
        self.__deleted = True
726
        self.__exc = None
727

    
728
        assert not (self.__exc or self.__shr), "Found owner during deletion"
729

    
730
        # Notify all acquires. They'll throw an error.
731
        while self.__pending:
732
          self.__pending.pop().notifyAll()
733

    
734
      return acquired
735
    finally:
736
      self.__lock.release()
737

    
738
  def _release_save(self):
739
    shared = self.__is_sharer()
740
    self.release()
741
    return shared
742

    
743
  def _acquire_restore(self, shared):
744
    self.acquire(shared=shared)
745

    
746

    
747
# Whenever we want to acquire a full LockSet we pass None as the value
748
# to acquire.  Hide this behind this nicely named constant.
749
ALL_SET = None
750

    
751

    
752
class _AcquireTimeout(Exception):
753
  """Internal exception to abort an acquire on a timeout.
754

755
  """
756

    
757

    
758
class LockSet:
759
  """Implements a set of locks.
760

761
  This abstraction implements a set of shared locks for the same resource type,
762
  distinguished by name. The user can lock a subset of the resources and the
763
  LockSet will take care of acquiring the locks always in the same order, thus
764
  preventing deadlock.
765

766
  All the locks needed in the same set must be acquired together, though.
767

768
  @type name: string
769
  @ivar name: the name of the lockset
770

771
  """
772
  def __init__(self, members, name, monitor=None):
773
    """Constructs a new LockSet.
774

775
    @type members: list of strings
776
    @param members: initial members of the set
777
    @type monitor: L{LockMonitor}
778
    @param monitor: Lock monitor with which to register member locks
779

780
    """
781
    assert members is not None, "members parameter is not a list"
782
    self.name = name
783

    
784
    # Lock monitor
785
    self.__monitor = monitor
786

    
787
    # Used internally to guarantee coherency.
788
    self.__lock = SharedLock(name)
789

    
790
    # The lockdict indexes the relationship name -> lock
791
    # The order-of-locking is implied by the alphabetical order of names
792
    self.__lockdict = {}
793

    
794
    for mname in members:
795
      self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
796
                                          monitor=monitor)
797

    
798
    # The owner dict contains the set of locks each thread owns. For
799
    # performance each thread can access its own key without a global lock on
800
    # this structure. It is paramount though that *no* other type of access is
801
    # done to this structure (eg. no looping over its keys). *_owner helper
802
    # function are defined to guarantee access is correct, but in general never
803
    # do anything different than __owners[threading.currentThread()], or there
804
    # will be trouble.
805
    self.__owners = {}
806

    
807
  def _GetLockName(self, mname):
808
    """Returns the name for a member lock.
809

810
    """
811
    return "%s/%s" % (self.name, mname)
812

    
813
  def _is_owned(self):
814
    """Is the current thread a current level owner?"""
815
    return threading.currentThread() in self.__owners
816

    
817
  def _add_owned(self, name=None):
818
    """Note the current thread owns the given lock"""
819
    if name is None:
820
      if not self._is_owned():
821
        self.__owners[threading.currentThread()] = set()
822
    else:
823
      if self._is_owned():
824
        self.__owners[threading.currentThread()].add(name)
825
      else:
826
        self.__owners[threading.currentThread()] = set([name])
827

    
828
  def _del_owned(self, name=None):
829
    """Note the current thread owns the given lock"""
830

    
831
    assert not (name is None and self.__lock._is_owned()), \
832
           "Cannot hold internal lock when deleting owner status"
833

    
834
    if name is not None:
835
      self.__owners[threading.currentThread()].remove(name)
836

    
837
    # Only remove the key if we don't hold the set-lock as well
838
    if (not self.__lock._is_owned() and
839
        not self.__owners[threading.currentThread()]):
840
      del self.__owners[threading.currentThread()]
841

    
842
  def _list_owned(self):
843
    """Get the set of resource names owned by the current thread"""
844
    if self._is_owned():
845
      return self.__owners[threading.currentThread()].copy()
846
    else:
847
      return set()
848

    
849
  def _release_and_delete_owned(self):
850
    """Release and delete all resources owned by the current thread"""
851
    for lname in self._list_owned():
852
      lock = self.__lockdict[lname]
853
      if lock._is_owned():
854
        lock.release()
855
      self._del_owned(name=lname)
856

    
857
  def __names(self):
858
    """Return the current set of names.
859

860
    Only call this function while holding __lock and don't iterate on the
861
    result after releasing the lock.
862

863
    """
864
    return self.__lockdict.keys()
865

    
866
  def _names(self):
867
    """Return a copy of the current set of elements.
868

869
    Used only for debugging purposes.
870

871
    """
872
    # If we don't already own the set-level lock acquired
873
    # we'll get it and note we need to release it later.
874
    release_lock = False
875
    if not self.__lock._is_owned():
876
      release_lock = True
877
      self.__lock.acquire(shared=1)
878
    try:
879
      result = self.__names()
880
    finally:
881
      if release_lock:
882
        self.__lock.release()
883
    return set(result)
884

    
885
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
886
    """Acquire a set of resource locks.
887

888
    @type names: list of strings (or string)
889
    @param names: the names of the locks which shall be acquired
890
        (special lock names, or instance/node names)
891
    @type shared: integer (0/1) used as a boolean
892
    @param shared: whether to acquire in shared mode; by default an
893
        exclusive lock will be acquired
894
    @type timeout: float or None
895
    @param timeout: Maximum time to acquire all locks
896
    @type test_notify: callable or None
897
    @param test_notify: Special callback function for unittesting
898

899
    @return: Set of all locks successfully acquired or None in case of timeout
900

901
    @raise errors.LockError: when any lock we try to acquire has
902
        been deleted before we succeed. In this case none of the
903
        locks requested will be acquired.
904

905
    """
906
    assert timeout is None or timeout >= 0.0
907

    
908
    # Check we don't already own locks at this level
909
    assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
910
                                  " (lockset %s)" % self.name)
911

    
912
    # We need to keep track of how long we spent waiting for a lock. The
913
    # timeout passed to this function is over all lock acquires.
914
    running_timeout = RunningTimeout(timeout, False)
915

    
916
    try:
917
      if names is not None:
918
        # Support passing in a single resource to acquire rather than many
919
        if isinstance(names, basestring):
920
          names = [names]
921

    
922
        return self.__acquire_inner(names, False, shared,
923
                                    running_timeout.Remaining, test_notify)
924

    
925
      else:
926
        # If no names are given acquire the whole set by not letting new names
927
        # being added before we release, and getting the current list of names.
928
        # Some of them may then be deleted later, but we'll cope with this.
929
        #
930
        # We'd like to acquire this lock in a shared way, as it's nice if
931
        # everybody else can use the instances at the same time. If are
932
        # acquiring them exclusively though they won't be able to do this
933
        # anyway, though, so we'll get the list lock exclusively as well in
934
        # order to be able to do add() on the set while owning it.
935
        if not self.__lock.acquire(shared=shared,
936
                                   timeout=running_timeout.Remaining()):
937
          raise _AcquireTimeout()
938
        try:
939
          # note we own the set-lock
940
          self._add_owned()
941

    
942
          return self.__acquire_inner(self.__names(), True, shared,
943
                                      running_timeout.Remaining, test_notify)
944
        except:
945
          # We shouldn't have problems adding the lock to the owners list, but
946
          # if we did we'll try to release this lock and re-raise exception.
947
          # Of course something is going to be really wrong, after this.
948
          self.__lock.release()
949
          self._del_owned()
950
          raise
951

    
952
    except _AcquireTimeout:
953
      return None
954

    
955
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
956
    """Inner logic for acquiring a number of locks.
957

958
    @param names: Names of the locks to be acquired
959
    @param want_all: Whether all locks in the set should be acquired
960
    @param shared: Whether to acquire in shared mode
961
    @param timeout_fn: Function returning remaining timeout
962
    @param test_notify: Special callback function for unittesting
963

964
    """
965
    acquire_list = []
966

    
967
    # First we look the locks up on __lockdict. We have no way of being sure
968
    # they will still be there after, but this makes it a lot faster should
969
    # just one of them be the already wrong. Using a sorted sequence to prevent
970
    # deadlocks.
971
    for lname in sorted(utils.UniqueSequence(names)):
972
      try:
973
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
974
      except KeyError:
975
        if want_all:
976
          # We are acquiring all the set, it doesn't matter if this particular
977
          # element is not there anymore.
978
          continue
979

    
980
        raise errors.LockError("Non-existing lock %s in set %s" %
981
                               (lname, self.name))
982

    
983
      acquire_list.append((lname, lock))
984

    
985
    # This will hold the locknames we effectively acquired.
986
    acquired = set()
987

    
988
    try:
989
      # Now acquire_list contains a sorted list of resources and locks we
990
      # want.  In order to get them we loop on this (private) list and
991
      # acquire() them.  We gave no real guarantee they will still exist till
992
      # this is done but .acquire() itself is safe and will alert us if the
993
      # lock gets deleted.
994
      for (lname, lock) in acquire_list:
995
        if __debug__ and callable(test_notify):
996
          test_notify_fn = lambda: test_notify(lname)
997
        else:
998
          test_notify_fn = None
999

    
1000
        timeout = timeout_fn()
1001

    
1002
        try:
1003
          # raises LockError if the lock was deleted
1004
          acq_success = lock.acquire(shared=shared, timeout=timeout,
1005
                                     test_notify=test_notify_fn)
1006
        except errors.LockError:
1007
          if want_all:
1008
            # We are acquiring all the set, it doesn't matter if this
1009
            # particular element is not there anymore.
1010
            continue
1011

    
1012
          raise errors.LockError("Non-existing lock %s in set %s" %
1013
                                 (lname, self.name))
1014

    
1015
        if not acq_success:
1016
          # Couldn't get lock or timeout occurred
1017
          if timeout is None:
1018
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
1019
            # blocking.
1020
            raise errors.LockError("Failed to get lock %s (set %s)" %
1021
                                   (lname, self.name))
1022

    
1023
          raise _AcquireTimeout()
1024

    
1025
        try:
1026
          # now the lock cannot be deleted, we have it!
1027
          self._add_owned(name=lname)
1028
          acquired.add(lname)
1029

    
1030
        except:
1031
          # We shouldn't have problems adding the lock to the owners list, but
1032
          # if we did we'll try to release this lock and re-raise exception.
1033
          # Of course something is going to be really wrong after this.
1034
          if lock._is_owned():
1035
            lock.release()
1036
          raise
1037

    
1038
    except:
1039
      # Release all owned locks
1040
      self._release_and_delete_owned()
1041
      raise
1042

    
1043
    return acquired
1044

    
1045
  def release(self, names=None):
1046
    """Release a set of resource locks, at the same level.
1047

1048
    You must have acquired the locks, either in shared or in exclusive mode,
1049
    before releasing them.
1050

1051
    @type names: list of strings, or None
1052
    @param names: the names of the locks which shall be released
1053
        (defaults to all the locks acquired at that level).
1054

1055
    """
1056
    assert self._is_owned(), ("release() on lock set %s while not owner" %
1057
                              self.name)
1058

    
1059
    # Support passing in a single resource to release rather than many
1060
    if isinstance(names, basestring):
1061
      names = [names]
1062

    
1063
    if names is None:
1064
      names = self._list_owned()
1065
    else:
1066
      names = set(names)
1067
      assert self._list_owned().issuperset(names), (
1068
               "release() on unheld resources %s (set %s)" %
1069
               (names.difference(self._list_owned()), self.name))
1070

    
1071
    # First of all let's release the "all elements" lock, if set.
1072
    # After this 'add' can work again
1073
    if self.__lock._is_owned():
1074
      self.__lock.release()
1075
      self._del_owned()
1076

    
1077
    for lockname in names:
1078
      # If we are sure the lock doesn't leave __lockdict without being
1079
      # exclusively held we can do this...
1080
      self.__lockdict[lockname].release()
1081
      self._del_owned(name=lockname)
1082

    
1083
  def add(self, names, acquired=0, shared=0):
1084
    """Add a new set of elements to the set
1085

1086
    @type names: list of strings
1087
    @param names: names of the new elements to add
1088
    @type acquired: integer (0/1) used as a boolean
1089
    @param acquired: pre-acquire the new resource?
1090
    @type shared: integer (0/1) used as a boolean
1091
    @param shared: is the pre-acquisition shared?
1092

1093
    """
1094
    # Check we don't already own locks at this level
1095
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1096
      ("Cannot add locks if the set %s is only partially owned, or shared" %
1097
       self.name)
1098

    
1099
    # Support passing in a single resource to add rather than many
1100
    if isinstance(names, basestring):
1101
      names = [names]
1102

    
1103
    # If we don't already own the set-level lock acquired in an exclusive way
1104
    # we'll get it and note we need to release it later.
1105
    release_lock = False
1106
    if not self.__lock._is_owned():
1107
      release_lock = True
1108
      self.__lock.acquire()
1109

    
1110
    try:
1111
      invalid_names = set(self.__names()).intersection(names)
1112
      if invalid_names:
1113
        # This must be an explicit raise, not an assert, because assert is
1114
        # turned off when using optimization, and this can happen because of
1115
        # concurrency even if the user doesn't want it.
1116
        raise errors.LockError("duplicate add(%s) on lockset %s" %
1117
                               (invalid_names, self.name))
1118

    
1119
      for lockname in names:
1120
        lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1121

    
1122
        if acquired:
1123
          lock.acquire(shared=shared)
1124
          # now the lock cannot be deleted, we have it!
1125
          try:
1126
            self._add_owned(name=lockname)
1127
          except:
1128
            # We shouldn't have problems adding the lock to the owners list,
1129
            # but if we did we'll try to release this lock and re-raise
1130
            # exception.  Of course something is going to be really wrong,
1131
            # after this.  On the other hand the lock hasn't been added to the
1132
            # __lockdict yet so no other threads should be pending on it. This
1133
            # release is just a safety measure.
1134
            lock.release()
1135
            raise
1136

    
1137
        self.__lockdict[lockname] = lock
1138

    
1139
    finally:
1140
      # Only release __lock if we were not holding it previously.
1141
      if release_lock:
1142
        self.__lock.release()
1143

    
1144
    return True
1145

    
1146
  def remove(self, names):
1147
    """Remove elements from the lock set.
1148

1149
    You can either not hold anything in the lockset or already hold a superset
1150
    of the elements you want to delete, exclusively.
1151

1152
    @type names: list of strings
1153
    @param names: names of the resource to remove.
1154

1155
    @return: a list of locks which we removed; the list is always
1156
        equal to the names list if we were holding all the locks
1157
        exclusively
1158

1159
    """
1160
    # Support passing in a single resource to remove rather than many
1161
    if isinstance(names, basestring):
1162
      names = [names]
1163

    
1164
    # If we own any subset of this lock it must be a superset of what we want
1165
    # to delete. The ownership must also be exclusive, but that will be checked
1166
    # by the lock itself.
1167
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1168
      "remove() on acquired lockset %s while not owning all elements" %
1169
      self.name)
1170

    
1171
    removed = []
1172

    
1173
    for lname in names:
1174
      # Calling delete() acquires the lock exclusively if we don't already own
1175
      # it, and causes all pending and subsequent lock acquires to fail. It's
1176
      # fine to call it out of order because delete() also implies release(),
1177
      # and the assertion above guarantees that if we either already hold
1178
      # everything we want to delete, or we hold none.
1179
      try:
1180
        self.__lockdict[lname].delete()
1181
        removed.append(lname)
1182
      except (KeyError, errors.LockError):
1183
        # This cannot happen if we were already holding it, verify:
1184
        assert not self._is_owned(), ("remove failed while holding lockset %s"
1185
                                      % self.name)
1186
      else:
1187
        # If no LockError was raised we are the ones who deleted the lock.
1188
        # This means we can safely remove it from lockdict, as any further or
1189
        # pending delete() or acquire() will fail (and nobody can have the lock
1190
        # since before our call to delete()).
1191
        #
1192
        # This is done in an else clause because if the exception was thrown
1193
        # it's the job of the one who actually deleted it.
1194
        del self.__lockdict[lname]
1195
        # And let's remove it from our private list if we owned it.
1196
        if self._is_owned():
1197
          self._del_owned(name=lname)
1198

    
1199
    return removed
1200

    
1201

    
1202
# Locking levels, must be acquired in increasing order.
1203
# Current rules are:
1204
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1205
#   acquired before performing any operation, either in shared or in exclusive
1206
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1207
#   avoided.
1208
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1209
#   If you need more than one node, or more than one instance, acquire them at
1210
#   the same time.
1211
LEVEL_CLUSTER = 0
1212
LEVEL_INSTANCE = 1
1213
LEVEL_NODE = 2
1214

    
1215
LEVELS = [LEVEL_CLUSTER,
1216
          LEVEL_INSTANCE,
1217
          LEVEL_NODE]
1218

    
1219
# Lock levels which are modifiable
1220
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1221

    
1222
LEVEL_NAMES = {
1223
  LEVEL_CLUSTER: "cluster",
1224
  LEVEL_INSTANCE: "instance",
1225
  LEVEL_NODE: "node",
1226
  }
1227

    
1228
# Constant for the big ganeti lock
1229
BGL = 'BGL'
1230

    
1231

    
1232
class GanetiLockManager:
1233
  """The Ganeti Locking Library
1234

1235
  The purpose of this small library is to manage locking for ganeti clusters
1236
  in a central place, while at the same time doing dynamic checks against
1237
  possible deadlocks. It will also make it easier to transition to a different
1238
  lock type should we migrate away from python threads.
1239

1240
  """
1241
  _instance = None
1242

    
1243
  def __init__(self, nodes=None, instances=None):
1244
    """Constructs a new GanetiLockManager object.
1245

1246
    There should be only a GanetiLockManager object at any time, so this
1247
    function raises an error if this is not the case.
1248

1249
    @param nodes: list of node names
1250
    @param instances: list of instance names
1251

1252
    """
1253
    assert self.__class__._instance is None, \
1254
           "double GanetiLockManager instance"
1255

    
1256
    self.__class__._instance = self
1257

    
1258
    self._monitor = LockMonitor()
1259

    
1260
    # The keyring contains all the locks, at their level and in the correct
1261
    # locking order.
1262
    self.__keyring = {
1263
      LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1264
      LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1265
      LEVEL_INSTANCE: LockSet(instances, "instances",
1266
                              monitor=self._monitor),
1267
      }
1268

    
1269
  def QueryLocks(self, fields, sync):
1270
    """Queries information from all locks.
1271

1272
    See L{LockMonitor.QueryLocks}.
1273

1274
    """
1275
    return self._monitor.QueryLocks(fields, sync)
1276

    
1277
  def _names(self, level):
1278
    """List the lock names at the given level.
1279

1280
    This can be used for debugging/testing purposes.
1281

1282
    @param level: the level whose list of locks to get
1283

1284
    """
1285
    assert level in LEVELS, "Invalid locking level %s" % level
1286
    return self.__keyring[level]._names()
1287

    
1288
  def _is_owned(self, level):
1289
    """Check whether we are owning locks at the given level
1290

1291
    """
1292
    return self.__keyring[level]._is_owned()
1293

    
1294
  is_owned = _is_owned
1295

    
1296
  def _list_owned(self, level):
1297
    """Get the set of owned locks at the given level
1298

1299
    """
1300
    return self.__keyring[level]._list_owned()
1301

    
1302
  def _upper_owned(self, level):
1303
    """Check that we don't own any lock at a level greater than the given one.
1304

1305
    """
1306
    # This way of checking only works if LEVELS[i] = i, which we check for in
1307
    # the test cases.
1308
    return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1309

    
1310
  def _BGL_owned(self): # pylint: disable-msg=C0103
1311
    """Check if the current thread owns the BGL.
1312

1313
    Both an exclusive or a shared acquisition work.
1314

1315
    """
1316
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1317

    
1318
  @staticmethod
1319
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1320
    """Check if the level contains the BGL.
1321

1322
    Check if acting on the given level and set of names will change
1323
    the status of the Big Ganeti Lock.
1324

1325
    """
1326
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1327

    
1328
  def acquire(self, level, names, timeout=None, shared=0):
1329
    """Acquire a set of resource locks, at the same level.
1330

1331
    @type level: member of locking.LEVELS
1332
    @param level: the level at which the locks shall be acquired
1333
    @type names: list of strings (or string)
1334
    @param names: the names of the locks which shall be acquired
1335
        (special lock names, or instance/node names)
1336
    @type shared: integer (0/1) used as a boolean
1337
    @param shared: whether to acquire in shared mode; by default
1338
        an exclusive lock will be acquired
1339
    @type timeout: float
1340
    @param timeout: Maximum time to acquire all locks
1341

1342
    """
1343
    assert level in LEVELS, "Invalid locking level %s" % level
1344

    
1345
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1346
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1347
    # so even if we've migrated we need to at least share the BGL to be
1348
    # compatible with them. Of course if we own the BGL exclusively there's no
1349
    # point in acquiring any other lock, unless perhaps we are half way through
1350
    # the migration of the current opcode.
1351
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1352
            "You must own the Big Ganeti Lock before acquiring any other")
1353

    
1354
    # Check we don't own locks at the same or upper levels.
1355
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1356
           " while owning some at a greater one")
1357

    
1358
    # Acquire the locks in the set.
1359
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1360

    
1361
  def release(self, level, names=None):
1362
    """Release a set of resource locks, at the same level.
1363

1364
    You must have acquired the locks, either in shared or in exclusive
1365
    mode, before releasing them.
1366

1367
    @type level: member of locking.LEVELS
1368
    @param level: the level at which the locks shall be released
1369
    @type names: list of strings, or None
1370
    @param names: the names of the locks which shall be released
1371
        (defaults to all the locks acquired at that level)
1372

1373
    """
1374
    assert level in LEVELS, "Invalid locking level %s" % level
1375
    assert (not self._contains_BGL(level, names) or
1376
            not self._upper_owned(LEVEL_CLUSTER)), (
1377
            "Cannot release the Big Ganeti Lock while holding something"
1378
            " at upper levels (%r)" %
1379
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1380
                              for i in self.__keyring.keys()]), ))
1381

    
1382
    # Release will complain if we don't own the locks already
1383
    return self.__keyring[level].release(names)
1384

    
1385
  def add(self, level, names, acquired=0, shared=0):
1386
    """Add locks at the specified level.
1387

1388
    @type level: member of locking.LEVELS_MOD
1389
    @param level: the level at which the locks shall be added
1390
    @type names: list of strings
1391
    @param names: names of the locks to acquire
1392
    @type acquired: integer (0/1) used as a boolean
1393
    @param acquired: whether to acquire the newly added locks
1394
    @type shared: integer (0/1) used as a boolean
1395
    @param shared: whether the acquisition will be shared
1396

1397
    """
1398
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1399
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1400
           " operations")
1401
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1402
           " while owning some at a greater one")
1403
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1404

    
1405
  def remove(self, level, names):
1406
    """Remove locks from the specified level.
1407

1408
    You must either already own the locks you are trying to remove
1409
    exclusively or not own any lock at an upper level.
1410

1411
    @type level: member of locking.LEVELS_MOD
1412
    @param level: the level at which the locks shall be removed
1413
    @type names: list of strings
1414
    @param names: the names of the locks which shall be removed
1415
        (special lock names, or instance/node names)
1416

1417
    """
1418
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1419
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1420
           " operations")
1421
    # Check we either own the level or don't own anything from here
1422
    # up. LockSet.remove() will check the case in which we don't own
1423
    # all the needed resources, or we have a shared ownership.
1424
    assert self._is_owned(level) or not self._upper_owned(level), (
1425
           "Cannot remove locks at a level while not owning it or"
1426
           " owning some at a greater one")
1427
    return self.__keyring[level].remove(names)
1428

    
1429

    
1430
class LockMonitor(object):
1431
  _LOCK_ATTR = "_lock"
1432

    
1433
  def __init__(self):
1434
    """Initializes this class.
1435

1436
    """
1437
    self._lock = SharedLock("LockMonitor")
1438

    
1439
    # Tracked locks. Weak references are used to avoid issues with circular
1440
    # references and deletion.
1441
    self._locks = weakref.WeakKeyDictionary()
1442

    
1443
  @ssynchronized(_LOCK_ATTR)
1444
  def RegisterLock(self, lock):
1445
    """Registers a new lock.
1446

1447
    """
1448
    logging.debug("Registering lock %s", lock.name)
1449
    assert lock not in self._locks, "Duplicate lock registration"
1450
    assert not compat.any(lock.name == i.name for i in self._locks.keys()), \
1451
           "Found duplicate lock name"
1452
    self._locks[lock] = None
1453

    
1454
  @ssynchronized(_LOCK_ATTR)
1455
  def _GetLockInfo(self, fields):
1456
    """Get information from all locks while the monitor lock is held.
1457

1458
    """
1459
    result = {}
1460

    
1461
    for lock in self._locks.keys():
1462
      assert lock.name not in result, "Found duplicate lock name"
1463
      result[lock.name] = lock.GetInfo(fields)
1464

    
1465
    return result
1466

    
1467
  def QueryLocks(self, fields, sync):
1468
    """Queries information from all locks.
1469

1470
    @type fields: list of strings
1471
    @param fields: List of fields to return
1472
    @type sync: boolean
1473
    @param sync: Whether to operate in synchronous mode
1474

1475
    """
1476
    if sync:
1477
      raise NotImplementedError("Synchronous queries are not implemented")
1478

    
1479
    # Get all data without sorting
1480
    result = self._GetLockInfo(fields)
1481

    
1482
    # Sort by name
1483
    return [result[name] for name in utils.NiceSort(result.keys())]