Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ c6997f21

History | View | Annotate | Download (37 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
import os
24
import select
25
import threading
26
import time
27
import errno
28

    
29
from ganeti import errors
30
from ganeti import utils
31

    
32

    
33
def ssynchronized(lock, shared=0):
34
  """Shared Synchronization decorator.
35

36
  Calls the function holding the given lock, either in exclusive or shared
37
  mode. It requires the passed lock to be a SharedLock (or support its
38
  semantics).
39

40
  """
41
  def wrap(fn):
42
    def sync_function(*args, **kwargs):
43
      lock.acquire(shared=shared)
44
      try:
45
        return fn(*args, **kwargs)
46
      finally:
47
        lock.release()
48
    return sync_function
49
  return wrap
50

    
51

    
52
class _SingleActionPipeConditionWaiter(object):
53
  """Callable helper class for _SingleActionPipeCondition.
54

55
  """
56
  __slots__ = [
57
    "_cond",
58
    "_fd",
59
    "_poller",
60
    ]
61

    
62
  def __init__(self, cond, poller, fd):
63
    """Initializes this class.
64

65
    @type cond: L{_SingleActionPipeCondition}
66
    @param cond: Parent condition
67
    @type poller: select.poll
68
    @param poller: Poller object
69
    @type fd: int
70
    @param fd: File descriptor to wait for
71

72
    """
73
    object.__init__(self)
74

    
75
    self._cond = cond
76
    self._poller = poller
77
    self._fd = fd
78

    
79
  def __call__(self, timeout):
80
    """Wait for something to happen on the pipe.
81

82
    @type timeout: float or None
83
    @param timeout: Timeout for waiting (can be None)
84

85
    """
86
    start_time = time.time()
87
    remaining_time = timeout
88

    
89
    while timeout is None or remaining_time > 0:
90
      try:
91
        result = self._poller.poll(remaining_time)
92
      except EnvironmentError, err:
93
        if err.errno != errno.EINTR:
94
          raise
95
        result = None
96

    
97
      # Check whether we were notified
98
      if result and result[0][0] == self._fd:
99
        break
100

    
101
      # Re-calculate timeout if necessary
102
      if timeout is not None:
103
        remaining_time = start_time + timeout - time.time()
104

    
105

    
106
class _SingleActionPipeCondition(object):
107
  """Wrapper around a pipe for usage inside conditions.
108

109
  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
110
  always allocated when constructing the class. Extra care is taken to always
111
  close the file descriptors.
112

113
  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
114
  notifications.
115

116
  Warning: This class is designed to be used as the underlying component of a
117
  locking condition, but is not by itself thread safe, and needs to be
118
  protected by an external lock.
119

120
  """
121
  __slots__ = [
122
    "_poller",
123
    "_read_fd",
124
    "_write_fd",
125
    "_nwaiters",
126
    ]
127

    
128
  _waiter_class = _SingleActionPipeConditionWaiter
129

    
130
  def __init__(self):
131
    """Initializes this class.
132

133
    """
134
    object.__init__(self)
135

    
136
    self._nwaiters = 0
137

    
138
    # Just assume the unpacking is successful, otherwise error handling gets
139
    # very complicated.
140
    (self._read_fd, self._write_fd) = os.pipe()
141
    try:
142
      # The poller looks for closure of the write side
143
      poller = select.poll()
144
      poller.register(self._read_fd, select.POLLHUP)
145

    
146
      self._poller = poller
147
    except:
148
      if self._read_fd is not None:
149
        os.close(self._read_fd)
150
      if self._write_fd is not None:
151
        os.close(self._write_fd)
152
      raise
153

    
154
    # There should be no code here anymore, otherwise the pipe file descriptors
155
    # may be not be cleaned up properly in case of errors.
156

    
157
  def StartWaiting(self):
158
    """Return function to wait for notification.
159

160
    @rtype: L{_SingleActionPipeConditionWaiter}
161
    @return: Function to wait for notification
162

163
    """
164
    assert self._nwaiters >= 0
165

    
166
    if self._poller is None:
167
      raise RuntimeError("Already cleaned up")
168

    
169
    # Create waiter function and increase number of waiters
170
    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
171
    self._nwaiters += 1
172
    return wait_fn
173

    
174
  def DoneWaiting(self):
175
    """Decrement number of waiters and automatic cleanup.
176

177
    Must be called after waiting for a notification.
178

179
    @rtype: bool
180
    @return: Whether this was the last waiter
181

182
    """
183
    assert self._nwaiters > 0
184

    
185
    self._nwaiters -= 1
186

    
187
    if self._nwaiters == 0:
188
      self._Cleanup()
189
      return True
190

    
191
    return False
192

    
193
  def notifyAll(self):
194
    """Close the writing side of the pipe to notify all waiters.
195

196
    """
197
    if self._write_fd is None:
198
      raise RuntimeError("Can only notify once")
199

    
200
    os.close(self._write_fd)
201
    self._write_fd = None
202

    
203
  def _Cleanup(self):
204
    """Close all file descriptors.
205

206
    """
207
    if self._read_fd is not None:
208
      os.close(self._read_fd)
209
      self._read_fd = None
210

    
211
    if self._write_fd is not None:
212
      os.close(self._write_fd)
213
      self._write_fd = None
214

    
215
    self._poller = None
216

    
217
  def __del__(self):
218
    """Called on object deletion.
219

220
    Ensure no file descriptors are left open.
221

222
    """
223
    self._Cleanup()
224

    
225

    
226
class _PipeCondition(object):
227
  """Group-only non-polling condition with counters.
228

229
  This condition class uses pipes and poll, internally, to be able to wait for
230
  notification with a timeout, without resorting to polling. It is almost
231
  compatible with Python's threading.Condition, but only supports notifyAll and
232
  non-recursive locks. As an additional features it's able to report whether
233
  there are any waiting threads.
234

235
  """
236
  __slots__ = [
237
    "_lock",
238
    "_nwaiters",
239
    "_pipe",
240
    "acquire",
241
    "release",
242
    ]
243

    
244
  _pipe_class = _SingleActionPipeCondition
245

    
246
  def __init__(self, lock):
247
    """Initializes this class.
248

249
    """
250
    object.__init__(self)
251

    
252
    # Recursive locks are not supported
253
    assert not hasattr(lock, "_acquire_restore")
254
    assert not hasattr(lock, "_release_save")
255

    
256
    self._lock = lock
257

    
258
    # Export the lock's acquire() and release() methods
259
    self.acquire = lock.acquire
260
    self.release = lock.release
261

    
262
    self._nwaiters = 0
263
    self._pipe = None
264

    
265
  def _is_owned(self):
266
    """Check whether lock is owned by current thread.
267

268
    """
269
    if self._lock.acquire(0):
270
      self._lock.release()
271
      return False
272

    
273
    return True
274

    
275
  def _check_owned(self):
276
    """Raise an exception if the current thread doesn't own the lock.
277

278
    """
279
    if not self._is_owned():
280
      raise RuntimeError("cannot work with un-aquired lock")
281

    
282
  def wait(self, timeout=None):
283
    """Wait for a notification.
284

285
    @type timeout: float or None
286
    @param timeout: Waiting timeout (can be None)
287

288
    """
289
    self._check_owned()
290

    
291
    if not self._pipe:
292
      self._pipe = self._pipe_class()
293

    
294
    # Keep local reference to the pipe. It could be replaced by another thread
295
    # notifying while we're waiting.
296
    pipe = self._pipe
297

    
298
    assert self._nwaiters >= 0
299
    self._nwaiters += 1
300
    try:
301
      # Get function to wait on the pipe
302
      wait_fn = pipe.StartWaiting()
303
      try:
304
        # Release lock while waiting
305
        self.release()
306
        try:
307
          # Wait for notification
308
          wait_fn(timeout)
309
        finally:
310
          # Re-acquire lock
311
          self.acquire()
312
      finally:
313
        # Destroy pipe if this was the last waiter and the current pipe is
314
        # still the same. The same pipe cannot be reused after cleanup.
315
        if pipe.DoneWaiting() and pipe == self._pipe:
316
          self._pipe = None
317
    finally:
318
      assert self._nwaiters > 0
319
      self._nwaiters -= 1
320

    
321
  def notifyAll(self):
322
    """Notify all currently waiting threads.
323

324
    """
325
    self._check_owned()
326

    
327
    # Notify and forget pipe. A new one will be created on the next call to
328
    # wait.
329
    if self._pipe is not None:
330
      self._pipe.notifyAll()
331
      self._pipe = None
332

    
333
  def has_waiting(self):
334
    """Returns whether there are active waiters.
335

336
    """
337
    self._check_owned()
338

    
339
    return bool(self._nwaiters)
340

    
341

    
342
class _CountingCondition(object):
343
  """Wrapper for Python's built-in threading.Condition class.
344

345
  This wrapper keeps a count of active waiters. We can't access the internal
346
  "__waiters" attribute of threading.Condition because it's not thread-safe.
347

348
  """
349
  __slots__ = [
350
    "_cond",
351
    "_nwaiters",
352
    ]
353

    
354
  def __init__(self, lock):
355
    """Initializes this class.
356

357
    """
358
    object.__init__(self)
359
    self._cond = threading.Condition(lock=lock)
360
    self._nwaiters = 0
361

    
362
  def notifyAll(self):
363
    """Notifies the condition.
364

365
    """
366
    return self._cond.notifyAll()
367

    
368
  def wait(self, timeout=None):
369
    """Waits for the condition to be notified.
370

371
    @type timeout: float or None
372
    @param timeout: Timeout in seconds
373

374
    """
375
    assert self._nwaiters >= 0
376

    
377
    self._nwaiters += 1
378
    try:
379
      return self._cond.wait(timeout=timeout)
380
    finally:
381
      self._nwaiters -= 1
382

    
383
  def has_waiting(self):
384
    """Returns whether there are active waiters.
385

386
    """
387
    return bool(self._nwaiters)
388

    
389

    
390
class SharedLock(object):
391
  """Implements a shared lock.
392

393
  Multiple threads can acquire the lock in a shared way, calling
394
  acquire_shared().  In order to acquire the lock in an exclusive way threads
395
  can call acquire_exclusive().
396

397
  The lock prevents starvation but does not guarantee that threads will acquire
398
  the shared lock in the order they queued for it, just that they will
399
  eventually do so.
400

401
  """
402
  __slots__ = [
403
    "__active_shr_c",
404
    "__inactive_shr_c",
405
    "__deleted",
406
    "__exc",
407
    "__lock",
408
    "__pending",
409
    "__shr",
410
    ]
411

    
412
  __condition_class = _PipeCondition
413

    
414
  def __init__(self):
415
    """Construct a new SharedLock.
416

417
    """
418
    object.__init__(self)
419

    
420
    # Internal lock
421
    self.__lock = threading.Lock()
422

    
423
    # Queue containing waiting acquires
424
    self.__pending = []
425

    
426
    # Active and inactive conditions for shared locks
427
    self.__active_shr_c = self.__condition_class(self.__lock)
428
    self.__inactive_shr_c = self.__condition_class(self.__lock)
429

    
430
    # Current lock holders
431
    self.__shr = set()
432
    self.__exc = None
433

    
434
    # is this lock in the deleted state?
435
    self.__deleted = False
436

    
437
  def __check_deleted(self):
438
    """Raises an exception if the lock has been deleted.
439

440
    """
441
    if self.__deleted:
442
      raise errors.LockError("Deleted lock")
443

    
444
  def __is_sharer(self):
445
    """Is the current thread sharing the lock at this time?
446

447
    """
448
    return threading.currentThread() in self.__shr
449

    
450
  def __is_exclusive(self):
451
    """Is the current thread holding the lock exclusively at this time?
452

453
    """
454
    return threading.currentThread() == self.__exc
455

    
456
  def __is_owned(self, shared=-1):
457
    """Is the current thread somehow owning the lock at this time?
458

459
    This is a private version of the function, which presumes you're holding
460
    the internal lock.
461

462
    """
463
    if shared < 0:
464
      return self.__is_sharer() or self.__is_exclusive()
465
    elif shared:
466
      return self.__is_sharer()
467
    else:
468
      return self.__is_exclusive()
469

    
470
  def _is_owned(self, shared=-1):
471
    """Is the current thread somehow owning the lock at this time?
472

473
    @param shared:
474
        - < 0: check for any type of ownership (default)
475
        - 0: check for exclusive ownership
476
        - > 0: check for shared ownership
477

478
    """
479
    self.__lock.acquire()
480
    try:
481
      return self.__is_owned(shared=shared)
482
    finally:
483
      self.__lock.release()
484

    
485
  def _count_pending(self):
486
    """Returns the number of pending acquires.
487

488
    @rtype: int
489

490
    """
491
    self.__lock.acquire()
492
    try:
493
      return len(self.__pending)
494
    finally:
495
      self.__lock.release()
496

    
497
  def __do_acquire(self, shared):
498
    """Actually acquire the lock.
499

500
    """
501
    if shared:
502
      self.__shr.add(threading.currentThread())
503
    else:
504
      self.__exc = threading.currentThread()
505

    
506
  def __can_acquire(self, shared):
507
    """Determine whether lock can be acquired.
508

509
    """
510
    if shared:
511
      return self.__exc is None
512
    else:
513
      return len(self.__shr) == 0 and self.__exc is None
514

    
515
  def __is_on_top(self, cond):
516
    """Checks whether the passed condition is on top of the queue.
517

518
    The caller must make sure the queue isn't empty.
519

520
    """
521
    return self.__pending[0] == cond
522

    
523
  def __acquire_unlocked(self, shared=0, timeout=None):
524
    """Acquire a shared lock.
525

526
    @param shared: whether to acquire in shared mode; by default an
527
        exclusive lock will be acquired
528
    @param timeout: maximum waiting time before giving up
529

530
    """
531
    self.__check_deleted()
532

    
533
    # We cannot acquire the lock if we already have it
534
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
535

    
536
    # Check whether someone else holds the lock or there are pending acquires.
537
    if not self.__pending and self.__can_acquire(shared):
538
      # Apparently not, can acquire lock directly.
539
      self.__do_acquire(shared)
540
      return True
541

    
542
    if shared:
543
      wait_condition = self.__active_shr_c
544

    
545
      # Check if we're not yet in the queue
546
      if wait_condition not in self.__pending:
547
        self.__pending.append(wait_condition)
548
    else:
549
      wait_condition = self.__condition_class(self.__lock)
550
      # Always add to queue
551
      self.__pending.append(wait_condition)
552

    
553
    try:
554
      # Wait until we become the topmost acquire in the queue or the timeout
555
      # expires.
556
      while not (self.__is_on_top(wait_condition) and
557
                 self.__can_acquire(shared)):
558
        # Wait for notification
559
        wait_condition.wait(timeout)
560
        self.__check_deleted()
561

    
562
        # A lot of code assumes blocking acquires always succeed. Loop
563
        # internally for that case.
564
        if timeout is not None:
565
          break
566

    
567
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
568
        self.__do_acquire(shared)
569
        return True
570
    finally:
571
      # Remove condition from queue if there are no more waiters
572
      if not wait_condition.has_waiting() and not self.__deleted:
573
        self.__pending.remove(wait_condition)
574

    
575
    return False
576

    
577
  def acquire(self, shared=0, timeout=None):
578
    """Acquire a shared lock.
579

580
    @type shared: int
581
    @param shared: whether to acquire in shared mode; by default an
582
        exclusive lock will be acquired
583
    @type timeout: float
584
    @param timeout: maximum waiting time before giving up
585

586
    """
587
    self.__lock.acquire()
588
    try:
589
      return self.__acquire_unlocked(shared, timeout)
590
    finally:
591
      self.__lock.release()
592

    
593
  def release(self):
594
    """Release a Shared Lock.
595

596
    You must have acquired the lock, either in shared or in exclusive mode,
597
    before calling this function.
598

599
    """
600
    self.__lock.acquire()
601
    try:
602
      assert self.__is_exclusive() or self.__is_sharer(), \
603
        "Cannot release non-owned lock"
604

    
605
      # Autodetect release type
606
      if self.__is_exclusive():
607
        self.__exc = None
608
      else:
609
        self.__shr.remove(threading.currentThread())
610

    
611
      # Notify topmost condition in queue
612
      if self.__pending:
613
        first_condition = self.__pending[0]
614
        first_condition.notifyAll()
615

    
616
        if first_condition == self.__active_shr_c:
617
          self.__active_shr_c = self.__inactive_shr_c
618
          self.__inactive_shr_c = first_condition
619

    
620
    finally:
621
      self.__lock.release()
622

    
623
  def delete(self, timeout=None):
624
    """Delete a Shared Lock.
625

626
    This operation will declare the lock for removal. First the lock will be
627
    acquired in exclusive mode if you don't already own it, then the lock
628
    will be put in a state where any future and pending acquire() fail.
629

630
    @type timeout: float
631
    @param timeout: maximum waiting time before giving up
632

633
    """
634
    self.__lock.acquire()
635
    try:
636
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
637

    
638
      self.__check_deleted()
639

    
640
      # The caller is allowed to hold the lock exclusively already.
641
      acquired = self.__is_exclusive()
642

    
643
      if not acquired:
644
        acquired = self.__acquire_unlocked(timeout)
645

    
646
      if acquired:
647
        self.__deleted = True
648
        self.__exc = None
649

    
650
        # Notify all acquires. They'll throw an error.
651
        while self.__pending:
652
          self.__pending.pop().notifyAll()
653

    
654
      return acquired
655
    finally:
656
      self.__lock.release()
657

    
658

    
659
# Whenever we want to acquire a full LockSet we pass None as the value
660
# to acquire.  Hide this behind this nicely named constant.
661
ALL_SET = None
662

    
663

    
664
class LockSet:
665
  """Implements a set of locks.
666

667
  This abstraction implements a set of shared locks for the same resource type,
668
  distinguished by name. The user can lock a subset of the resources and the
669
  LockSet will take care of acquiring the locks always in the same order, thus
670
  preventing deadlock.
671

672
  All the locks needed in the same set must be acquired together, though.
673

674
  """
675
  def __init__(self, members=None):
676
    """Constructs a new LockSet.
677

678
    @param members: initial members of the set
679

680
    """
681
    # Used internally to guarantee coherency.
682
    self.__lock = SharedLock()
683

    
684
    # The lockdict indexes the relationship name -> lock
685
    # The order-of-locking is implied by the alphabetical order of names
686
    self.__lockdict = {}
687

    
688
    if members is not None:
689
      for name in members:
690
        self.__lockdict[name] = SharedLock()
691

    
692
    # The owner dict contains the set of locks each thread owns. For
693
    # performance each thread can access its own key without a global lock on
694
    # this structure. It is paramount though that *no* other type of access is
695
    # done to this structure (eg. no looping over its keys). *_owner helper
696
    # function are defined to guarantee access is correct, but in general never
697
    # do anything different than __owners[threading.currentThread()], or there
698
    # will be trouble.
699
    self.__owners = {}
700

    
701
  def _is_owned(self):
702
    """Is the current thread a current level owner?"""
703
    return threading.currentThread() in self.__owners
704

    
705
  def _add_owned(self, name=None):
706
    """Note the current thread owns the given lock"""
707
    if name is None:
708
      if not self._is_owned():
709
        self.__owners[threading.currentThread()] = set()
710
    else:
711
      if self._is_owned():
712
        self.__owners[threading.currentThread()].add(name)
713
      else:
714
        self.__owners[threading.currentThread()] = set([name])
715

    
716
  def _del_owned(self, name=None):
717
    """Note the current thread owns the given lock"""
718

    
719
    if name is not None:
720
      self.__owners[threading.currentThread()].remove(name)
721

    
722
    # Only remove the key if we don't hold the set-lock as well
723
    if (not self.__lock._is_owned() and
724
        not self.__owners[threading.currentThread()]):
725
      del self.__owners[threading.currentThread()]
726

    
727
  def _list_owned(self):
728
    """Get the set of resource names owned by the current thread"""
729
    if self._is_owned():
730
      return self.__owners[threading.currentThread()].copy()
731
    else:
732
      return set()
733

    
734
  def __names(self):
735
    """Return the current set of names.
736

737
    Only call this function while holding __lock and don't iterate on the
738
    result after releasing the lock.
739

740
    """
741
    return self.__lockdict.keys()
742

    
743
  def _names(self):
744
    """Return a copy of the current set of elements.
745

746
    Used only for debugging purposes.
747

748
    """
749
    # If we don't already own the set-level lock acquired
750
    # we'll get it and note we need to release it later.
751
    release_lock = False
752
    if not self.__lock._is_owned():
753
      release_lock = True
754
      self.__lock.acquire(shared=1)
755
    try:
756
      result = self.__names()
757
    finally:
758
      if release_lock:
759
        self.__lock.release()
760
    return set(result)
761

    
762
  def acquire(self, names, blocking=1, shared=0):
763
    """Acquire a set of resource locks.
764

765
    @param names: the names of the locks which shall be acquired
766
        (special lock names, or instance/node names)
767
    @param shared: whether to acquire in shared mode; by default an
768
        exclusive lock will be acquired
769
    @param blocking: whether to block while trying to acquire or to
770
        operate in try-lock mode (this locking mode is not supported yet)
771

772
    @return: True when all the locks are successfully acquired
773

774
    @raise errors.LockError: when any lock we try to acquire has
775
        been deleted before we succeed. In this case none of the
776
        locks requested will be acquired.
777

778
    """
779
    if not blocking:
780
      # We don't have non-blocking mode for now
781
      raise NotImplementedError
782

    
783
    # Check we don't already own locks at this level
784
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
785

    
786
    if names is None:
787
      # If no names are given acquire the whole set by not letting new names
788
      # being added before we release, and getting the current list of names.
789
      # Some of them may then be deleted later, but we'll cope with this.
790
      #
791
      # We'd like to acquire this lock in a shared way, as it's nice if
792
      # everybody else can use the instances at the same time. If are acquiring
793
      # them exclusively though they won't be able to do this anyway, though,
794
      # so we'll get the list lock exclusively as well in order to be able to
795
      # do add() on the set while owning it.
796
      self.__lock.acquire(shared=shared)
797
      try:
798
        # note we own the set-lock
799
        self._add_owned()
800
        names = self.__names()
801
      except:
802
        # We shouldn't have problems adding the lock to the owners list, but
803
        # if we did we'll try to release this lock and re-raise exception.
804
        # Of course something is going to be really wrong, after this.
805
        self.__lock.release()
806
        raise
807

    
808
    try:
809
      # Support passing in a single resource to acquire rather than many
810
      if isinstance(names, basestring):
811
        names = [names]
812
      else:
813
        names = sorted(names)
814

    
815
      acquire_list = []
816
      # First we look the locks up on __lockdict. We have no way of being sure
817
      # they will still be there after, but this makes it a lot faster should
818
      # just one of them be the already wrong
819
      for lname in utils.UniqueSequence(names):
820
        try:
821
          lock = self.__lockdict[lname] # raises KeyError if lock is not there
822
          acquire_list.append((lname, lock))
823
        except (KeyError):
824
          if self.__lock._is_owned():
825
            # We are acquiring all the set, it doesn't matter if this
826
            # particular element is not there anymore.
827
            continue
828
          else:
829
            raise errors.LockError('non-existing lock in set (%s)' % lname)
830

    
831
      # This will hold the locknames we effectively acquired.
832
      acquired = set()
833
      # Now acquire_list contains a sorted list of resources and locks we want.
834
      # In order to get them we loop on this (private) list and acquire() them.
835
      # We gave no real guarantee they will still exist till this is done but
836
      # .acquire() itself is safe and will alert us if the lock gets deleted.
837
      for (lname, lock) in acquire_list:
838
        try:
839
          lock.acquire(shared=shared) # raises LockError if the lock is deleted
840
          # now the lock cannot be deleted, we have it!
841
          self._add_owned(name=lname)
842
          acquired.add(lname)
843
        except (errors.LockError):
844
          if self.__lock._is_owned():
845
            # We are acquiring all the set, it doesn't matter if this
846
            # particular element is not there anymore.
847
            continue
848
          else:
849
            name_fail = lname
850
            for lname in self._list_owned():
851
              self.__lockdict[lname].release()
852
              self._del_owned(name=lname)
853
            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
854
        except:
855
          # We shouldn't have problems adding the lock to the owners list, but
856
          # if we did we'll try to release this lock and re-raise exception.
857
          # Of course something is going to be really wrong, after this.
858
          if lock._is_owned():
859
            lock.release()
860
          raise
861

    
862
    except:
863
      # If something went wrong and we had the set-lock let's release it...
864
      if self.__lock._is_owned():
865
        self.__lock.release()
866
      raise
867

    
868
    return acquired
869

    
870
  def release(self, names=None):
871
    """Release a set of resource locks, at the same level.
872

873
    You must have acquired the locks, either in shared or in exclusive mode,
874
    before releasing them.
875

876
    @param names: the names of the locks which shall be released
877
        (defaults to all the locks acquired at that level).
878

879
    """
880
    assert self._is_owned(), "release() on lock set while not owner"
881

    
882
    # Support passing in a single resource to release rather than many
883
    if isinstance(names, basestring):
884
      names = [names]
885

    
886
    if names is None:
887
      names = self._list_owned()
888
    else:
889
      names = set(names)
890
      assert self._list_owned().issuperset(names), (
891
               "release() on unheld resources %s" %
892
               names.difference(self._list_owned()))
893

    
894
    # First of all let's release the "all elements" lock, if set.
895
    # After this 'add' can work again
896
    if self.__lock._is_owned():
897
      self.__lock.release()
898
      self._del_owned()
899

    
900
    for lockname in names:
901
      # If we are sure the lock doesn't leave __lockdict without being
902
      # exclusively held we can do this...
903
      self.__lockdict[lockname].release()
904
      self._del_owned(name=lockname)
905

    
906
  def add(self, names, acquired=0, shared=0):
907
    """Add a new set of elements to the set
908

909
    @param names: names of the new elements to add
910
    @param acquired: pre-acquire the new resource?
911
    @param shared: is the pre-acquisition shared?
912

913
    """
914
    # Check we don't already own locks at this level
915
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
916
      "Cannot add locks if the set is only partially owned, or shared"
917

    
918
    # Support passing in a single resource to add rather than many
919
    if isinstance(names, basestring):
920
      names = [names]
921

    
922
    # If we don't already own the set-level lock acquired in an exclusive way
923
    # we'll get it and note we need to release it later.
924
    release_lock = False
925
    if not self.__lock._is_owned():
926
      release_lock = True
927
      self.__lock.acquire()
928

    
929
    try:
930
      invalid_names = set(self.__names()).intersection(names)
931
      if invalid_names:
932
        # This must be an explicit raise, not an assert, because assert is
933
        # turned off when using optimization, and this can happen because of
934
        # concurrency even if the user doesn't want it.
935
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
936

    
937
      for lockname in names:
938
        lock = SharedLock()
939

    
940
        if acquired:
941
          lock.acquire(shared=shared)
942
          # now the lock cannot be deleted, we have it!
943
          try:
944
            self._add_owned(name=lockname)
945
          except:
946
            # We shouldn't have problems adding the lock to the owners list,
947
            # but if we did we'll try to release this lock and re-raise
948
            # exception.  Of course something is going to be really wrong,
949
            # after this.  On the other hand the lock hasn't been added to the
950
            # __lockdict yet so no other threads should be pending on it. This
951
            # release is just a safety measure.
952
            lock.release()
953
            raise
954

    
955
        self.__lockdict[lockname] = lock
956

    
957
    finally:
958
      # Only release __lock if we were not holding it previously.
959
      if release_lock:
960
        self.__lock.release()
961

    
962
    return True
963

    
964
  def remove(self, names, blocking=1):
965
    """Remove elements from the lock set.
966

967
    You can either not hold anything in the lockset or already hold a superset
968
    of the elements you want to delete, exclusively.
969

970
    @param names: names of the resource to remove.
971
    @param blocking: whether to block while trying to acquire or to
972
        operate in try-lock mode (this locking mode is not supported
973
        yet unless you are already holding exclusively the locks)
974

975
    @return:: a list of locks which we removed; the list is always
976
        equal to the names list if we were holding all the locks
977
        exclusively
978

979
    """
980
    if not blocking and not self._is_owned():
981
      # We don't have non-blocking mode for now
982
      raise NotImplementedError
983

    
984
    # Support passing in a single resource to remove rather than many
985
    if isinstance(names, basestring):
986
      names = [names]
987

    
988
    # If we own any subset of this lock it must be a superset of what we want
989
    # to delete. The ownership must also be exclusive, but that will be checked
990
    # by the lock itself.
991
    assert not self._is_owned() or self._list_owned().issuperset(names), (
992
      "remove() on acquired lockset while not owning all elements")
993

    
994
    removed = []
995

    
996
    for lname in names:
997
      # Calling delete() acquires the lock exclusively if we don't already own
998
      # it, and causes all pending and subsequent lock acquires to fail. It's
999
      # fine to call it out of order because delete() also implies release(),
1000
      # and the assertion above guarantees that if we either already hold
1001
      # everything we want to delete, or we hold none.
1002
      try:
1003
        self.__lockdict[lname].delete()
1004
        removed.append(lname)
1005
      except (KeyError, errors.LockError):
1006
        # This cannot happen if we were already holding it, verify:
1007
        assert not self._is_owned(), "remove failed while holding lockset"
1008
      else:
1009
        # If no LockError was raised we are the ones who deleted the lock.
1010
        # This means we can safely remove it from lockdict, as any further or
1011
        # pending delete() or acquire() will fail (and nobody can have the lock
1012
        # since before our call to delete()).
1013
        #
1014
        # This is done in an else clause because if the exception was thrown
1015
        # it's the job of the one who actually deleted it.
1016
        del self.__lockdict[lname]
1017
        # And let's remove it from our private list if we owned it.
1018
        if self._is_owned():
1019
          self._del_owned(name=lname)
1020

    
1021
    return removed
1022

    
1023

    
1024
# Locking levels, must be acquired in increasing order.
1025
# Current rules are:
1026
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1027
#   acquired before performing any operation, either in shared or in exclusive
1028
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1029
#   avoided.
1030
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1031
#   If you need more than one node, or more than one instance, acquire them at
1032
#   the same time.
1033
LEVEL_CLUSTER = 0
1034
LEVEL_INSTANCE = 1
1035
LEVEL_NODE = 2
1036

    
1037
LEVELS = [LEVEL_CLUSTER,
1038
          LEVEL_INSTANCE,
1039
          LEVEL_NODE]
1040

    
1041
# Lock levels which are modifiable
1042
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1043

    
1044
LEVEL_NAMES = {
1045
  LEVEL_CLUSTER: "cluster",
1046
  LEVEL_INSTANCE: "instance",
1047
  LEVEL_NODE: "node",
1048
  }
1049

    
1050
# Constant for the big ganeti lock
1051
BGL = 'BGL'
1052

    
1053

    
1054
class GanetiLockManager:
1055
  """The Ganeti Locking Library
1056

1057
  The purpose of this small library is to manage locking for ganeti clusters
1058
  in a central place, while at the same time doing dynamic checks against
1059
  possible deadlocks. It will also make it easier to transition to a different
1060
  lock type should we migrate away from python threads.
1061

1062
  """
1063
  _instance = None
1064

    
1065
  def __init__(self, nodes=None, instances=None):
1066
    """Constructs a new GanetiLockManager object.
1067

1068
    There should be only a GanetiLockManager object at any time, so this
1069
    function raises an error if this is not the case.
1070

1071
    @param nodes: list of node names
1072
    @param instances: list of instance names
1073

1074
    """
1075
    assert self.__class__._instance is None, \
1076
           "double GanetiLockManager instance"
1077

    
1078
    self.__class__._instance = self
1079

    
1080
    # The keyring contains all the locks, at their level and in the correct
1081
    # locking order.
1082
    self.__keyring = {
1083
      LEVEL_CLUSTER: LockSet([BGL]),
1084
      LEVEL_NODE: LockSet(nodes),
1085
      LEVEL_INSTANCE: LockSet(instances),
1086
    }
1087

    
1088
  def _names(self, level):
1089
    """List the lock names at the given level.
1090

1091
    This can be used for debugging/testing purposes.
1092

1093
    @param level: the level whose list of locks to get
1094

1095
    """
1096
    assert level in LEVELS, "Invalid locking level %s" % level
1097
    return self.__keyring[level]._names()
1098

    
1099
  def _is_owned(self, level):
1100
    """Check whether we are owning locks at the given level
1101

1102
    """
1103
    return self.__keyring[level]._is_owned()
1104

    
1105
  is_owned = _is_owned
1106

    
1107
  def _list_owned(self, level):
1108
    """Get the set of owned locks at the given level
1109

1110
    """
1111
    return self.__keyring[level]._list_owned()
1112

    
1113
  def _upper_owned(self, level):
1114
    """Check that we don't own any lock at a level greater than the given one.
1115

1116
    """
1117
    # This way of checking only works if LEVELS[i] = i, which we check for in
1118
    # the test cases.
1119
    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1120

    
1121
  def _BGL_owned(self):
1122
    """Check if the current thread owns the BGL.
1123

1124
    Both an exclusive or a shared acquisition work.
1125

1126
    """
1127
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1128

    
1129
  def _contains_BGL(self, level, names):
1130
    """Check if the level contains the BGL.
1131

1132
    Check if acting on the given level and set of names will change
1133
    the status of the Big Ganeti Lock.
1134

1135
    """
1136
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1137

    
1138
  def acquire(self, level, names, blocking=1, shared=0):
1139
    """Acquire a set of resource locks, at the same level.
1140

1141
    @param level: the level at which the locks shall be acquired;
1142
        it must be a member of LEVELS.
1143
    @param names: the names of the locks which shall be acquired
1144
        (special lock names, or instance/node names)
1145
    @param shared: whether to acquire in shared mode; by default
1146
        an exclusive lock will be acquired
1147
    @param blocking: whether to block while trying to acquire or to
1148
        operate in try-lock mode (this locking mode is not supported yet)
1149

1150
    """
1151
    assert level in LEVELS, "Invalid locking level %s" % level
1152

    
1153
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1154
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1155
    # so even if we've migrated we need to at least share the BGL to be
1156
    # compatible with them. Of course if we own the BGL exclusively there's no
1157
    # point in acquiring any other lock, unless perhaps we are half way through
1158
    # the migration of the current opcode.
1159
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1160
            "You must own the Big Ganeti Lock before acquiring any other")
