Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 413b7472

History | View | Annotate | Download (39.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
# pylint: disable-msg=W0212
24

    
25
# W0212 since e.g. LockSet methods use (a lot) the internals of
26
# SharedLock
27

    
28
import os
29
import select
30
import threading
31
import time
32
import errno
33

    
34
from ganeti import errors
35
from ganeti import utils
36

    
37

    
38
def ssynchronized(lock, shared=0):
39
  """Shared Synchronization decorator.
40

41
  Calls the function holding the given lock, either in exclusive or shared
42
  mode. It requires the passed lock to be a SharedLock (or support its
43
  semantics).
44

45
  """
46
  def wrap(fn):
47
    def sync_function(*args, **kwargs):
48
      lock.acquire(shared=shared)
49
      try:
50
        return fn(*args, **kwargs)
51
      finally:
52
        lock.release()
53
    return sync_function
54
  return wrap
55

    
56

    
57
class RunningTimeout(object):
58
  """Class to calculate remaining timeout when doing several operations.
59

60
  """
61
  __slots__ = [
62
    "_allow_negative",
63
    "_start_time",
64
    "_time_fn",
65
    "_timeout",
66
    ]
67

    
68
  def __init__(self, timeout, allow_negative, _time_fn=time.time):
69
    """Initializes this class.
70

71
    @type timeout: float
72
    @param timeout: Timeout duration
73
    @type allow_negative: bool
74
    @param allow_negative: Whether to return values below zero
75
    @param _time_fn: Time function for unittests
76

77
    """
78
    object.__init__(self)
79

    
80
    if timeout is not None and timeout < 0.0:
81
      raise ValueError("Timeout must not be negative")
82

    
83
    self._timeout = timeout
84
    self._allow_negative = allow_negative
85
    self._time_fn = _time_fn
86

    
87
    self._start_time = None
88

    
89
  def Remaining(self):
90
    """Returns the remaining timeout.
91

92
    """
93
    if self._timeout is None:
94
      return None
95

    
96
    # Get start time on first calculation
97
    if self._start_time is None:
98
      self._start_time = self._time_fn()
99

    
100
    # Calculate remaining time
101
    remaining_timeout = self._start_time + self._timeout - self._time_fn()
102

    
103
    if not self._allow_negative:
104
      # Ensure timeout is always >= 0
105
      return max(0.0, remaining_timeout)
106

    
107
    return remaining_timeout
108

    
109

    
110
class _SingleNotifyPipeConditionWaiter(object):
111
  """Helper class for SingleNotifyPipeCondition
112

113
  """
114
  __slots__ = [
115
    "_fd",
116
    "_poller",
117
    ]
118

    
119
  def __init__(self, poller, fd):
120
    """Constructor for _SingleNotifyPipeConditionWaiter
121

122
    @type poller: select.poll
123
    @param poller: Poller object
124
    @type fd: int
125
    @param fd: File descriptor to wait for
126

127
    """
128
    object.__init__(self)
129
    self._poller = poller
130
    self._fd = fd
131

    
132
  def __call__(self, timeout):
133
    """Wait for something to happen on the pipe.
134

135
    @type timeout: float or None
136
    @param timeout: Timeout for waiting (can be None)
137

138
    """
139
    running_timeout = RunningTimeout(timeout, True)
140

    
141
    while True:
142
      remaining_time = running_timeout.Remaining()
143

    
144
      if remaining_time is not None and remaining_time < 0.0:
145
        break
146

    
147
      try:
148
        # Our calculation uses seconds, poll() wants milliseconds
149
        result = self._poller.poll(1000 * remaining_time)
150
      except EnvironmentError, err:
151
        if err.errno != errno.EINTR:
152
          raise
153
        result = None
154

    
155
      # Check whether we were notified
156
      if result and result[0][0] == self._fd:
157
        break
158

    
159

    
160
class _BaseCondition(object):
161
  """Base class containing common code for conditions.
162

163
  Some of this code is taken from python's threading module.
164

165
  """
166
  __slots__ = [
167
    "_lock",
168
    "acquire",
169
    "release",
170
    ]
171

    
172
  def __init__(self, lock):
173
    """Constructor for _BaseCondition.
174

175
    @type lock: threading.Lock
176
    @param lock: condition base lock
177

178
    """
179
    object.__init__(self)
180

    
181
    # Recursive locks are not supported
182
    assert not hasattr(lock, "_acquire_restore")
183
    assert not hasattr(lock, "_release_save")
184

    
185
    self._lock = lock
186

    
187
    # Export the lock's acquire() and release() methods
188
    self.acquire = lock.acquire
189
    self.release = lock.release
190

    
191
  def _is_owned(self):
192
    """Check whether lock is owned by current thread.
193

194
    """
195
    if self._lock.acquire(0):
196
      self._lock.release()
197
      return False
198

    
199
    return True
200

    
201
  def _check_owned(self):
202
    """Raise an exception if the current thread doesn't own the lock.
203

204
    """
205
    if not self._is_owned():
206
      raise RuntimeError("cannot work with un-aquired lock")
207

    
208

    
209
class SingleNotifyPipeCondition(_BaseCondition):
210
  """Condition which can only be notified once.
211

212
  This condition class uses pipes and poll, internally, to be able to wait for
213
  notification with a timeout, without resorting to polling. It is almost
214
  compatible with Python's threading.Condition, with the following differences:
215
    - notifyAll can only be called once, and no wait can happen after that
216
    - notify is not supported, only notifyAll
217

218
  """
219

    
220
  __slots__ = _BaseCondition.__slots__ + [
221
    "_poller",
222
    "_read_fd",
223
    "_write_fd",
224
    "_nwaiters",
225
    "_notified",
226
    ]
227

    
228
  _waiter_class = _SingleNotifyPipeConditionWaiter
229

    
230
  def __init__(self, lock):
231
    """Constructor for SingleNotifyPipeCondition
232

233
    """
234
    _BaseCondition.__init__(self, lock)
235
    self._nwaiters = 0
236
    self._notified = False
237
    self._read_fd = None
238
    self._write_fd = None
239
    self._poller = None
240

    
241
  def _check_unnotified(self):
242
    """Throws an exception if already notified.
243

244
    """
245
    if self._notified:
246
      raise RuntimeError("cannot use already notified condition")
247

    
248
  def _Cleanup(self):
249
    """Cleanup open file descriptors, if any.
250

251
    """
252
    if self._read_fd is not None:
253
      os.close(self._read_fd)
254
      self._read_fd = None
255

    
256
    if self._write_fd is not None:
257
      os.close(self._write_fd)
258
      self._write_fd = None
259
    self._poller = None
260

    
261
  def wait(self, timeout=None):
262
    """Wait for a notification.
263

264
    @type timeout: float or None
265
    @param timeout: Waiting timeout (can be None)
266

267
    """
268
    self._check_owned()
269
    self._check_unnotified()
270

    
271
    self._nwaiters += 1
272
    try:
273
      if self._poller is None:
274
        (self._read_fd, self._write_fd) = os.pipe()
275
        self._poller = select.poll()
276
        self._poller.register(self._read_fd, select.POLLHUP)
277

    
278
      wait_fn = self._waiter_class(self._poller, self._read_fd)
279
      self.release()
280
      try:
281
        # Wait for notification
282
        wait_fn(timeout)
283
      finally:
284
        # Re-acquire lock
285
        self.acquire()
286
    finally:
287
      self._nwaiters -= 1
288
      if self._nwaiters == 0:
289
        self._Cleanup()
290

    
291
  def notifyAll(self): # pylint: disable-msg=C0103
292
    """Close the writing side of the pipe to notify all waiters.
293

294
    """
295
    self._check_owned()
296
    self._check_unnotified()
297
    self._notified = True
298
    if self._write_fd is not None:
299
      os.close(self._write_fd)
300
      self._write_fd = None
301

    
302

    
303
class PipeCondition(_BaseCondition):
304
  """Group-only non-polling condition with counters.
305

306
  This condition class uses pipes and poll, internally, to be able to wait for
307
  notification with a timeout, without resorting to polling. It is almost
308
  compatible with Python's threading.Condition, but only supports notifyAll and
309
  non-recursive locks. As an additional features it's able to report whether
310
  there are any waiting threads.
311

312
  """
313
  __slots__ = _BaseCondition.__slots__ + [
314
    "_nwaiters",
315
    "_single_condition",
316
    ]
317

    
318
  _single_condition_class = SingleNotifyPipeCondition
319

    
320
  def __init__(self, lock):
321
    """Initializes this class.
322

323
    """
324
    _BaseCondition.__init__(self, lock)
325
    self._nwaiters = 0
326
    self._single_condition = self._single_condition_class(self._lock)
327

    
328
  def wait(self, timeout=None):
329
    """Wait for a notification.
330

331
    @type timeout: float or None
332
    @param timeout: Waiting timeout (can be None)
333

334
    """
335
    self._check_owned()
336

    
337
    # Keep local reference to the pipe. It could be replaced by another thread
338
    # notifying while we're waiting.
339
    my_condition = self._single_condition
340

    
341
    assert self._nwaiters >= 0
342
    self._nwaiters += 1
343
    try:
344
      my_condition.wait(timeout)
345
    finally:
346
      assert self._nwaiters > 0
347
      self._nwaiters -= 1
348

    
349
  def notifyAll(self): # pylint: disable-msg=C0103
350
    """Notify all currently waiting threads.
351

352
    """
353
    self._check_owned()
354
    self._single_condition.notifyAll()
355
    self._single_condition = self._single_condition_class(self._lock)
356

    
357
  def has_waiting(self):
358
    """Returns whether there are active waiters.
359

360
    """
361
    self._check_owned()
362

    
363
    return bool(self._nwaiters)
364

    
365

    
366
class _CountingCondition(object):
367
  """Wrapper for Python's built-in threading.Condition class.
368

369
  This wrapper keeps a count of active waiters. We can't access the internal
370
  "__waiters" attribute of threading.Condition because it's not thread-safe.
371

372
  """
373
  __slots__ = [
374
    "_cond",
375
    "_nwaiters",
376
    ]
377

    
378
  def __init__(self, lock):
379
    """Initializes this class.
380

381
    """
382
    object.__init__(self)
383
    self._cond = threading.Condition(lock=lock)
384
    self._nwaiters = 0
385

    
386
  def notifyAll(self): # pylint: disable-msg=C0103
387
    """Notifies the condition.
388

389
    """
390
    return self._cond.notifyAll()
391

    
392
  def wait(self, timeout=None):
393
    """Waits for the condition to be notified.
394

395
    @type timeout: float or None
396
    @param timeout: Waiting timeout (can be None)
397

398
    """
399
    assert self._nwaiters >= 0
400

    
401
    self._nwaiters += 1
402
    try:
403
      return self._cond.wait(timeout=timeout)
404
    finally:
405
      self._nwaiters -= 1
406

    
407
  def has_waiting(self):
408
    """Returns whether there are active waiters.
409

410
    """
411
    return bool(self._nwaiters)
412

    
413

    
414
class SharedLock(object):
415
  """Implements a shared lock.
416

417
  Multiple threads can acquire the lock in a shared way, calling
418
  acquire_shared().  In order to acquire the lock in an exclusive way threads
419
  can call acquire_exclusive().
420

421
  The lock prevents starvation but does not guarantee that threads will acquire
422
  the shared lock in the order they queued for it, just that they will
423
  eventually do so.
424

425
  """
426
  __slots__ = [
427
    "__active_shr_c",
428
    "__inactive_shr_c",
429
    "__deleted",
430
    "__exc",
431
    "__lock",
432
    "__pending",
433
    "__shr",
434
    ]
435

    
436
  __condition_class = PipeCondition
437

    
438
  def __init__(self):
439
    """Construct a new SharedLock.
440

441
    """
442
    object.__init__(self)
443

    
444
    # Internal lock
445
    self.__lock = threading.Lock()
446

    
447
    # Queue containing waiting acquires
448
    self.__pending = []
449

    
450
    # Active and inactive conditions for shared locks
451
    self.__active_shr_c = self.__condition_class(self.__lock)
452
    self.__inactive_shr_c = self.__condition_class(self.__lock)
453

    
454
    # Current lock holders
455
    self.__shr = set()
456
    self.__exc = None
457

    
458
    # is this lock in the deleted state?
459
    self.__deleted = False
460

    
461
  def __check_deleted(self):
462
    """Raises an exception if the lock has been deleted.
463

464
    """
465
    if self.__deleted:
466
      raise errors.LockError("Deleted lock")
467

    
468
  def __is_sharer(self):
469
    """Is the current thread sharing the lock at this time?
470

471
    """
472
    return threading.currentThread() in self.__shr
473

    
474
  def __is_exclusive(self):
475
    """Is the current thread holding the lock exclusively at this time?
476

477
    """
478
    return threading.currentThread() == self.__exc
479

    
480
  def __is_owned(self, shared=-1):
481
    """Is the current thread somehow owning the lock at this time?
482

483
    This is a private version of the function, which presumes you're holding
484
    the internal lock.
485

486
    """
487
    if shared < 0:
488
      return self.__is_sharer() or self.__is_exclusive()
489
    elif shared:
490
      return self.__is_sharer()
491
    else:
492
      return self.__is_exclusive()
493

    
494
  def _is_owned(self, shared=-1):
495
    """Is the current thread somehow owning the lock at this time?
496

497
    @param shared:
498
        - < 0: check for any type of ownership (default)
499
        - 0: check for exclusive ownership
500
        - > 0: check for shared ownership
501

502
    """
503
    self.__lock.acquire()
504
    try:
505
      return self.__is_owned(shared=shared)
506
    finally:
507
      self.__lock.release()
508

    
509
  def _count_pending(self):
510
    """Returns the number of pending acquires.
511

512
    @rtype: int
513

514
    """
515
    self.__lock.acquire()
516
    try:
517
      return len(self.__pending)
518
    finally:
519
      self.__lock.release()
520

    
521
  def __do_acquire(self, shared):
522
    """Actually acquire the lock.
523

524
    """
525
    if shared:
526
      self.__shr.add(threading.currentThread())
527
    else:
528
      self.__exc = threading.currentThread()
529

    
530
  def __can_acquire(self, shared):
531
    """Determine whether lock can be acquired.
532

533
    """
534
    if shared:
535
      return self.__exc is None
536
    else:
537
      return len(self.__shr) == 0 and self.__exc is None
538

    
539
  def __is_on_top(self, cond):
540
    """Checks whether the passed condition is on top of the queue.
541

542
    The caller must make sure the queue isn't empty.
543

544
    """
545
    return self.__pending[0] == cond
546

    
547
  def __acquire_unlocked(self, shared, timeout):
548
    """Acquire a shared lock.
549

550
    @param shared: whether to acquire in shared mode; by default an
551
        exclusive lock will be acquired
552
    @param timeout: maximum waiting time before giving up
553

554
    """
555
    self.__check_deleted()
556

    
557
    # We cannot acquire the lock if we already have it
558
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
559

    
560
    # Check whether someone else holds the lock or there are pending acquires.
561
    if not self.__pending and self.__can_acquire(shared):
562
      # Apparently not, can acquire lock directly.
563
      self.__do_acquire(shared)
564
      return True
565

    
566
    if shared:
567
      wait_condition = self.__active_shr_c
568

    
569
      # Check if we're not yet in the queue
570
      if wait_condition not in self.__pending:
571
        self.__pending.append(wait_condition)
572
    else:
573
      wait_condition = self.__condition_class(self.__lock)
574
      # Always add to queue
575
      self.__pending.append(wait_condition)
576

    
577
    try:
578
      # Wait until we become the topmost acquire in the queue or the timeout
579
      # expires.
580
      while not (self.__is_on_top(wait_condition) and
581
                 self.__can_acquire(shared)):
582
        # Wait for notification
583
        wait_condition.wait(timeout)
584
        self.__check_deleted()
585

    
586
        # A lot of code assumes blocking acquires always succeed. Loop
587
        # internally for that case.
588
        if timeout is not None:
589
          break
590

    
591
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
592
        self.__do_acquire(shared)
593
        return True
594
    finally:
595
      # Remove condition from queue if there are no more waiters
596
      if not wait_condition.has_waiting() and not self.__deleted:
597
        self.__pending.remove(wait_condition)
598

    
599
    return False
600

    
601
  def acquire(self, shared=0, timeout=None, test_notify=None):
602
    """Acquire a shared lock.
603

604
    @type shared: int
605
    @param shared: whether to acquire in shared mode; by default an
606
        exclusive lock will be acquired
607
    @type timeout: float
608
    @param timeout: maximum waiting time before giving up
609
    @type test_notify: callable or None
610
    @param test_notify: Special callback function for unittesting
611

612
    """
613
    self.__lock.acquire()
614
    try:
615
      # We already got the lock, notify now
616
      if __debug__ and callable(test_notify):
617
        test_notify()
618

    
619
      return self.__acquire_unlocked(shared, timeout)
620
    finally:
621
      self.__lock.release()
622

    
623
  def release(self):
624
    """Release a Shared Lock.
625

626
    You must have acquired the lock, either in shared or in exclusive mode,
627
    before calling this function.
628

629
    """
630
    self.__lock.acquire()
631
    try:
632
      assert self.__is_exclusive() or self.__is_sharer(), \
633
        "Cannot release non-owned lock"
634

    
635
      # Autodetect release type
636
      if self.__is_exclusive():
637
        self.__exc = None
638
      else:
639
        self.__shr.remove(threading.currentThread())
640

    
641
      # Notify topmost condition in queue
642
      if self.__pending:
643
        first_condition = self.__pending[0]
644
        first_condition.notifyAll()
645

    
646
        if first_condition == self.__active_shr_c:
647
          self.__active_shr_c = self.__inactive_shr_c
648
          self.__inactive_shr_c = first_condition
649

    
650
    finally:
651
      self.__lock.release()
652

    
653
  def delete(self, timeout=None):
654
    """Delete a Shared Lock.
655

656
    This operation will declare the lock for removal. First the lock will be
657
    acquired in exclusive mode if you don't already own it, then the lock
658
    will be put in a state where any future and pending acquire() fail.
659

660
    @type timeout: float
661
    @param timeout: maximum waiting time before giving up
662

663
    """
664
    self.__lock.acquire()
665
    try:
666
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
667

    
668
      self.__check_deleted()
669

    
670
      # The caller is allowed to hold the lock exclusively already.
671
      acquired = self.__is_exclusive()
672

    
673
      if not acquired:
674
        acquired = self.__acquire_unlocked(0, timeout)
675

    
676
        assert self.__is_exclusive() and not self.__is_sharer(), \
677
          "Lock wasn't acquired in exclusive mode"
678

    
679
      if acquired:
680
        self.__deleted = True
681
        self.__exc = None
682

    
683
        # Notify all acquires. They'll throw an error.
684
        while self.__pending:
685
          self.__pending.pop().notifyAll()
686

    
687
      return acquired
688
    finally:
689
      self.__lock.release()
690

    
691

    
692
# Whenever we want to acquire a full LockSet we pass None as the value
693
# to acquire.  Hide this behind this nicely named constant.
694
ALL_SET = None
695

    
696

    
697
class _AcquireTimeout(Exception):
698
  """Internal exception to abort an acquire on a timeout.
699

700
  """
701

    
702

    
703
class LockSet:
704
  """Implements a set of locks.
705

706
  This abstraction implements a set of shared locks for the same resource type,
707
  distinguished by name. The user can lock a subset of the resources and the
708
  LockSet will take care of acquiring the locks always in the same order, thus
709
  preventing deadlock.
710

711
  All the locks needed in the same set must be acquired together, though.
712

713
  """
714
  def __init__(self, members=None):
715
    """Constructs a new LockSet.
716

717
    @param members: initial members of the set
718

719
    """
720
    # Used internally to guarantee coherency.
721
    self.__lock = SharedLock()
722

    
723
    # The lockdict indexes the relationship name -> lock
724
    # The order-of-locking is implied by the alphabetical order of names
725
    self.__lockdict = {}
726

    
727
    if members is not None:
728
      for name in members:
729
        self.__lockdict[name] = SharedLock()
730

    
731
    # The owner dict contains the set of locks each thread owns. For
732
    # performance each thread can access its own key without a global lock on
733
    # this structure. It is paramount though that *no* other type of access is
734
    # done to this structure (eg. no looping over its keys). *_owner helper
735
    # function are defined to guarantee access is correct, but in general never
736
    # do anything different than __owners[threading.currentThread()], or there
737
    # will be trouble.
738
    self.__owners = {}
739

    
740
  def _is_owned(self):
741
    """Is the current thread a current level owner?"""
742
    return threading.currentThread() in self.__owners
743

    
744
  def _add_owned(self, name=None):
745
    """Note the current thread owns the given lock"""
746
    if name is None:
747
      if not self._is_owned():
748
        self.__owners[threading.currentThread()] = set()
749
    else:
750
      if self._is_owned():
751
        self.__owners[threading.currentThread()].add(name)
752
      else:
753
        self.__owners[threading.currentThread()] = set([name])
754

    
755
  def _del_owned(self, name=None):
756
    """Note the current thread owns the given lock"""
757

    
758
    assert not (name is None and self.__lock._is_owned()), \
759
           "Cannot hold internal lock when deleting owner status"
760

    
761
    if name is not None:
762
      self.__owners[threading.currentThread()].remove(name)
763

    
764
    # Only remove the key if we don't hold the set-lock as well
765
    if (not self.__lock._is_owned() and
766
        not self.__owners[threading.currentThread()]):
767
      del self.__owners[threading.currentThread()]
768

    
769
  def _list_owned(self):
770
    """Get the set of resource names owned by the current thread"""
771
    if self._is_owned():
772
      return self.__owners[threading.currentThread()].copy()
773
    else:
774
      return set()
775

    
776
  def _release_and_delete_owned(self):
777
    """Release and delete all resources owned by the current thread"""
778
    for lname in self._list_owned():
779
      lock = self.__lockdict[lname]
780
      if lock._is_owned():
781
        lock.release()
782
      self._del_owned(name=lname)
783

    
784
  def __names(self):
785
    """Return the current set of names.
786

787
    Only call this function while holding __lock and don't iterate on the
788
    result after releasing the lock.
789

790
    """
791
    return self.__lockdict.keys()
792

    
793
  def _names(self):
794
    """Return a copy of the current set of elements.
795

796
    Used only for debugging purposes.
797

798
    """
799
    # If we don't already own the set-level lock acquired
800
    # we'll get it and note we need to release it later.
801
    release_lock = False
802
    if not self.__lock._is_owned():
803
      release_lock = True
804
      self.__lock.acquire(shared=1)
805
    try:
806
      result = self.__names()
807
    finally:
808
      if release_lock:
809
        self.__lock.release()
810
    return set(result)
811

    
812
  def acquire(self, names, timeout=None, shared=0, test_notify=None):
813
    """Acquire a set of resource locks.
814

815
    @param names: the names of the locks which shall be acquired
816
        (special lock names, or instance/node names)
817
    @param shared: whether to acquire in shared mode; by default an
818
        exclusive lock will be acquired
819
    @type timeout: float or None
820
    @param timeout: Maximum time to acquire all locks
821
    @type test_notify: callable or None
822
    @param test_notify: Special callback function for unittesting
823

824
    @return: Set of all locks successfully acquired or None in case of timeout
825

826
    @raise errors.LockError: when any lock we try to acquire has
827
        been deleted before we succeed. In this case none of the
828
        locks requested will be acquired.
829

830
    """
831
    assert timeout is None or timeout >= 0.0
832

    
833
    # Check we don't already own locks at this level
834
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
835

    
836
    # We need to keep track of how long we spent waiting for a lock. The
837
    # timeout passed to this function is over all lock acquires.
838
    running_timeout = RunningTimeout(timeout, False)
839

    
840
    try:
841
      if names is not None:
842
        # Support passing in a single resource to acquire rather than many
843
        if isinstance(names, basestring):
844
          names = [names]
845

    
846
        return self.__acquire_inner(names, False, shared,
847
                                    running_timeout.Remaining, test_notify)
848

    
849
      else:
850
        # If no names are given acquire the whole set by not letting new names
851
        # being added before we release, and getting the current list of names.
852
        # Some of them may then be deleted later, but we'll cope with this.
853
        #
854
        # We'd like to acquire this lock in a shared way, as it's nice if
855
        # everybody else can use the instances at the same time. If are
856
        # acquiring them exclusively though they won't be able to do this
857
        # anyway, though, so we'll get the list lock exclusively as well in
858
        # order to be able to do add() on the set while owning it.
859
        if not self.__lock.acquire(shared=shared,
860
                                   timeout=running_timeout.Remaining()):
861
          raise _AcquireTimeout()
862
        try:
863
          # note we own the set-lock
864
          self._add_owned()
865

    
866
          return self.__acquire_inner(self.__names(), True, shared,
867
                                      running_timeout.Remaining, test_notify)
868
        except:
869
          # We shouldn't have problems adding the lock to the owners list, but
870
          # if we did we'll try to release this lock and re-raise exception.
871
          # Of course something is going to be really wrong, after this.
872
          self.__lock.release()
873
          self._del_owned()
874
          raise
875

    
876
    except _AcquireTimeout:
877
      return None
878

    
879
  def __acquire_inner(self, names, want_all, shared, timeout_fn, test_notify):
880
    """Inner logic for acquiring a number of locks.
881

882
    @param names: Names of the locks to be acquired
883
    @param want_all: Whether all locks in the set should be acquired
884
    @param shared: Whether to acquire in shared mode
885
    @param timeout_fn: Function returning remaining timeout
886
    @param test_notify: Special callback function for unittesting
887

888
    """
889
    acquire_list = []
890

    
891
    # First we look the locks up on __lockdict. We have no way of being sure
892
    # they will still be there after, but this makes it a lot faster should
893
    # just one of them be the already wrong. Using a sorted sequence to prevent
894
    # deadlocks.
895
    for lname in sorted(utils.UniqueSequence(names)):
896
      try:
897
        lock = self.__lockdict[lname] # raises KeyError if lock is not there
898
      except KeyError:
899
        if want_all:
900
          # We are acquiring all the set, it doesn't matter if this particular
901
          # element is not there anymore.
902
          continue
903

    
904
        raise errors.LockError("Non-existing lock in set (%s)" % lname)
905

    
906
      acquire_list.append((lname, lock))
907

    
908
    # This will hold the locknames we effectively acquired.
909
    acquired = set()
910

    
911
    try:
912
      # Now acquire_list contains a sorted list of resources and locks we
913
      # want.  In order to get them we loop on this (private) list and
914
      # acquire() them.  We gave no real guarantee they will still exist till
915
      # this is done but .acquire() itself is safe and will alert us if the
916
      # lock gets deleted.
917
      for (lname, lock) in acquire_list:
918
        if __debug__ and callable(test_notify):
919
          test_notify_fn = lambda: test_notify(lname)
920
        else:
921
          test_notify_fn = None
922

    
923
        timeout = timeout_fn()
924

    
925
        try:
926
          # raises LockError if the lock was deleted
927
          acq_success = lock.acquire(shared=shared, timeout=timeout,
928
                                     test_notify=test_notify_fn)
929
        except errors.LockError:
930
          if want_all:
931
            # We are acquiring all the set, it doesn't matter if this
932
            # particular element is not there anymore.
933
            continue
934

    
935
          raise errors.LockError("Non-existing lock in set (%s)" % lname)
936

    
937
        if not acq_success:
938
          # Couldn't get lock or timeout occurred
939
          if timeout is None:
940
            # This shouldn't happen as SharedLock.acquire(timeout=None) is
941
            # blocking.
942
            raise errors.LockError("Failed to get lock %s" % lname)
943

    
944
          raise _AcquireTimeout()
945

    
946
        try:
947
          # now the lock cannot be deleted, we have it!
948
          self._add_owned(name=lname)
949
          acquired.add(lname)
950

    
951
        except:
952
          # We shouldn't have problems adding the lock to the owners list, but
953
          # if we did we'll try to release this lock and re-raise exception.
954
          # Of course something is going to be really wrong after this.
955
          if lock._is_owned():
956
            lock.release()
957
          raise
958

    
959
    except:
960
      # Release all owned locks
961
      self._release_and_delete_owned()
962
      raise
963

    
964
    return acquired
965

    
966
  def release(self, names=None):
967
    """Release a set of resource locks, at the same level.
968

969
    You must have acquired the locks, either in shared or in exclusive mode,
970
    before releasing them.
971

972
    @param names: the names of the locks which shall be released
973
        (defaults to all the locks acquired at that level).
974

975
    """
976
    assert self._is_owned(), "release() on lock set while not owner"
977

    
978
    # Support passing in a single resource to release rather than many
979
    if isinstance(names, basestring):
980
      names = [names]
981

    
982
    if names is None:
983
      names = self._list_owned()
984
    else:
985
      names = set(names)
986
      assert self._list_owned().issuperset(names), (
987
               "release() on unheld resources %s" %
988
               names.difference(self._list_owned()))
989

    
990
    # First of all let's release the "all elements" lock, if set.
991
    # After this 'add' can work again
992
    if self.__lock._is_owned():
993
      self.__lock.release()
994
      self._del_owned()
995

    
996
    for lockname in names:
997
      # If we are sure the lock doesn't leave __lockdict without being
998
      # exclusively held we can do this...
999
      self.__lockdict[lockname].release()
1000
      self._del_owned(name=lockname)
1001

    
1002
  def add(self, names, acquired=0, shared=0):
1003
    """Add a new set of elements to the set
1004

1005
    @param names: names of the new elements to add
1006
    @param acquired: pre-acquire the new resource?
1007
    @param shared: is the pre-acquisition shared?
1008

1009
    """
1010
    # Check we don't already own locks at this level
1011
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1012
      "Cannot add locks if the set is only partially owned, or shared"
1013

    
1014
    # Support passing in a single resource to add rather than many
1015
    if isinstance(names, basestring):
1016
      names = [names]
1017

    
1018
    # If we don't already own the set-level lock acquired in an exclusive way
1019
    # we'll get it and note we need to release it later.
1020
    release_lock = False
1021
    if not self.__lock._is_owned():
1022
      release_lock = True
1023
      self.__lock.acquire()
1024

    
1025
    try:
1026
      invalid_names = set(self.__names()).intersection(names)
1027
      if invalid_names:
1028
        # This must be an explicit raise, not an assert, because assert is
1029
        # turned off when using optimization, and this can happen because of
1030
        # concurrency even if the user doesn't want it.
1031
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
1032

    
1033
      for lockname in names:
1034
        lock = SharedLock()
1035

    
1036
        if acquired:
1037
          lock.acquire(shared=shared)
1038
          # now the lock cannot be deleted, we have it!
1039
          try:
1040
            self._add_owned(name=lockname)
1041
          except:
1042
            # We shouldn't have problems adding the lock to the owners list,
1043
            # but if we did we'll try to release this lock and re-raise
1044
            # exception.  Of course something is going to be really wrong,
1045
            # after this.  On the other hand the lock hasn't been added to the
1046
            # __lockdict yet so no other threads should be pending on it. This
1047
            # release is just a safety measure.
1048
            lock.release()
1049
            raise
1050

    
1051
        self.__lockdict[lockname] = lock
1052

    
1053
    finally:
1054
      # Only release __lock if we were not holding it previously.
1055
      if release_lock:
1056
        self.__lock.release()
1057

    
1058
    return True
1059

    
1060
  def remove(self, names):
1061
    """Remove elements from the lock set.
1062

1063
    You can either not hold anything in the lockset or already hold a superset
1064
    of the elements you want to delete, exclusively.
1065

1066
    @param names: names of the resource to remove.
1067

1068
    @return: a list of locks which we removed; the list is always
1069
        equal to the names list if we were holding all the locks
1070
        exclusively
1071

1072
    """
1073
    # Support passing in a single resource to remove rather than many
1074
    if isinstance(names, basestring):
1075
      names = [names]
1076

    
1077
    # If we own any subset of this lock it must be a superset of what we want
1078
    # to delete. The ownership must also be exclusive, but that will be checked
1079
    # by the lock itself.
1080
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1081
      "remove() on acquired lockset while not owning all elements")
