Statistics
| Branch: | Tag: | Revision:

root / lib / locking.py @ a66bd91b

History | View | Annotate | Download (36.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Module implementing the Ganeti locking code."""
22

    
23
import os
24
import select
25
import threading
26
import time
27
import errno
28

    
29
from ganeti import errors
30
from ganeti import utils
31

    
32

    
33
def ssynchronized(lock, shared=0):
34
  """Shared Synchronization decorator.
35

36
  Calls the function holding the given lock, either in exclusive or shared
37
  mode. It requires the passed lock to be a SharedLock (or support its
38
  semantics).
39

40
  """
41
  def wrap(fn):
42
    def sync_function(*args, **kwargs):
43
      lock.acquire(shared=shared)
44
      try:
45
        return fn(*args, **kwargs)
46
      finally:
47
        lock.release()
48
    return sync_function
49
  return wrap
50

    
51

    
52
class _SingleActionPipeConditionWaiter(object):
53
  """Callable helper class for _SingleActionPipeCondition.
54

55
  """
56
  __slots__ = [
57
    "_cond",
58
    "_fd",
59
    "_poller",
60
    ]
61

    
62
  def __init__(self, cond, poller, fd):
63
    """Initializes this class.
64

65
    @type cond: L{_SingleActionPipeCondition}
66
    @param cond: Parent condition
67
    @type poller: select.poll
68
    @param poller: Poller object
69
    @type fd: int
70
    @param fd: File descriptor to wait for
71

72
    """
73
    object.__init__(self)
74

    
75
    self._cond = cond
76
    self._poller = poller
77
    self._fd = fd
78

    
79
  def __call__(self, timeout):
80
    """Wait for something to happen on the pipe.
81

82
    @type timeout: float or None
83
    @param timeout: Timeout for waiting (can be None)
84

85
    """
86
    start_time = time.time()
87
    remaining_time = timeout
88

    
89
    while timeout is None or remaining_time > 0:
90
      try:
91
        result = self._poller.poll(remaining_time)
92
      except EnvironmentError, err:
93
        if err.errno != errno.EINTR:
94
          raise
95
        result = None
96

    
97
      # Check whether we were notified
98
      if result and result[0][0] == self._fd:
99
        break
100

    
101
      # Re-calculate timeout if necessary
102
      if timeout is not None:
103
        remaining_time = start_time + timeout - time.time()
104

    
105

    
106
class _SingleActionPipeCondition(object):
107
  """Wrapper around a pipe for usage inside conditions.
108

109
  This class contains a POSIX pipe(2) and a poller to poll it. The pipe is
110
  always allocated when constructing the class. Extra care is taken to always
111
  close the file descriptors.
112

113
  An additional class, L{_SingleActionPipeConditionWaiter}, is used to wait for
114
  notifications.
115

116
  Warning: This class is designed to be used as the underlying component of a
117
  locking condition, but is not by itself thread safe, and needs to be
118
  protected by an external lock.
119

120
  """
121
  __slots__ = [
122
    "_poller",
123
    "_read_fd",
124
    "_write_fd",
125
    "_nwaiters",
126
    ]
127

    
128
  _waiter_class = _SingleActionPipeConditionWaiter
129

    
130
  def __init__(self):
131
    """Initializes this class.
132

133
    """
134
    object.__init__(self)
135

    
136
    self._nwaiters = 0
137

    
138
    # Just assume the unpacking is successful, otherwise error handling gets
139
    # very complicated.
140
    (self._read_fd, self._write_fd) = os.pipe()
141
    try:
142
      # The poller looks for closure of the write side
143
      poller = select.poll()
144
      poller.register(self._read_fd, select.POLLHUP)
145

    
146
      self._poller = poller
147
    except:
148
      if self._read_fd is not None:
149
        os.close(self._read_fd)
150
      if self._write_fd is not None:
151
        os.close(self._write_fd)
152
      raise
153

    
154
    # There should be no code here anymore, otherwise the pipe file descriptors
155
    # may be not be cleaned up properly in case of errors.
156

    
157
  def StartWaiting(self):
158
    """Return function to wait for notification.
159

160
    @rtype: L{_SingleActionPipeConditionWaiter}
161
    @return: Function to wait for notification
162

163
    """
164
    assert self._nwaiters >= 0
165

    
166
    if self._poller is None:
167
      raise RuntimeError("Already cleaned up")
168

    
169
    # Create waiter function and increase number of waiters
170
    wait_fn = self._waiter_class(self, self._poller, self._read_fd)
171
    self._nwaiters += 1
172
    return wait_fn
173

    
174
  def DoneWaiting(self):
175
    """Decrement number of waiters and automatic cleanup.
176

177
    Must be called after waiting for a notification.
178

179
    @rtype: bool
180
    @return: Whether this was the last waiter
181

182
    """
183
    assert self._nwaiters > 0
184

    
185
    self._nwaiters -= 1
186

    
187
    if self._nwaiters == 0:
188
      self._Cleanup()
189
      return True
190

    
191
    return False
192

    
193
  def notifyAll(self):
194
    """Close the writing side of the pipe to notify all waiters.
195

196
    """
197
    if self._write_fd is None:
198
      raise RuntimeError("Can only notify once")
199

    
200
    os.close(self._write_fd)
201
    self._write_fd = None
202

    
203
  def _Cleanup(self):
204
    """Close all file descriptors.
205

206
    """
207
    if self._read_fd is not None:
208
      os.close(self._read_fd)
209
      self._read_fd = None
210

    
211
    if self._write_fd is not None:
212
      os.close(self._write_fd)
213
      self._write_fd = None
214

    
215
    self._poller = None
216

    
217
  def __del__(self):
218
    """Called on object deletion.
219

220
    Ensure no file descriptors are left open.
221

222
    """
223
    self._Cleanup()
224

    
225

    
226
class _PipeCondition(object):
227
  """Group-only non-polling condition with counters.
228

229
  This condition class uses pipes and poll, internally, to be able to wait for
230
  notification with a timeout, without resorting to polling. It is almost
231
  compatible with Python's threading.Condition, but only supports notifyAll and
232
  non-recursive locks. As an additional features it's able to report whether
233
  there are any waiting threads.
234

235
  """
236
  __slots__ = [
237
    "_lock",
238
    "_nwaiters",
239
    "_pipe",
240
    "acquire",
241
    "release",
242
    ]
243

    
244
  _pipe_class = _SingleActionPipeCondition
245

    
246
  def __init__(self, lock):
247
    """Initializes this class.
248

249
    """
250
    object.__init__(self)
251

    
252
    # Recursive locks are not supported
253
    assert not hasattr(lock, "_acquire_restore")
254
    assert not hasattr(lock, "_release_save")
255

    
256
    self._lock = lock
257

    
258
    # Export the lock's acquire() and release() methods
259
    self.acquire = lock.acquire
260
    self.release = lock.release
261

    
262
    self._nwaiters = 0
263
    self._pipe = None
264

    
265
  def _is_owned(self):
266
    """Check whether lock is owned by current thread.
267

268
    """
269
    if self._lock.acquire(0):
270
      self._lock.release()
271
      return False
272

    
273
    return True
274

    
275
  def _check_owned(self):
276
    """Raise an exception if the current thread doesn't own the lock.
277

278
    """
279
    if not self._is_owned():
280
      raise RuntimeError("cannot work with un-aquired lock")
281

    
282
  def wait(self, timeout=None):
283
    """Wait for a notification.
284

285
    @type timeout: float or None
286
    @param timeout: Waiting timeout (can be None)
287

288
    """
289
    self._check_owned()
290

    
291
    if not self._pipe:
292
      self._pipe = self._pipe_class()
293

    
294
    # Keep local reference to the pipe. It could be replaced by another thread
295
    # notifying while we're waiting.
296
    pipe = self._pipe
297

    
298
    assert self._nwaiters >= 0
299
    self._nwaiters += 1
300
    try:
301
      # Get function to wait on the pipe
302
      wait_fn = pipe.StartWaiting()
303
      try:
304
        # Release lock while waiting
305
        self.release()
306
        try:
307
          # Wait for notification
308
          wait_fn(timeout)
309
        finally:
310
          # Re-acquire lock
311
          self.acquire()
312
      finally:
313
        # Destroy pipe if this was the last waiter and the current pipe is
314
        # still the same. The same pipe cannot be reused after cleanup.
315
        if pipe.DoneWaiting() and pipe == self._pipe:
316
          self._pipe = None
317
    finally:
318
      assert self._nwaiters > 0
319
      self._nwaiters -= 1
320

    
321
  def notifyAll(self):
322
    """Notify all currently waiting threads.
323

324
    """
325
    self._check_owned()
326

    
327
    # Notify and forget pipe. A new one will be created on the next call to
328
    # wait.
329
    if self._pipe is not None:
330
      self._pipe.notifyAll()
331
      self._pipe = None
332

    
333
  def has_waiting(self):
334
    """Returns whether there are active waiters.
335

336
    """
337
    self._check_owned()
338

    
339
    return bool(self._nwaiters)
340

    
341

    
342
class _CountingCondition(object):
343
  """Wrapper for Python's built-in threading.Condition class.
344

345
  This wrapper keeps a count of active waiters. We can't access the internal
346
  "__waiters" attribute of threading.Condition because it's not thread-safe.
347

348
  """
349
  __slots__ = [
350
    "_cond",
351
    "_nwaiters",
352
    ]
353

    
354
  def __init__(self, lock):
355
    """Initializes this class.
356

357
    """
358
    object.__init__(self)
359
    self._cond = threading.Condition(lock=lock)
360
    self._nwaiters = 0
361

    
362
  def notifyAll(self):
363
    """Notifies the condition.
364

365
    """
366
    return self._cond.notifyAll()
367

    
368
  def wait(self, timeout=None):
369
    """Waits for the condition to be notified.
370

371
    @type timeout: float or None
372
    @param timeout: Timeout in seconds
373

374
    """
375
    assert self._nwaiters >= 0
376

    
377
    self._nwaiters += 1
378
    try:
379
      return self._cond.wait(timeout=timeout)
380
    finally:
381
      self._nwaiters -= 1
382

    
383
  def has_waiting(self):
384
    """Returns whether there are active waiters.
385

386
    """
387
    return bool(self._nwaiters)
388

    
389

    
390
class SharedLock(object):
391
  """Implements a shared lock.
392

393
  Multiple threads can acquire the lock in a shared way, calling
394
  acquire_shared().  In order to acquire the lock in an exclusive way threads
395
  can call acquire_exclusive().
396

397
  The lock prevents starvation but does not guarantee that threads will acquire
398
  the shared lock in the order they queued for it, just that they will
399
  eventually do so.
400

401
  """
402
  __slots__ = [
403
    "__active_shr_c",
404
    "__inactive_shr_c",
405
    "__deleted",
406
    "__exc",
407
    "__lock",
408
    "__pending",
409
    "__shr",
410
    ]
411

    
412
  __condition_class = _PipeCondition
413

    
414
  def __init__(self):
415
    """Construct a new SharedLock.
416

417
    """
418
    object.__init__(self)
419

    
420
    # Internal lock
421
    self.__lock = threading.Lock()
422

    
423
    # Queue containing waiting acquires
424
    self.__pending = []
425

    
426
    # Active and inactive conditions for shared locks
427
    self.__active_shr_c = self.__condition_class(self.__lock)
428
    self.__inactive_shr_c = self.__condition_class(self.__lock)
429

    
430
    # Current lock holders
431
    self.__shr = set()
432
    self.__exc = None
433

    
434
    # is this lock in the deleted state?
435
    self.__deleted = False
436

    
437
  def __check_deleted(self):
438
    """Raises an exception if the lock has been deleted.
439

440
    """
441
    if self.__deleted:
442
      raise errors.LockError("Deleted lock")
443

    
444
  def __is_sharer(self):
445
    """Is the current thread sharing the lock at this time?
446

447
    """
448
    return threading.currentThread() in self.__shr
449

    
450
  def __is_exclusive(self):
451
    """Is the current thread holding the lock exclusively at this time?
452

453
    """
454
    return threading.currentThread() == self.__exc
455

    
456
  def __is_owned(self, shared=-1):
457
    """Is the current thread somehow owning the lock at this time?
458

459
    This is a private version of the function, which presumes you're holding
460
    the internal lock.
461

462
    """
463
    if shared < 0:
464
      return self.__is_sharer() or self.__is_exclusive()
465
    elif shared:
466
      return self.__is_sharer()
467
    else:
468
      return self.__is_exclusive()
469

    
470
  def _is_owned(self, shared=-1):
471
    """Is the current thread somehow owning the lock at this time?
472

473
    @param shared:
474
        - < 0: check for any type of ownership (default)
475
        - 0: check for exclusive ownership
476
        - > 0: check for shared ownership
477

478
    """
479
    self.__lock.acquire()
480
    try:
481
      return self.__is_owned(shared=shared)
482
    finally:
483
      self.__lock.release()
484

    
485
  def _count_pending(self):
486
    """Returns the number of pending acquires.
487

488
    @rtype: int
489

490
    """
491
    self.__lock.acquire()
492
    try:
493
      return len(self.__pending)
494
    finally:
495
      self.__lock.release()
496

    
497
  def __do_acquire(self, shared):
498
    """Actually acquire the lock.
499

500
    """
501
    if shared:
502
      self.__shr.add(threading.currentThread())
503
    else:
504
      self.__exc = threading.currentThread()
505

    
506
  def __can_acquire(self, shared):
507
    """Determine whether lock can be acquired.
508

509
    """
510
    if shared:
511
      return self.__exc is None
512
    else:
513
      return len(self.__shr) == 0 and self.__exc is None
514

    
515
  def __is_on_top(self, cond):
516
    """Checks whether the passed condition is on top of the queue.
517

518
    The caller must make sure the queue isn't empty.
519

520
    """
521
    return self.__pending[0] == cond
522

    
523
  def __acquire_unlocked(self, shared, timeout):
524
    """Acquire a shared lock.
525

526
    @param shared: whether to acquire in shared mode; by default an
527
        exclusive lock will be acquired
528
    @param timeout: maximum waiting time before giving up
529

530
    """
531
    self.__check_deleted()
532

    
533
    # We cannot acquire the lock if we already have it
534
    assert not self.__is_owned(), "double acquire() on a non-recursive lock"
535

    
536
    # Check whether someone else holds the lock or there are pending acquires.
537
    if not self.__pending and self.__can_acquire(shared):
538
      # Apparently not, can acquire lock directly.
539
      self.__do_acquire(shared)
540
      return True
541

    
542
    if shared:
543
      wait_condition = self.__active_shr_c
544

    
545
      # Check if we're not yet in the queue
546
      if wait_condition not in self.__pending:
547
        self.__pending.append(wait_condition)
548
    else:
549
      wait_condition = self.__condition_class(self.__lock)
550
      # Always add to queue
551
      self.__pending.append(wait_condition)
552

    
553
    try:
554
      # Wait until we become the topmost acquire in the queue or the timeout
555
      # expires.
556
      while not (self.__is_on_top(wait_condition) and
557
                 self.__can_acquire(shared)):
558
        # Wait for notification
559
        wait_condition.wait(timeout)
560
        self.__check_deleted()
561

    
562
        # A lot of code assumes blocking acquires always succeed. Loop
563
        # internally for that case.
564
        if timeout is not None:
565
          break
566

    
567
      if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
568
        self.__do_acquire(shared)
569
        return True
570
    finally:
571
      # Remove condition from queue if there are no more waiters
572
      if not wait_condition.has_waiting() and not self.__deleted:
573
        self.__pending.remove(wait_condition)
574

    
575
    return False
576

    
577
  def acquire(self, shared=0, timeout=None):
578
    """Acquire a shared lock.
579

580
    @type shared: int
581
    @param shared: whether to acquire in shared mode; by default an
582
        exclusive lock will be acquired
583
    @type timeout: float
584
    @param timeout: maximum waiting time before giving up
585

586
    """
587
    self.__lock.acquire()
588
    try:
589
      return self.__acquire_unlocked(shared, timeout)
590
    finally:
591
      self.__lock.release()
592

    
593
  def release(self):
594
    """Release a Shared Lock.
595

596
    You must have acquired the lock, either in shared or in exclusive mode,
597
    before calling this function.
598

599
    """
600
    self.__lock.acquire()
601
    try:
602
      assert self.__is_exclusive() or self.__is_sharer(), \
603
        "Cannot release non-owned lock"
604

    
605
      # Autodetect release type
606
      if self.__is_exclusive():
607
        self.__exc = None
608
      else:
609
        self.__shr.remove(threading.currentThread())
610

    
611
      # Notify topmost condition in queue
612
      if self.__pending:
613
        first_condition = self.__pending[0]
614
        first_condition.notifyAll()
615

    
616
        if first_condition == self.__active_shr_c:
617
          self.__active_shr_c = self.__inactive_shr_c
618
          self.__inactive_shr_c = first_condition
619

    
620
    finally:
621
      self.__lock.release()
622

    
623
  def delete(self, timeout=None):
624
    """Delete a Shared Lock.
625

626
    This operation will declare the lock for removal. First the lock will be
627
    acquired in exclusive mode if you don't already own it, then the lock
628
    will be put in a state where any future and pending acquire() fail.
629

630
    @type timeout: float
631
    @param timeout: maximum waiting time before giving up
632

633
    """
634
    self.__lock.acquire()
635
    try:
636
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
637

    
638
      self.__check_deleted()
639

    
640
      # The caller is allowed to hold the lock exclusively already.
641
      acquired = self.__is_exclusive()
642

    
643
      if not acquired:
644
        acquired = self.__acquire_unlocked(0, timeout)
645

    
646
        assert self.__is_exclusive() and not self.__is_sharer(), \
647
          "Lock wasn't acquired in exclusive mode"
648

    
649
      if acquired:
650
        self.__deleted = True
651
        self.__exc = None
652

    
653
        # Notify all acquires. They'll throw an error.
654
        while self.__pending:
655
          self.__pending.pop().notifyAll()
656

    
657
      return acquired
658
    finally:
659
      self.__lock.release()
660

    
661

    
662
# Whenever we want to acquire a full LockSet we pass None as the value
663
# to acquire.  Hide this behind this nicely named constant.
664
ALL_SET = None
665

    
666

    
667
class LockSet:
668
  """Implements a set of locks.
669

670
  This abstraction implements a set of shared locks for the same resource type,
671
  distinguished by name. The user can lock a subset of the resources and the
672
  LockSet will take care of acquiring the locks always in the same order, thus
673
  preventing deadlock.
674

675
  All the locks needed in the same set must be acquired together, though.
676

677
  """
678
  def __init__(self, members=None):
679
    """Constructs a new LockSet.
680

681
    @param members: initial members of the set
682

683
    """
684
    # Used internally to guarantee coherency.
685
    self.__lock = SharedLock()
686

    
687
    # The lockdict indexes the relationship name -> lock
688
    # The order-of-locking is implied by the alphabetical order of names
689
    self.__lockdict = {}
690

    
691
    if members is not None:
692
      for name in members:
693
        self.__lockdict[name] = SharedLock()
694

    
695
    # The owner dict contains the set of locks each thread owns. For
696
    # performance each thread can access its own key without a global lock on
697
    # this structure. It is paramount though that *no* other type of access is
698
    # done to this structure (eg. no looping over its keys). *_owner helper
699
    # function are defined to guarantee access is correct, but in general never
700
    # do anything different than __owners[threading.currentThread()], or there
701
    # will be trouble.
702
    self.__owners = {}
703

    
704
  def _is_owned(self):
705
    """Is the current thread a current level owner?"""
706
    return threading.currentThread() in self.__owners
707

    
708
  def _add_owned(self, name=None):
709
    """Note the current thread owns the given lock"""
710
    if name is None:
711
      if not self._is_owned():
712
        self.__owners[threading.currentThread()] = set()
713
    else:
714
      if self._is_owned():
715
        self.__owners[threading.currentThread()].add(name)
716
      else:
717
        self.__owners[threading.currentThread()] = set([name])
718

    
719
  def _del_owned(self, name=None):
720
    """Note the current thread owns the given lock"""
721

    
722
    if name is not None:
723
      self.__owners[threading.currentThread()].remove(name)
724

    
725
    # Only remove the key if we don't hold the set-lock as well
726
    if (not self.__lock._is_owned() and
727
        not self.__owners[threading.currentThread()]):
728
      del self.__owners[threading.currentThread()]
729

    
730
  def _list_owned(self):
731
    """Get the set of resource names owned by the current thread"""
732
    if self._is_owned():
733
      return self.__owners[threading.currentThread()].copy()
734
    else:
735
      return set()
736

    
737
  def __names(self):
738
    """Return the current set of names.
739

740
    Only call this function while holding __lock and don't iterate on the
741
    result after releasing the lock.
742

743
    """
744
    return self.__lockdict.keys()
745

    
746
  def _names(self):
747
    """Return a copy of the current set of elements.
748

749
    Used only for debugging purposes.
750

751
    """
752
    # If we don't already own the set-level lock acquired
753
    # we'll get it and note we need to release it later.
754
    release_lock = False
755
    if not self.__lock._is_owned():
756
      release_lock = True
757
      self.__lock.acquire(shared=1)
758
    try:
759
      result = self.__names()
760
    finally:
761
      if release_lock:
762
        self.__lock.release()
763
    return set(result)
764

    
765
  def acquire(self, names, timeout=None, shared=0):
766
    """Acquire a set of resource locks.
767

768
    @param names: the names of the locks which shall be acquired
769
        (special lock names, or instance/node names)
770
    @param shared: whether to acquire in shared mode; by default an
771
        exclusive lock will be acquired
772
    @type timeout: float
773
    @param timeout: Maximum time to acquire all locks
774

775
    @return: True when all the locks are successfully acquired
776

777
    @raise errors.LockError: when any lock we try to acquire has
778
        been deleted before we succeed. In this case none of the
779
        locks requested will be acquired.
780

781
    """
782
    if timeout is not None:
783
      raise NotImplementedError
784

    
785
    # Check we don't already own locks at this level
786
    assert not self._is_owned(), "Cannot acquire locks in the same set twice"
787

    
788
    if names is None:
789
      # If no names are given acquire the whole set by not letting new names
790
      # being added before we release, and getting the current list of names.
791
      # Some of them may then be deleted later, but we'll cope with this.
792
      #
793
      # We'd like to acquire this lock in a shared way, as it's nice if
794
      # everybody else can use the instances at the same time. If are acquiring
795
      # them exclusively though they won't be able to do this anyway, though,
796
      # so we'll get the list lock exclusively as well in order to be able to
797
      # do add() on the set while owning it.
798
      self.__lock.acquire(shared=shared)
799
      try:
800
        # note we own the set-lock
801
        self._add_owned()
802
        names = self.__names()
803
      except:
804
        # We shouldn't have problems adding the lock to the owners list, but
805
        # if we did we'll try to release this lock and re-raise exception.
806
        # Of course something is going to be really wrong, after this.
807
        self.__lock.release()
808
        raise
809

    
810
    try:
811
      # Support passing in a single resource to acquire rather than many
812
      if isinstance(names, basestring):
813
        names = [names]
814
      else:
815
        names = sorted(names)
816

    
817
      acquire_list = []
818
      # First we look the locks up on __lockdict. We have no way of being sure
819
      # they will still be there after, but this makes it a lot faster should
820
      # just one of them be the already wrong
821
      for lname in utils.UniqueSequence(names):
822
        try:
823
          lock = self.__lockdict[lname] # raises KeyError if lock is not there
824
          acquire_list.append((lname, lock))
825
        except (KeyError):
826
          if self.__lock._is_owned():
827
            # We are acquiring all the set, it doesn't matter if this
828
            # particular element is not there anymore.
829
            continue
830
          else:
831
            raise errors.LockError('non-existing lock in set (%s)' % lname)
832

    
833
      # This will hold the locknames we effectively acquired.
834
      acquired = set()
835
      # Now acquire_list contains a sorted list of resources and locks we want.
836
      # In order to get them we loop on this (private) list and acquire() them.
837
      # We gave no real guarantee they will still exist till this is done but
838
      # .acquire() itself is safe and will alert us if the lock gets deleted.
839
      for (lname, lock) in acquire_list:
840
        try:
841
          lock.acquire(shared=shared) # raises LockError if the lock is deleted
842
          # now the lock cannot be deleted, we have it!
843
          self._add_owned(name=lname)
844
          acquired.add(lname)
845
        except (errors.LockError):
846
          if self.__lock._is_owned():
847
            # We are acquiring all the set, it doesn't matter if this
848
            # particular element is not there anymore.
849
            continue
850
          else:
851
            name_fail = lname
852
            for lname in self._list_owned():
853
              self.__lockdict[lname].release()
854
              self._del_owned(name=lname)
855
            raise errors.LockError('non-existing lock in set (%s)' % name_fail)
856
        except:
857
          # We shouldn't have problems adding the lock to the owners list, but
858
          # if we did we'll try to release this lock and re-raise exception.
859
          # Of course something is going to be really wrong, after this.
860
          if lock._is_owned():
861
            lock.release()
862
          raise
863

    
864
    except:
865
      # If something went wrong and we had the set-lock let's release it...
866
      if self.__lock._is_owned():
867
        self.__lock.release()
868
      raise
869

    
870
    return acquired
871

    
872
  def release(self, names=None):
873
    """Release a set of resource locks, at the same level.
874

875
    You must have acquired the locks, either in shared or in exclusive mode,
876
    before releasing them.
877

878
    @param names: the names of the locks which shall be released
879
        (defaults to all the locks acquired at that level).
880

881
    """
882
    assert self._is_owned(), "release() on lock set while not owner"
883

    
884
    # Support passing in a single resource to release rather than many
885
    if isinstance(names, basestring):
886
      names = [names]
887

    
888
    if names is None:
889
      names = self._list_owned()
890
    else:
891
      names = set(names)
892
      assert self._list_owned().issuperset(names), (
893
               "release() on unheld resources %s" %
894
               names.difference(self._list_owned()))
895

    
896
    # First of all let's release the "all elements" lock, if set.
897
    # After this 'add' can work again
898
    if self.__lock._is_owned():
899
      self.__lock.release()
900
      self._del_owned()
901

    
902
    for lockname in names:
903
      # If we are sure the lock doesn't leave __lockdict without being
904
      # exclusively held we can do this...
905
      self.__lockdict[lockname].release()
906
      self._del_owned(name=lockname)
907

    
908
  def add(self, names, acquired=0, shared=0):
909
    """Add a new set of elements to the set
910

911
    @param names: names of the new elements to add
912
    @param acquired: pre-acquire the new resource?
913
    @param shared: is the pre-acquisition shared?
914

915
    """
916
    # Check we don't already own locks at this level
917
    assert not self._is_owned() or self.__lock._is_owned(shared=0), \
918
      "Cannot add locks if the set is only partially owned, or shared"
919

    
920
    # Support passing in a single resource to add rather than many
921
    if isinstance(names, basestring):
922
      names = [names]
923

    
924
    # If we don't already own the set-level lock acquired in an exclusive way
925
    # we'll get it and note we need to release it later.
926
    release_lock = False
927
    if not self.__lock._is_owned():
928
      release_lock = True
929
      self.__lock.acquire()
930

    
931
    try:
932
      invalid_names = set(self.__names()).intersection(names)
933
      if invalid_names:
934
        # This must be an explicit raise, not an assert, because assert is
935
        # turned off when using optimization, and this can happen because of
936
        # concurrency even if the user doesn't want it.
937
        raise errors.LockError("duplicate add() (%s)" % invalid_names)
938

    
939
      for lockname in names:
940
        lock = SharedLock()
941

    
942
        if acquired:
943
          lock.acquire(shared=shared)
944
          # now the lock cannot be deleted, we have it!
945
          try:
946
            self._add_owned(name=lockname)
947
          except:
948
            # We shouldn't have problems adding the lock to the owners list,
949
            # but if we did we'll try to release this lock and re-raise
950
            # exception.  Of course something is going to be really wrong,
951
            # after this.  On the other hand the lock hasn't been added to the
952
            # __lockdict yet so no other threads should be pending on it. This
953
            # release is just a safety measure.
954
            lock.release()
955
            raise
956

    
957
        self.__lockdict[lockname] = lock
958

    
959
    finally:
960
      # Only release __lock if we were not holding it previously.
961
      if release_lock:
962
        self.__lock.release()
963

    
964
    return True
965

    
966
  def remove(self, names):
967
    """Remove elements from the lock set.
968

969
    You can either not hold anything in the lockset or already hold a superset
970
    of the elements you want to delete, exclusively.
971

972
    @param names: names of the resource to remove.
973

974
    @return:: a list of locks which we removed; the list is always
975
        equal to the names list if we were holding all the locks
976
        exclusively
977

978
    """
979
    # Support passing in a single resource to remove rather than many
980
    if isinstance(names, basestring):
981
      names = [names]
982

    
983
    # If we own any subset of this lock it must be a superset of what we want
984
    # to delete. The ownership must also be exclusive, but that will be checked
985
    # by the lock itself.
986
    assert not self._is_owned() or self._list_owned().issuperset(names), (
987
      "remove() on acquired lockset while not owning all elements")
988

    
989
    removed = []
990

    
991
    for lname in names:
992
      # Calling delete() acquires the lock exclusively if we don't already own
993
      # it, and causes all pending and subsequent lock acquires to fail. It's
994
      # fine to call it out of order because delete() also implies release(),
995
      # and the assertion above guarantees that if we either already hold
996
      # everything we want to delete, or we hold none.
997
      try:
998
        self.__lockdict[lname].delete()
999
        removed.append(lname)
1000
      except (KeyError, errors.LockError):
1001
        # This cannot happen if we were already holding it, verify:
1002
        assert not self._is_owned(), "remove failed while holding lockset"
1003
      else:
1004
        # If no LockError was raised we are the ones who deleted the lock.
1005
        # This means we can safely remove it from lockdict, as any further or
1006
        # pending delete() or acquire() will fail (and nobody can have the lock
1007
        # since before our call to delete()).
1008
        #
1009
        # This is done in an else clause because if the exception was thrown
1010
        # it's the job of the one who actually deleted it.
1011
        del self.__lockdict[lname]
1012
        # And let's remove it from our private list if we owned it.
1013
        if self._is_owned():
1014
          self._del_owned(name=lname)
1015

    
1016
    return removed
1017

    
1018

    
1019
# Locking levels, must be acquired in increasing order.
1020
# Current rules are:
1021
#   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1022
#   acquired before performing any operation, either in shared or in exclusive
1023
#   mode. acquiring the BGL in exclusive mode is discouraged and should be
1024
#   avoided.
1025
#   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1026
#   If you need more than one node, or more than one instance, acquire them at
1027
#   the same time.
1028
LEVEL_CLUSTER = 0
1029
LEVEL_INSTANCE = 1
1030
LEVEL_NODE = 2
1031

    
1032
LEVELS = [LEVEL_CLUSTER,
1033
          LEVEL_INSTANCE,
1034
          LEVEL_NODE]
1035

    
1036
# Lock levels which are modifiable
1037
LEVELS_MOD = [LEVEL_NODE, LEVEL_INSTANCE]
1038

    
1039
LEVEL_NAMES = {
1040
  LEVEL_CLUSTER: "cluster",
1041
  LEVEL_INSTANCE: "instance",
1042
  LEVEL_NODE: "node",
1043
  }
1044

    
1045
# Constant for the big ganeti lock
1046
BGL = 'BGL'
1047

    
1048

    
1049
class GanetiLockManager:
1050
  """The Ganeti Locking Library
1051

1052
  The purpose of this small library is to manage locking for ganeti clusters
1053
  in a central place, while at the same time doing dynamic checks against
1054
  possible deadlocks. It will also make it easier to transition to a different
1055
  lock type should we migrate away from python threads.
1056

1057
  """
1058
  _instance = None
1059

    
1060
  def __init__(self, nodes=None, instances=None):
1061
    """Constructs a new GanetiLockManager object.
1062

1063
    There should be only a GanetiLockManager object at any time, so this
1064
    function raises an error if this is not the case.
1065

1066
    @param nodes: list of node names
1067
    @param instances: list of instance names
1068

1069
    """
1070
    assert self.__class__._instance is None, \
1071
           "double GanetiLockManager instance"
1072

    
1073
    self.__class__._instance = self
1074

    
1075
    # The keyring contains all the locks, at their level and in the correct
1076
    # locking order.
1077
    self.__keyring = {
1078
      LEVEL_CLUSTER: LockSet([BGL]),
1079
      LEVEL_NODE: LockSet(nodes),
1080
      LEVEL_INSTANCE: LockSet(instances),
1081
    }
1082

    
1083
  def _names(self, level):
1084
    """List the lock names at the given level.
1085

1086
    This can be used for debugging/testing purposes.
1087

1088
    @param level: the level whose list of locks to get
1089

1090
    """
1091
    assert level in LEVELS, "Invalid locking level %s" % level
1092
    return self.__keyring[level]._names()
1093

    
1094
  def _is_owned(self, level):
1095
    """Check whether we are owning locks at the given level
1096

1097
    """
1098
    return self.__keyring[level]._is_owned()
1099

    
1100
  is_owned = _is_owned
1101

    
1102
  def _list_owned(self, level):
1103
    """Get the set of owned locks at the given level
1104

1105
    """
1106
    return self.__keyring[level]._list_owned()
1107

    
1108
  def _upper_owned(self, level):
1109
    """Check that we don't own any lock at a level greater than the given one.
1110

1111
    """
1112
    # This way of checking only works if LEVELS[i] = i, which we check for in
1113
    # the test cases.
1114
    return utils.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1115

    
1116
  def _BGL_owned(self):
1117
    """Check if the current thread owns the BGL.
1118

1119
    Both an exclusive or a shared acquisition work.
1120

1121
    """
1122
    return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1123

    
1124
  def _contains_BGL(self, level, names):
1125
    """Check if the level contains the BGL.
1126

1127
    Check if acting on the given level and set of names will change
1128
    the status of the Big Ganeti Lock.
1129

1130
    """
1131
    return level == LEVEL_CLUSTER and (names is None or BGL in names)
1132

    
1133
  def acquire(self, level, names, timeout=None, shared=0):
1134
    """Acquire a set of resource locks, at the same level.
1135

1136
    @param level: the level at which the locks shall be acquired;
1137
        it must be a member of LEVELS.
1138
    @param names: the names of the locks which shall be acquired
1139
        (special lock names, or instance/node names)
1140
    @param shared: whether to acquire in shared mode; by default
1141
        an exclusive lock will be acquired
1142
    @type timeout: float
1143
    @param timeout: Maximum time to acquire all locks
1144

1145
    """
1146
    assert level in LEVELS, "Invalid locking level %s" % level
1147

    
1148
    # Check that we are either acquiring the Big Ganeti Lock or we already own
1149
    # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1150
    # so even if we've migrated we need to at least share the BGL to be
1151
    # compatible with them. Of course if we own the BGL exclusively there's no
1152
    # point in acquiring any other lock, unless perhaps we are half way through
1153
    # the migration of the current opcode.
1154
    assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1155
            "You must own the Big Ganeti Lock before acquiring any other")
1156

    
1157
    # Check we don't own locks at the same or upper levels.
1158
    assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1159
           " while owning some at a greater one")
1160

    
1161
    # Acquire the locks in the set.
1162
    return self.__keyring[level].acquire(names, shared=shared, timeout=timeout)
1163

    
1164
  def release(self, level, names=None):
1165
    """Release a set of resource locks, at the same level.
1166

1167
    You must have acquired the locks, either in shared or in exclusive
1168
    mode, before releasing them.
1169

1170
    @param level: the level at which the locks shall be released;
1171
        it must be a member of LEVELS
1172
    @param names: the names of the locks which shall be released
1173
        (defaults to all the locks acquired at that level)
1174

1175
    """
1176
    assert level in LEVELS, "Invalid locking level %s" % level
1177
    assert (not self._contains_BGL(level, names) or
1178
            not self._upper_owned(LEVEL_CLUSTER)), (
1179
            "Cannot release the Big Ganeti Lock while holding something"
1180
            " at upper levels")
1181

    
1182
    # Release will complain if we don't own the locks already
1183
    return self.__keyring[level].release(names)
1184

    
1185
  def add(self, level, names, acquired=0, shared=0):
1186
    """Add locks at the specified level.
1187

1188
    @param level: the level at which the locks shall be added;
1189
        it must be a member of LEVELS_MOD.
1190
    @param names: names of the locks to acquire
1191
    @param acquired: whether to acquire the newly added locks
1192
    @param shared: whether the acquisition will be shared
1193

1194
    """
1195
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1196
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1197
           " operations")
1198
    assert not self._upper_owned(level), ("Cannot add locks at a level"
1199
           " while owning some at a greater one")
1200
    return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1201

    
1202
  def remove(self, level, names):
1203
    """Remove locks from the specified level.
1204

1205
    You must either already own the locks you are trying to remove
1206
    exclusively or not own any lock at an upper level.
1207

1208
    @param level: the level at which the locks shall be removed;
1209
        it must be a member of LEVELS_MOD
1210
    @param names: the names of the locks which shall be removed
1211
        (special lock names, or instance/node names)
1212

1213
    """
1214
    assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1215
    assert self._BGL_owned(), ("You must own the BGL before performing other"
1216
           " operations")
1217
    # Check we either own the level or don't own anything from here
1218
    # up. LockSet.remove() will check the case in which we don't own
1219
    # all the needed resources, or we have a shared ownership.
1220
    assert self._is_owned(level) or not self._upper_owned(level), (
1221
           "Cannot remove locks at a level while not owning it or"
1222
           " owning some at a greater one")
1223
    return self.__keyring[level].remove(names)