1161

    
1162
    # Check we don't own locks at the same or upper levels.
1163
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1164
           " while owning some at a greater one")
1165

    
1166
    # Acquire the locks in the set.
1167
    return self.__keyring[level].acquire(names, shared=shared,
1168
                                         blocking=blocking)
1169

    
1170
  def release(self, level, names=None):
1171
    """Release a set of resource locks, at the same level.
1172

1173
    You must have acquired the locks, either in shared or in exclusive
1174
    mode, before releasing them.
1175

1176
    @param level: the level at which the locks shall be released;
1177
        it must be a member of LEVELS
1178
    @param names: the names of the locks which shall be released
1179
        (defaults to all the locks acquired at that level)
1180

1181
    """
1182
    assert level in LEVELS, "Invalid locking level %s" % level
1183
    assert (not self._contains_BGL(level, names) or
1184
            not self._upper_owned(LEVEL_CLUSTER)), (
1185
            "Cannot release the Big Ganeti Lock while holding something"
1186
            " at upper levels")
1187

    
1188
    # Release will complain if we don't own the locks already
1189
    return self.__keyring[level].release(names)
1190

    
1191
  def add(self, level, names, acquired=0, shared=0):
1192
    """Add locks at the specified level.
1193

1194
    @param level: the level at which the locks shall be added;
1195
        it must be a member of LEVELS_MOD.
1196
    @param names: names of the locks to acquire
1197
    @param acquired: whether to acquire the newly added locks
1198
    @param shared: whether the acquisition will be shared
1199

1200
    """
1201
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1202
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1203
           " operations")
1204
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1205
           " while owning some at a greater one")
1206
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1207

    
1208
  def remove(self, level, names, blocking=1):
1209
    """Remove locks from the specified level.
1210

1211
    You must either already own the locks you are trying to remove
1212
    exclusively or not own any lock at an upper level.
1213

1214
    @param level: the level at which the locks shall be removed;
1215
        it must be a member of LEVELS_MOD
1216
    @param names: the names of the locks which shall be removed
1217
        (special lock names, or instance/node names)
1218
    @param blocking: whether to block while trying to operate in
1219
        try-lock mode (this locking mode is not supported yet)
1220

1221
    """
1222
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1223
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1224
           " operations")
1225
    # Check we either own the level or don't own anything from here
1226
    # up. LockSet.remove() will check the case in which we don't own
1227
    # all the needed resources, or we have a shared ownership.
1228
    assert self._is_owned(level) or not self._upper_owned(level), (
1229
           "Cannot remove locks at a level while not owning it or"
1230
           " owning some at a greater one")
1231
    return self.__keyring[level].remove(names, blocking=blocking)