1082

    
1083
    removed = []
1084

    
1085
    for lname in names:
1086
      # Calling delete() acquires the lock exclusively if we don't already own
1087
      # it, and causes all pending and subsequent lock acquires to fail. It's
1088
      # fine to call it out of order because delete() also implies release(),
1089
      # and the assertion above guarantees that if we either already hold
1090
      # everything we want to delete, or we hold none.
1091
      try:
1092
        self.__lockdict[lname].delete()
1093
        removed.append(lname)
1094
      except (KeyError, errors.LockError):
1095
        # This cannot happen if we were already holding it, verify:
1096
        assert not self._is_owned(), "remove failed while holding lockset"
1097
      else:
1098
        # If no LockError was raised we are the ones who deleted the lock.
1099
        # This means we can safely remove it from lockdict, as any further or
1100
        # pending delete() or acquire() will fail (and nobody can have the lock
1101
        # since before our call to delete()).
1102
        #
1103
        # This is done in an else clause because if the exception was thrown
1104
        # it's the job of the one who actually deleted it.
1105
        del self.__lockdict[lname]
1106
        # And let's remove it from our private list if we owned it.
1107
        if self._is_owned():
1108
          self._del_owned(name=lname)
1109

    
1110
    return removed
1111

    
1112

    
1113
# Locking levels, must be acquired in increasing order.
1114
# Current rules are:
1115
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1116
#   acquired before performing any operation, either in shared or in exclusive
1117
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1118
#   avoided.
1119
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1120
#   If you need more than one node, or more than one instance, acquire them at
1121
#   the same time.
1122
LEVEL_CLUSTER = 0
1123
LEVEL_INSTANCE = 1
1124
LEVEL_NODE = 2
1125

    
1126
LEVELS = [LEVEL_CLUSTER,
1127
          LEVEL_INSTANCE,
1128
          LEVEL_NODE]
