Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ 2419060d

History | View | Annotate | Download (36.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
import os
24
import select
25
import threading
26
import time
27
import errno
28

    
29
from ganeti import errors
30
from ganeti import utils
31

    
32

    
33
def ssynchronized(lock, shared=0):
34
  """Shared Synchronization decorator.
35

36
  Calls the function holding the given lock, either in exclusive or shared
37
  mode. It requires the passed lock to be a SharedLock (or support its
38
  semantics).
39

40
  """
41
  def wrap(fn):
42
    def sync_function(*args, **kwargs):
43
      lock.acquire(shared=shared)
44
      try:
45
        return fn(*args, **kwargs)
46
      finally:
47
        lock.release()
48
    return sync_function
49
  return wrap
50

    
51

    
52
class _SingleActionPipeConditionWaiter(object):
53
  """Callable helper class for _SingleActionPipeCondition.
54

55
  """
56
  __slots__ = [
57
    "_cond",
58
    "_fd",
59
    "_poller",
60
    ]
61

    
62
  def __init__(self, cond, poller, fd):
63
    """Initializes this class.
64

65
    @type cond: L{_SingleActionPipeCondition}
66
    @param cond: Parent condition
67
    @type poller: select.poll
68
    @param poller: Poller object
69
    @type fd: int
70
    @param fd: File descriptor to wait for
71

72
    """
73
    object.__init__(self)
74

    
75
    self._cond = cond
76
    self._poller = poller
77
    self._fd = fd
78

    
79
  def __call__(self, timeout):
80
    """Wait for something to happen on the pipe.
81

82
    @type timeout: float or None
83
    @param timeout: Timeout for waiting (can be None)
84

85
    """
86
    start_time = time.time()
87
    remaining_time = timeout
88

    
89
    while timeout is None or remaining_time > 0:
90
      try:
91
        result = self._poller.poll(remaining_time)
92
      except EnvironmentError, err:
93
        if err.errno != errno.EINTR:
94
          raise
95
        result = None
96

    
97
      # Check whether we were notified
98
      if result and result[0][0] == self._fd:
99
        break
100

    
101
      # Re-calculate timeout if necessary
102
      if timeout is not None:
103
        remaining_time = start_time + timeout - time.time()
104

    
105

    
106
class _BaseCondition(object):
107
  """Base class containing common code for conditions.
108

109
  Some of this code is taken from python's threading module.
110

111
  """
112
  __slots__ = [
113
    "_lock",
114
    "acquire",
115
    "release",
116
    ]
117

    
118
  def __init__(self, lock):
119
    """Constructor for _BaseCondition.
120

121
    @type lock: L{threading.Lock}
122
    @param lock: condition base lock
123

124
    """
125
    object.__init__(self)
126

    
127
    # Recursive locks are not supported
128
    assert not hasattr(lock, "_acquire_restore")
129
    assert not hasattr(lock, "_release_save")
130

    
131
    self._lock = lock
132

    
133
    # Export the lock's acquire() and release() methods
134
    self.acquire = lock.acquire
135
    self.release = lock.release
136

    
137
  def _is_owned(self):
138
    """Check whether lock is owned by current thread.
139

140
    """
141
    if self._lock.acquire(0):
142
      self._lock.release()
143
      return False
144

    
145
    return True
146

    
147
  def _check_owned(self):
148
    """Raise an exception if the current thread doesn't own the lock.
149

150
    """
151
    if not self._is_owned():
152
      raise RuntimeError("cannot work with un-aquired lock")
153

    
154

    
155
class _SingleActionPipeCondition(object):
156
  """Wrapper around a pipe for usage inside conditions.
157

158
  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
159
  always allocated when constructing the class. Extra care is taken to always
160
  close the file descriptors.
161

162
  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
163
  notifications.
164

165
  Warning: This class is designed to be used as the underlying component of a
166
  locking condition, but is not by itself thread safe, and needs to be
167
  protected by an external lock.
168

169
  """
170
  __slots__ = [
171
    "_poller",
172
    "_read_fd",
173
    "_write_fd",
174
    "_nwaiters",
175
    ]
176

    
177
  _waiter_class = _SingleActionPipeConditionWaiter
178

    
179
  def __init__(self):
180
    """Initializes this class.
181

182
    """
183
    object.__init__(self)
184

    
185
    self._nwaiters = 0
186

    
187
    # Just assume the unpacking is successful, otherwise error handling gets
188
    # very complicated.
189
    (self._read_fd, self._write_fd) = os.pipe()
190
    try:
191
      # The poller looks for closure of the write side
192
      poller = select.poll()
193
      poller.register(self._read_fd, select.POLLHUP)
194

    
195
      self._poller = poller
196
    except:
197
      if self._read_fd is not None:
198
        os.close(self._read_fd)
199
      if self._write_fd is not None:
200
        os.close(self._write_fd)
201
      raise
202

    
203
    # There should be no code here anymore, otherwise the pipe file descriptors
204
    # may be not be cleaned up properly in case of errors.
205

    
206
  def StartWaiting(self):
207
    """Return function to wait for notification.
208

209
    @rtype: L{_SingleActionPipeConditionWaiter}
210
    @return: Function to wait for notification
211

212
    """
213
    assert self._nwaiters >= 0
214

    
215
    if self._poller is None:
216
      raise RuntimeError("Already cleaned up")
217

    
218
    # Create waiter function and increase number of waiters
219
    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
220
    self._nwaiters += 1
221
    return wait_fn
222

    
223
  def DoneWaiting(self):
224
    """Decrement number of waiters and automatic cleanup.
225

226
    Must be called after waiting for a notification.
227

228
    @rtype: bool
229
    @return: Whether this was the last waiter
230

231
    """
232
    assert self._nwaiters > 0
233

    
234
    self._nwaiters -= 1
235

    
236
    if self._nwaiters == 0:
237
      self._Cleanup()
238
      return True
239

    
240
    return False
241

    
242
  def notifyAll(self):
243
    """Close the writing side of the pipe to notify all waiters.
244

245
    """
246
    if self._write_fd is None:
247
      raise RuntimeError("Can only notify once")
248

    
249
    os.close(self._write_fd)
250
    self._write_fd = None
251

    
252
  def _Cleanup(self):
253
    """Close all file descriptors.
254

255
    """
256
    if self._read_fd is not None:
257
      os.close(self._read_fd)
258
      self._read_fd = None
259

    
260
    if self._write_fd is not None:
261
      os.close(self._write_fd)
262
      self._write_fd = None
263

    
264
    self._poller = None
265

    
266
  def __del__(self):
267
    """Called on object deletion.
268

269
    Ensure no file descriptors are left open.
270

271
    """
272
    self._Cleanup()
273

    
274

    
275
class _PipeCondition(_BaseCondition):
276
  """Group-only non-polling condition with counters.
277

278
  This condition class uses pipes and poll, internally, to be able to wait for
279
  notification with a timeout, without resorting to polling. It is almost
280
  compatible with Python's threading.Condition, but only supports notifyAll and
281
  non-recursive locks. As an additional features it's able to report whether
282
  there are any waiting threads.
283

284
  """
285
  __slots__ = _BaseCondition.__slots__ + [
286
    "_nwaiters",
287
    "_pipe",
288
    ]
289

    
290
  _pipe_class = _SingleActionPipeCondition
291

    
292
  def __init__(self, lock):
293
    """Initializes this class.
294

295
    """
296
    _BaseCondition.__init__(self, lock)
297
    self._nwaiters = 0
298
    self._pipe = None
299

    
300
  def wait(self, timeout=None):
301
    """Wait for a notification.
302

303
    @type timeout: float or None
304
    @param timeout: Waiting timeout (can be None)
305

306
    """
307
    self._check_owned()
308

    
309
    if not self._pipe:
310
      self._pipe = self._pipe_class()
311

    
312
    # Keep local reference to the pipe. It could be replaced by another thread
313
    # notifying while we're waiting.
314
    pipe = self._pipe
315

    
316
    assert self._nwaiters >= 0
317
    self._nwaiters += 1
318
    try:
319
      # Get function to wait on the pipe
320
      wait_fn = pipe.StartWaiting()
321
      try:
322
        # Release lock while waiting
323
        self.release()
324
        try:
325
          # Wait for notification
326
          wait_fn(timeout)
327
        finally:
328
          # Re-acquire lock
329
          self.acquire()
330
      finally:
331
        # Destroy pipe if this was the last waiter and the current pipe is
332
        # still the same. The same pipe cannot be reused after cleanup.
333
        if pipe.DoneWaiting() and pipe == self._pipe:
334
          self._pipe = None
335
    finally:
336
      assert self._nwaiters > 0
337
      self._nwaiters -= 1
338

    
339
  def notifyAll(self):
340
    """Notify all currently waiting threads.
341

342
    """
343
    self._check_owned()
344

    
345
    # Notify and forget pipe. A new one will be created on the next call to
346
    # wait.
347
    if self._pipe is not None:
348
      self._pipe.notifyAll()
349
      self._pipe = None
350

    
351
  def has_waiting(self):
352
    """Returns whether there are active waiters.
353

354
    """
355
    self._check_owned()
356

    
357
    return bool(self._nwaiters)
358

    
359

    
360
class _CountingCondition(object):
361
  """Wrapper for Python's built-in threading.Condition class.
362

363
  This wrapper keeps a count of active waiters. We can't access the internal
364
  "__waiters" attribute of threading.Condition because it's not thread-safe.
365

366
  """
367
  __slots__ = [
368
    "_cond",
369
    "_nwaiters",
370
    ]
371

    
372
  def __init__(self, lock):
373
    """Initializes this class.
374

375
    """
376
    object.__init__(self)
377
    self._cond = threading.Condition(lock=lock)
378
    self._nwaiters = 0
379

    
380
  def notifyAll(self):
381
    """Notifies the condition.
382

383
    """
384
    return self._cond.notifyAll()
385

    
386
  def wait(self, timeout=None):
387
    """Waits for the condition to be notified.
388

389
    @type timeout: float or None
390
    @param timeout: Timeout in seconds
391

392
    """
393
    assert self._nwaiters >= 0
394

    
395
    self._nwaiters += 1
396
    try:
397
      return self._cond.wait(timeout=timeout)
398
    finally:
399
      self._nwaiters -= 1
400

    
401
  def has_waiting(self):
402
    """Returns whether there are active waiters.
403

404
    """
405
    return bool(self._nwaiters)
406

    
407

    
408
class SharedLock(object):
409
  """Implements a shared lock.
410

411
  Multiple threads can acquire the lock in a shared way, calling
412
  acquire_shared().  In order to acquire the lock in an exclusive way threads
413
  can call acquire_exclusive().
414

415
  The lock prevents starvation but does not guarantee that threads will acquire
416
  the shared lock in the order they queued for it, just that they will
417
  eventually do so.
418

419
  """
420
  __slots__ = [
421
    "__active_shr_c",
422
    "__inactive_shr_c",
423
    "__deleted",
424
    "__exc",
425
    "__lock",
426
    "__pending",
427
    "__shr",
428
    ]
429

    
430
  __condition_class = _PipeCondition
431

    
432
  def __init__(self):
433
    """Construct a new SharedLock.
434

435
    """
436
    object.__init__(self)
437

    
438
    # Internal lock
439
    self.__lock = threading.Lock()
440

    
441
    # Queue containing waiting acquires
442
    self.__pending = []
443

    
444
    # Active and inactive conditions for shared locks
445
    self.__active_shr_c = self.__condition_class(self.__lock)
446
    self.__inactive_shr_c = self.__condition_class(self.__lock)
447

    
448
    # Current lock holders
449
    self.__shr = set()
450
    self.__exc = None
451

    
452
    # is this lock in the deleted state?
453
    self.__deleted = False
454

    
455
  def __check_deleted(self):
456
    """Raises an exception if the lock has been deleted.
457

458
    """
459
    if self.__deleted:
460
      raise errors.LockError("Deleted lock")
461

    
462
  def __is_sharer(self):
463
    """Is the current thread sharing the lock at this time?
464

465
    """
466
    return threading.currentThread() in self.__shr
467

    
468
  def __is_exclusive(self):
469
    """Is the current thread holding the lock exclusively at this time?
470

471
    """
472
    return threading.currentThread() == self.__exc
473

    
474
  def __is_owned(self, shared=-1):
475
    """Is the current thread somehow owning the lock at this time?
476

477
    This is a private version of the function, which presumes you're holding
478
    the internal lock.
479

480
    """
481
    if shared < 0:
482
      return self.__is_sharer() or self.__is_exclusive()
483
    elif shared:
484
      return self.__is_sharer()
485
    else:
486
      return self.__is_exclusive()
487

    
488
  def _is_owned(self, shared=-1):
489
    """Is the current thread somehow owning the lock at this time?
490

491
    @param shared:
492
        - < 0: check for any type of ownership (default)
493
        - 0: check for exclusive ownership
494
        - > 0: check for shared ownership
495

496
    """
497
    self.__lock.acquire()
498
    try:
499
      return self.__is_owned(shared=shared)
500
    finally:
501
      self.__lock.release()
502

    
503
  def _count_pending(self):
504
    """Returns the number of pending acquires.
505

506
    @rtype: int
507

508
    """
509
    self.__lock.acquire()
510
    try:
511
      return len(self.__pending)
512
    finally:
513
      self.__lock.release()
514

    
515
  def __do_acquire(self, shared):
516
    """Actually acquire the lock.
517

518
    """
519
    if shared:
520
      self.__shr.add(threading.currentThread())
521
    else:
522
      self.__exc = threading.currentThread()
523

    
524
  def __can_acquire(self, shared):
525
    """Determine whether lock can be acquired.
526

527
    """
528
    if shared:
529
      return self.__exc is None
530
    else:
531
      return len(self.__shr) == 0 and self.__exc is None
532

    
533
  def __is_on_top(self, cond):
534
    """Checks whether the passed condition is on top of the queue.
535

536
    The caller must make sure the queue isn't empty.
537

538
    """
539
    return self.__pending[0] == cond
540

    
541
  def __acquire_unlocked(self, shared, timeout):
542
    """Acquire a shared lock.
543

544
    @param shared: whether to acquire in shared mode; by default an
545
        exclusive lock will be acquired
546
    @param timeout: maximum waiting time before giving up
547

548
    """
549
    self.__check_deleted()
550

    
551
    # We cannot acquire the lock if we already have it
552
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
553

    
554
    # Check whether someone else holds the lock or there are pending acquires.
555
    if not self.__pending and self.__can_acquire(shared):
556
      # Apparently not, can acquire lock directly.
557
      self.__do_acquire(shared)
558
      return True
559

    
560
    if shared:
561
      wait_condition = self.__active_shr_c
562

    
563
      # Check if we're not yet in the queue
564
      if wait_condition not in self.__pending:
565
        self.__pending.append(wait_condition)
566
    else:
567
      wait_condition = self.__condition_class(self.__lock)
568
      # Always add to queue
569
      self.__pending.append(wait_condition)
570

    
571
    try:
572
      # Wait until we become the topmost acquire in the queue or the timeout
573
      # expires.
574
      while not (self.__is_on_top(wait_condition) and
575
                 self.__can_acquire(shared)):
576
        # Wait for notification
577
        wait_condition.wait(timeout)
578
        self.__check_deleted()
579

    
580
        # A lot of code assumes blocking acquires always succeed. Loop
581
        # internally for that case.
582
        if timeout is not None:
583
          break
584

    
585
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
586
        self.__do_acquire(shared)
587
        return True
588
    finally:
589
      # Remove condition from queue if there are no more waiters
590
      if not wait_condition.has_waiting() and not self.__deleted:
591
        self.__pending.remove(wait_condition)
592

    
593
    return False
594

    
595
  def acquire(self, shared=0, timeout=None):
596
    """Acquire a shared lock.
597

598
    @type shared: int
599
    @param shared: whether to acquire in shared mode; by default an
600
        exclusive lock will be acquired
601
    @type timeout: float
602
    @param timeout: maximum waiting time before giving up
603

604
    """
605
    self.__lock.acquire()
606
    try:
607
      return self.__acquire_unlocked(shared, timeout)
608
    finally:
609
      self.__lock.release()
610

    
611
  def release(self):
612
    """Release a Shared Lock.
613

614
    You must have acquired the lock, either in shared or in exclusive mode,
615
    before calling this function.
616

617
    """
618
    self.__lock.acquire()
619
    try:
620
      assert self.__is_exclusive() or self.__is_sharer(), \
621
        "Cannot release non-owned lock"
622

    
623
      # Autodetect release type
624
      if self.__is_exclusive():
625
        self.__exc = None
626
      else:
627
        self.__shr.remove(threading.currentThread())
628

    
629
      # Notify topmost condition in queue
630
      if self.__pending:
631
        first_condition = self.__pending[0]
632
        first_condition.notifyAll()
633

    
634
        if first_condition == self.__active_shr_c:
635
          self.__active_shr_c = self.__inactive_shr_c
636
          self.__inactive_shr_c = first_condition
637

    
638
    finally:
639
      self.__lock.release()
640

    
641
  def delete(self, timeout=None):
642
    """Delete a Shared Lock.
643

644
    This operation will declare the lock for removal. First the lock will be
645
    acquired in exclusive mode if you don't already own it, then the lock
646
    will be put in a state where any future and pending acquire() fail.
647

648
    @type timeout: float
649
    @param timeout: maximum waiting time before giving up
650

651
    """
652
    self.__lock.acquire()
653
    try:
654
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
655

    
656
      self.__check_deleted()
657

    
658
      # The caller is allowed to hold the lock exclusively already.
659
      acquired = self.__is_exclusive()
660

    
661
      if not acquired:
662
        acquired = self.__acquire_unlocked(0, timeout)
663

    
664
        assert self.__is_exclusive() and not self.__is_sharer(), \
665
          "Lock wasn't acquired in exclusive mode"
666

    
667
      if acquired:
668
        self.__deleted = True
669
        self.__exc = None
670

    
671
        # Notify all acquires. They'll throw an error.
672
        while self.__pending:
673
          self.__pending.pop().notifyAll()
674

    
675
      return acquired
676
    finally:
677
      self.__lock.release()
678

    
679

    
680
# Whenever we want to acquire a full LockSet we pass None as the value
681
# to acquire.  Hide this behind this nicely named constant.
682
ALL_SET = None
683

    
684

    
685
class LockSet:
686
  """Implements a set of locks.
687

688
  This abstraction implements a set of shared locks for the same resource type,
689
  distinguished by name. The user can lock a subset of the resources and the
690
  LockSet will take care of acquiring the locks always in the same order, thus
691
  preventing deadlock.
692

693
  All the locks needed in the same set must be acquired together, though.
694

695
  """
696
  def __init__(self, members=None):
697
    """Constructs a new LockSet.
698

699
    @param members: initial members of the set
700

701
    """
702
    # Used internally to guarantee coherency.
703
    self.__lock = SharedLock()
704

    
705
    # The lockdict indexes the relationship name -> lock
706
    # The order-of-locking is implied by the alphabetical order of names
707
    self.__lockdict = {}
708

    
709
    if members is not None:
710
      for name in members:
711
        self.__lockdict[name] = SharedLock()
712

    
713
    # The owner dict contains the set of locks each thread owns. For
714
    # performance each thread can access its own key without a global lock on
715
    # this structure. It is paramount though that *no* other type of access is
716
    # done to this structure (eg. no looping over its keys). *_owner helper
717
    # function are defined to guarantee access is correct, but in general never
718
    # do anything different than __owners[threading.currentThread()], or there
719
    # will be trouble.
720
    self.__owners = {}
721

    
722
  def _is_owned(self):
723
    """Is the current thread a current level owner?"""
724
    return threading.currentThread() in self.__owners
725

    
726
  def _add_owned(self, name=None):
727
    """Note the current thread owns the given lock"""
728
    if name is None:
729
      if not self._is_owned():
730
        self.__owners[threading.currentThread()] = set()
731
    else:
732
      if self._is_owned():
733
        self.__owners[threading.currentThread()].add(name)
734
      else:
735
        self.__owners[threading.currentThread()] = set([name])
736

    
737
  def _del_owned(self, name=None):
738
    """Note the current thread owns the given lock"""
739

    
740
    if name is not None:
741
      self.__owners[threading.currentThread()].remove(name)
742

    
743
    # Only remove the key if we don't hold the set-lock as well
744
    if (not self.__lock._is_owned() and
745
        not self.__owners[threading.currentThread()]):
746
      del self.__owners[threading.currentThread()]
747

    
748
  def _list_owned(self):
749
    """Get the set of resource names owned by the current thread"""
750
    if self._is_owned():
751
      return self.__owners[threading.currentThread()].copy()
752
    else:
753
      return set()
754

    
755
  def __names(self):
756
    """Return the current set of names.
757

758
    Only call this function while holding __lock and don't iterate on the
759
    result after releasing the lock.
760

761
    """
762
    return self.__lockdict.keys()
763

    
764
  def _names(self):
765
    """Return a copy of the current set of elements.
766

767
    Used only for debugging purposes.
768

769
    """
770
    # If we don't already own the set-level lock acquired
771
    # we'll get it and note we need to release it later.
772
    release_lock = False
773
    if not self.__lock._is_owned():
774
      release_lock = True
775
      self.__lock.acquire(shared=1)
776
    try:
777
      result = self.__names()
778
    finally:
779
      if release_lock:
780
        self.__lock.release()
781
    return set(result)
782

    
783
  def acquire(self, names, timeout=None, shared=0):
784
    """Acquire a set of resource locks.
785

786
    @param names: the names of the locks which shall be acquired
787
        (special lock names, or instance/node names)
788
    @param shared: whether to acquire in shared mode; by default an
789
        exclusive lock will be acquired
790
    @type timeout: float
791
    @param timeout: Maximum time to acquire all locks
792

793
    @return: True when all the locks are successfully acquired
794

795
    @raise errors.LockError: when any lock we try to acquire has
796
        been deleted before we succeed. In this case none of the
797
        locks requested will be acquired.
798

799
    """
800
    if timeout is not None:
801
      raise NotImplementedError
802

    
803
    # Check we don't already own locks at this level
804
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
805

    
806
    if names is None:
807
      # If no names are given acquire the whole set by not letting new names
808
      # being added before we release, and getting the current list of names.
809
      # Some of them may then be deleted later, but we'll cope with this.
810
      #
811
      # We'd like to acquire this lock in a shared way, as it's nice if
812
      # everybody else can use the instances at the same time. If are acquiring
813
      # them exclusively though they won't be able to do this anyway, though,
814
      # so we'll get the list lock exclusively as well in order to be able to
815
      # do add() on the set while owning it.
816
      self.__lock.acquire(shared=shared)
817
      try:
818
        # note we own the set-lock
819
        self._add_owned()
820
        names = self.__names()
821
      except:
822
        # We shouldn't have problems adding the lock to the owners list, but
823
        # if we did we'll try to release this lock and re-raise exception.
824
        # Of course something is going to be really wrong, after this.
825
        self.__lock.release()
826
        raise
827

    
828
    try:
829
      # Support passing in a single resource to acquire rather than many
830
      if isinstance(names, basestring):
831
        names = [names]
832
      else:
833
        names = sorted(names)
834

    
835
      acquire_list = []
836
      # First we look the locks up on __lockdict. We have no way of being sure
837
      # they will still be there after, but this makes it a lot faster should
838
      # just one of them be the already wrong
839
      for lname in utils.UniqueSequence(names):
840
        try:
841
          lock = self.__lockdict[lname] # raises KeyError if lock is not there
842
          acquire_list.append((lname, lock))
843
        except (KeyError):
844
          if self.__lock._is_owned():
845
            # We are acquiring all the set, it doesn't matter if this
846
            # particular element is not there anymore.
847
            continue
848
          else:
849
            raise errors.LockError('non-existing lock in set (%s)' % lname)
850

    
851
      # This will hold the locknames we effectively acquired.
852
      acquired = set()
853
      # Now acquire_list contains a sorted list of resources and locks we want.
854
      # In order to get them we loop on this (private) list and acquire() them.
855
      # We gave no real guarantee they will still exist till this is done but
856
      # .acquire() itself is safe and will alert us if the lock gets deleted.
857
      for (lname, lock) in acquire_list:
858
        try:
859
          lock.acquire(shared=shared) # raises LockError if the lock is deleted
860
          # now the lock cannot be deleted, we have it!
861
          self._add_owned(name=lname)
862
          acquired.add(lname)
863
        except (errors.LockError):
864
          if self.__lock._is_owned():
865
            # We are acquiring all the set, it doesn't matter if this
866
            # particular element is not there anymore.
867
            continue
868
          else:
869
            name_fail = lname
870
            for lname in self._list_owned():
871
              self.__lockdict[lname].release()
872
              self._del_owned(name=lname)
873
            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
874
        except:
875
          # We shouldn't have problems adding the lock to the owners list, but
876
          # if we did we'll try to release this lock and re-raise exception.
877
          # Of course something is going to be really wrong, after this.
878
          if lock._is_owned():
879
            lock.release()
880
          raise
881

    
882
    except:
883
      # If something went wrong and we had the set-lock let's release it...
884
      if self.__lock._is_owned():
885
        self.__lock.release()
886
      raise
887

    
888
    return acquired
889

    
890
  def release(self, names=None):
891
    """Release a set of resource locks, at the same level.
892

893
    You must have acquired the locks, either in shared or in exclusive mode,
894
    before releasing them.
895

896
    @param names: the names of the locks which shall be released
897
        (defaults to all the locks acquired at that level).
898

899
    """
900
    assert self._is_owned(), "release() on lock set while not owner"
901

    
902
    # Support passing in a single resource to release rather than many
903
    if isinstance(names, basestring):
904
      names = [names]
905

    
906
    if names is None:
907
      names = self._list_owned()
908
    else:
909
      names = set(names)
910
      assert self._list_owned().issuperset(names), (
911
               "release() on unheld resources %s" %
912
               names.difference(self._list_owned()))
913

    
914
    # First of all let's release the "all elements" lock, if set.
915
    # After this 'add' can work again
916
    if self.__lock._is_owned():
917
      self.__lock.release()
918
      self._del_owned()
919

    
920
    for lockname in names:
921
      # If we are sure the lock doesn't leave __lockdict without being
922
      # exclusively held we can do this...
923
      self.__lockdict[lockname].release()
924
      self._del_owned(name=lockname)
925

    
926
  def add(self, names, acquired=0, shared=0):
927
    """Add a new set of elements to the set
928

929
    @param names: names of the new elements to add
930
    @param acquired: pre-acquire the new resource?
931
    @param shared: is the pre-acquisition shared?
932

933
    """
934
    # Check we don't already own locks at this level
935
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
936
      "Cannot add locks if the set is only partially owned, or shared"
937

    
938
    # Support passing in a single resource to add rather than many
939
    if isinstance(names, basestring):
940
      names = [names]
941

    
942
    # If we don't already own the set-level lock acquired in an exclusive way
943
    # we'll get it and note we need to release it later.
944
    release_lock = False
945
    if not self.__lock._is_owned():
946
      release_lock = True
947
      self.__lock.acquire()
948

    
949
    try:
950
      invalid_names = set(self.__names()).intersection(names)
951
      if invalid_names:
952
        # This must be an explicit raise, not an assert, because assert is
953
        # turned off when using optimization, and this can happen because of
954
        # concurrency even if the user doesn't want it.
955
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
956

    
957
      for lockname in names:
958
        lock = SharedLock()
959

    
960
        if acquired:
961
          lock.acquire(shared=shared)
962
          # now the lock cannot be deleted, we have it!
963
          try:
964
            self._add_owned(name=lockname)
965
          except:
966
            # We shouldn't have problems adding the lock to the owners list,
967
            # but if we did we'll try to release this lock and re-raise
968
            # exception.  Of course something is going to be really wrong,
969
            # after this.  On the other hand the lock hasn't been added to the
970
            # __lockdict yet so no other threads should be pending on it. This
971
            # release is just a safety measure.
972
            lock.release()
973
            raise
974

    
975
        self.__lockdict[lockname] = lock
976

    
977
    finally:
978
      # Only release __lock if we were not holding it previously.
979
      if release_lock:
980
        self.__lock.release()
981

    
982
    return True
983

    
984
  def remove(self, names):
985
    """Remove elements from the lock set.
986

987
    You can either not hold anything in the lockset or already hold a superset
988
    of the elements you want to delete, exclusively.
989

990
    @param names: names of the resource to remove.
991

992
    @return:: a list of locks which we removed; the list is always
993
        equal to the names list if we were holding all the locks
994
        exclusively
995

996
    """
997
    # Support passing in a single resource to remove rather than many
998
    if isinstance(names, basestring):
999
      names = [names]
1000

    
1001
    # If we own any subset of this lock it must be a superset of what we want
1002
    # to delete. The ownership must also be exclusive, but that will be checked
1003
    # by the lock itself.
1004
    assert not self._is_owned() or self._list_owned().issuperset(names), (
1005
      "remove() on acquired lockset while not owning all elements")
1006

    
1007
    removed = []
1008

    
1009
    for lname in names:
1010
      # Calling delete() acquires the lock exclusively if we don't already own
1011
      # it, and causes all pending and subsequent lock acquires to fail. It's
1012
      # fine to call it out of order because delete() also implies release(),
1013
      # and the assertion above guarantees that if we either already hold
1014
      # everything we want to delete, or we hold none.
1015
      try:
1016
        self.__lockdict[lname].delete()
1017
        removed.append(lname)
1018
      except (KeyError, errors.LockError):
1019
        # This cannot happen if we were already holding it, verify:
1020
        assert not self._is_owned(), "remove failed while holding lockset"
1021
      else:
1022
        # If no LockError was raised we are the ones who deleted the lock.
1023
        # This means we can safely remove it from lockdict, as any further or
1024
        # pending delete() or acquire() will fail (and nobody can have the lock
1025
        # since before our call to delete()).
1026
        #
1027
        # This is done in an else clause because if the exception was thrown
1028
        # it's the job of the one who actually deleted it.
1029
        del self.__lockdict[lname]
1030
        # And let's remove it from our private list if we owned it.
1031
        if self._is_owned():
1032
          self._del_owned(name=lname)
1033

    
1034
    return removed
1035

    
1036

    
1037
# Locking levels, must be acquired in increasing order.
1038
# Current rules are:
1039
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1040
#   acquired before performing any operation, either in shared or in exclusive
1041
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1042
#   avoided.
1043
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1044
#   If you need more than one node, or more than one instance, acquire them at
1045
#   the same time.
1046
LEVEL_CLUSTER = 0
1047
LEVEL_INSTANCE = 1
1048
LEVEL_NODE = 2
1049

    
1050
LEVELS = [LEVEL_CLUSTER,
1051
          LEVEL_INSTANCE,
1052
          LEVEL_NODE]
1053

    
1054
# Lock levels which are modifiable
1055
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1056

    
1057
LEVEL_NAMES = {
1058
  LEVEL_CLUSTER: "cluster",
1059
  LEVEL_INSTANCE: "instance",
1060
  LEVEL_NODE: "node",
1061
  }
1062

    
1063
# Constant for the big ganeti lock
1064
BGL = 'BGL'
1065

    
1066

    
1067
class GanetiLockManager:
1068
  """The Ganeti Locking Library
1069

1070
  The purpose of this small library is to manage locking for ganeti clusters
1071
  in a central place, while at the same time doing dynamic checks against
1072
  possible deadlocks. It will also make it easier to transition to a different
1073
  lock type should we migrate away from python threads.
1074

1075
  """
1076
  _instance = None
1077

    
1078
  def __init__(self, nodes=None, instances=None):
1079
    """Constructs a new GanetiLockManager object.
1080

1081
    There should be only a GanetiLockManager object at any time, so this
1082
    function raises an error if this is not the case.
1083

1084
    @param nodes: list of node names
1085
    @param instances: list of instance names
1086

1087
    """
1088
    assert self.__class__._instance is None, \
1089
           "double GanetiLockManager instance"
1090

    
1091
    self.__class__._instance = self
1092

    
1093
    # The keyring contains all the locks, at their level and in the correct
1094
    # locking order.
1095
    self.__keyring = {
1096
      LEVEL_CLUSTER: LockSet([BGL]),
1097
      LEVEL_NODE: LockSet(nodes),
1098
      LEVEL_INSTANCE: LockSet(instances),
1099
    }
1100

    
1101
  def _names(self, level):
1102
    """List the lock names at the given level.
1103

1104
    This can be used for debugging/testing purposes.
1105

1106
    @param level: the level whose list of locks to get
1107

1108
    """
1109
    assert level in LEVELS, "Invalid locking level %s" % level
1110
    return self.__keyring[level]._names()
1111

    
1112
  def _is_owned(self, level):
1113
    """Check whether we are owning locks at the given level
1114

1115
    """
1116
    return self.__keyring[level]._is_owned()
1117

    
1118
  is_owned = _is_owned
1119

    
1120
  def _list_owned(self, level):
1121
    """Get the set of owned locks at the given level
1122

1123
    """
1124
    return self.__keyring[level]._list_owned()
1125

    
1126
  def _upper_owned(self, level):
1127
    """Check that we don't own any lock at a level greater than the given one.
1128

1129
    """
1130
    # This way of checking only works if LEVELS[i] = i, which we check for in
1131
    # the test cases.
1132
    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1133

    
1134
  def _BGL_owned(self):
1135
    """Check if the current thread owns the BGL.
1136

1137
    Both an exclusive or a shared acquisition work.
1138

1139
    """
1140
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1141

    
1142
  def _contains_BGL(self, level, names):
1143
    """Check if the level contains the BGL.
1144

1145
    Check if acting on the given level and set of names will change
1146
    the status of the Big Ganeti Lock.
1147

1148
    """
1149
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1150

    
1151
  def acquire(self, level, names, timeout=None, shared=0):
1152
    """Acquire a set of resource locks, at the same level.
1153

1154
    @param level: the level at which the locks shall be acquired;
1155
        it must be a member of LEVELS.
1156
    @param names: the names of the locks which shall be acquired
1157
        (special lock names, or instance/node names)
1158
    @param shared: whether to acquire in shared mode; by default
1159
        an exclusive lock will be acquired
1160
    @type timeout: float
1161
    @param timeout: Maximum time to acquire all locks
1162

1163
    """
1164
    assert level in LEVELS, "Invalid locking level %s" % level
1165

    
1166
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1167
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1168
    # so even if we've migrated we need to at least share the BGL to be
1169
    # compatible with them. Of course if we own the BGL exclusively there's no
1170
    # point in acquiring any other lock, unless perhaps we are half way through
1171
    # the migration of the current opcode.
1172
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1173
            "You must own the Big Ganeti Lock before acquiring any other")