1129

    
1130
# Lock levels which are modifiable
1131
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1132

    
1133
LEVEL_NAMES = {
1134
  LEVEL_CLUSTER: "cluster",
1135
  LEVEL_INSTANCE: "instance",
1136
  LEVEL_NODE: "node",
1137
  }
1138

    
1139
# Constant for the big ganeti lock
1140
BGL = 'BGL'
1141

    
1142

    
1143
class GanetiLockManager:
1144
  """The Ganeti Locking Library
1145

1146
  The purpose of this small library is to manage locking for ganeti clusters
1147
  in a central place, while at the same time doing dynamic checks against
1148
  possible deadlocks. It will also make it easier to transition to a different
1149
  lock type should we migrate away from python threads.
1150

1151
  """
1152
  _instance = None
1153

    
1154
  def __init__(self, nodes=None, instances=None):
1155
    """Constructs a new GanetiLockManager object.
1156

1157
    There should be only a GanetiLockManager object at any time, so this
1158
    function raises an error if this is not the case.
1159

1160
    @param nodes: list of node names
1161
    @param instances: list of instance names
1162

1163
    """
1164
    assert self.__class__._instance is None, \
1165
           "double GanetiLockManager instance"
1166

    
1167
    self.__class__._instance = self
1168

    
1169
    # The keyring contains all the locks, at their level and in the correct
1170
    # locking order.
1171
    self.__keyring = {
1172
      LEVEL_CLUSTER: LockSet([BGL]),
1173
      LEVEL_NODE: LockSet(nodes),
1174
      LEVEL_INSTANCE: LockSet(instances),
1175
    }
1176

    
1177
  def _names(self, level):
1178
    """List the lock names at the given level.
1179

1180
    This can be used for debugging/testing purposes.
1181

1182
    @param level: the level whose list of locks to get
1183

1184
    """
1185
    assert level in LEVELS, "Invalid locking level %s" % level
1186
    return self.__keyring[level]._names()
1187

    
1188
  def _is_owned(self, level):
1189
    """Check whether we are owning locks at the given level
1190

1191
    """
1192
    return self.__keyring[level]._is_owned()
1193

    
1194
  is_owned = _is_owned
1195

    
1196
  def _list_owned(self, level):
1197
    """Get the set of owned locks at the given level
1198

1199
    """
1200
    return self.__keyring[level]._list_owned()
1201

    
1202
  def _upper_owned(self, level):
1203
    """Check that we don't own any lock at a level greater than the given one.
1204

1205
    """
1206
    # This way of checking only works if LEVELS[i] = i, which we check for in
1207
    # the test cases.
1208
    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1209

    
1210
  def _BGL_owned(self): # pylint: disable-msg=C0103