1174

    
1175
    # Check we don't own locks at the same or upper levels.
1176
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1177
           " while owning some at a greater one")
1178

    
1179
    # Acquire the locks in the set.
1180
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1181

    
1182
  def release(self, level, names=None):
1183
    """Release a set of resource locks, at the same level.
1184

1185
    You must have acquired the locks, either in shared or in exclusive
1186
    mode, before releasing them.
1187

1188
    @param level: the level at which the locks shall be released;
1189
        it must be a member of LEVELS
1190
    @param names: the names of the locks which shall be released
1191
        (defaults to all the locks acquired at that level)
1192

1193
    """
1194
    assert level in LEVELS, "Invalid locking level %s" % level
1195
    assert (not self._contains_BGL(level, names) or
1196
            not self._upper_owned(LEVEL_CLUSTER)), (
1197
            "Cannot release the Big Ganeti Lock while holding something"
1198
            " at upper levels")
1199

    
1200
    # Release will complain if we don't own the locks already
1201
    return self.__keyring[level].release(names)
1202

    
1203
  def add(self, level, names, acquired=0, shared=0):
1204
    """Add locks at the specified level.
1205

1206
    @param level: the level at which the locks shall be added;
1207
        it must be a member of LEVELS_MOD.
1208
    @param names: names of the locks to acquire
1209
    @param acquired: whether to acquire the newly added locks
1210
    @param shared: whether the acquisition will be shared
1211

1212
    """
1213
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1214
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1215
           " operations")
1216
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1217
           " while owning some at a greater one")
1218
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1219

    
1220
  def remove(self, level, names):
1221
    """Remove locks from the specified level.
1222

1223
    You must either already own the locks you are trying to remove
1224
    exclusively or not own any lock at an upper level.
1225

1226
    @param level: the level at which the locks shall be removed;
1227
        it must be a member of LEVELS_MOD
1228
    @param names: the names of the locks which shall be removed
1229
        (special lock names, or instance/node names)
1230

1231
    """
1232
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1233
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1234
           " operations")
1235
    # Check we either own the level or don't own anything from here
1236
    # up. LockSet.remove() will check the case in which we don't own
1237
    # all the needed resources, or we have a shared ownership.
1238
    assert self._is_owned(level) or not self._upper_owned(level), (
1239
           "Cannot remove locks at a level while not owning it or"
1240
           " owning some at a greater one")
1241
    return self.__keyring[level].remove(names)