1211
    """Check if the current thread owns the BGL.
1212

1213
    Both an exclusive or a shared acquisition work.
1214

1215
    """
1216
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1217

    
1218
  @staticmethod
1219
  def _contains_BGL(level, names): # pylint: disable-msg=C0103
1220
    """Check if the level contains the BGL.
1221

1222
    Check if acting on the given level and set of names will change
1223
    the status of the Big Ganeti Lock.
1224

1225
    """
1226
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1227

    
1228
  def acquire(self, level, names, timeout=None, shared=0):
1229
    """Acquire a set of resource locks, at the same level.
1230

1231
    @param level: the level at which the locks shall be acquired;
1232
        it must be a member of LEVELS.
1233
    @param names: the names of the locks which shall be acquired
1234
        (special lock names, or instance/node names)
1235
    @param shared: whether to acquire in shared mode; by default
1236
        an exclusive lock will be acquired
1237
    @type timeout: float
1238
    @param timeout: Maximum time to acquire all locks
1239

1240
    """
1241
    assert level in LEVELS, "Invalid locking level %s" % level
1242

    
1243
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1244
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1245
    # so even if we've migrated we need to at least share the BGL to be
1246
    # compatible with them. Of course if we own the BGL exclusively there's no
1247
    # point in acquiring any other lock, unless perhaps we are half way through
1248
    # the migration of the current opcode.
1249
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1250
            "You must own the Big Ganeti Lock before acquiring any other")
1251

    
1252
    # Check we don't own locks at the same or upper levels.
1253
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1254
           " while owning some at a greater one")
1255

    
1256
    # Acquire the locks in the set.
1257
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1258

    
1259
  def release(self, level, names=None):
1260
    """Release a set of resource locks, at the same level.
1261

1262
    You must have acquired the locks, either in shared or in exclusive
1263
    mode, before releasing them.
1264

1265
    @param level: the level at which the locks shall be released;
1266
        it must be a member of LEVELS
1267
    @param names: the names of the locks which shall be released
1268
        (defaults to all the locks acquired at that level)
1269

1270
    """
1271
    assert level in LEVELS, "Invalid locking level %s" % level
1272
    assert (not self._contains_BGL(level, names) or
1273
            not self._upper_owned(LEVEL_CLUSTER)), (
1274
            "Cannot release the Big Ganeti Lock while holding something"
1275
            " at upper levels (%r)" %
1276
            (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1277
                              for i in self.__keyring.keys()]), ))
1278

    
1279
    # Release will complain if we don't own the locks already
1280
    return self.__keyring[level].release(names)
1281

    
1282
  def add(self, level, names, acquired=0, shared=0):
1283
    """Add locks at the specified level.
1284

1285
    @param level: the level at which the locks shall be added;
1286
        it must be a member of LEVELS_MOD.
1287
    @param names: names of the locks to acquire
1288
    @param acquired: whether to acquire the newly added locks
1289
    @param shared: whether the acquisition will be shared
1290

1291
    """
1292
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1293
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1294
           " operations")
1295
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1296
           " while owning some at a greater one")
1297
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1298

    
1299
  def remove(self, level, names):
1300
    """Remove locks from the specified level.
1301

1302
    You must either already own the locks you are trying to remove
1303
    exclusively or not own any lock at an upper level.
1304

1305
    @param level: the level at which the locks shall be removed;
1306
        it must be a member of LEVELS_MOD
1307
    @param names: the names of the locks which shall be removed
1308
        (special lock names, or instance/node names)
1309

1310
    """
1311
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1312
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1313
           " operations")
1314
    # Check we either own the level or don't own anything from here
1315
    # up. LockSet.remove() will check the case in which we don't own
1316
    # all the needed resources, or we have a shared ownership.
1317
    assert self._is_owned(level) or not self._upper_owned(level), (
1318
           "Cannot remove locks at a level while not owning it or"
1319
           " owning some at a greater one")
1320
    return self.__keyring[level].remove(names)