Make _GenerateDRBD8Branch accept different VG names
[ganeti-local] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Module implementing the Ganeti locking code."""
22
23 # pylint: disable-msg=W0212
24
25 # W0212 since e.g. LockSet methods use (a lot) the internals of
26 # SharedLock
27
28 import os
29 import select
30 import threading
31 import errno
32 import weakref
33 import logging
34 import heapq
35 import operator
36 import itertools
37
38 from ganeti import errors
39 from ganeti import utils
40 from ganeti import compat
41 from ganeti import query
42
43
44 _EXCLUSIVE_TEXT = "exclusive"
45 _SHARED_TEXT = "shared"
46 _DELETED_TEXT = "deleted"
47
48 _DEFAULT_PRIORITY = 0
49
50
51 def ssynchronized(mylock, shared=0):
52   """Shared Synchronization decorator.
53
54   Calls the function holding the given lock, either in exclusive or shared
55   mode. It requires the passed lock to be a SharedLock (or support its
56   semantics).
57
58   @type mylock: lockable object or string
59   @param mylock: lock to acquire or class member name of the lock to acquire
60
61   """
62   def wrap(fn):
63     def sync_function(*args, **kwargs):
64       if isinstance(mylock, basestring):
65         assert args, "cannot ssynchronize on non-class method: self not found"
66         # args[0] is "self"
67         lock = getattr(args[0], mylock)
68       else:
69         lock = mylock
70       lock.acquire(shared=shared)
71       try:
72         return fn(*args, **kwargs)
73       finally:
74         lock.release()
75     return sync_function
76   return wrap
77
78
79 class _SingleNotifyPipeConditionWaiter(object):
80   """Helper class for SingleNotifyPipeCondition
81
82   """
83   __slots__ = [
84     "_fd",
85     "_poller",
86     ]
87
88   def __init__(self, poller, fd):
89     """Constructor for _SingleNotifyPipeConditionWaiter
90
91     @type poller: select.poll
92     @param poller: Poller object
93     @type fd: int
94     @param fd: File descriptor to wait for
95
96     """
97     object.__init__(self)
98     self._poller = poller
99     self._fd = fd
100
101   def __call__(self, timeout):
102     """Wait for something to happen on the pipe.
103
104     @type timeout: float or None
105     @param timeout: Timeout for waiting (can be None)
106
107     """
108     running_timeout = utils.RunningTimeout(timeout, True)
109
110     while True:
111       remaining_time = running_timeout.Remaining()
112
113       if remaining_time is not None:
114         if remaining_time < 0.0:
115           break
116
117         # Our calculation uses seconds, poll() wants milliseconds
118         remaining_time *= 1000
119
120       try:
121         result = self._poller.poll(remaining_time)
122       except EnvironmentError, err:
123         if err.errno != errno.EINTR:
124           raise
125         result = None
126
127       # Check whether we were notified
128       if result and result[0][0] == self._fd:
129         break
130
131
132 class _BaseCondition(object):
133   """Base class containing common code for conditions.
134
135   Some of this code is taken from python's threading module.
136
137   """
138   __slots__ = [
139     "_lock",
140     "acquire",
141     "release",
142     "_is_owned",
143     "_acquire_restore",
144     "_release_save",
145     ]
146
147   def __init__(self, lock):
148     """Constructor for _BaseCondition.
149
150     @type lock: threading.Lock
151     @param lock: condition base lock
152
153     """
154     object.__init__(self)
155
156     try:
157       self._release_save = lock._release_save
158     except AttributeError:
159       self._release_save = self._base_release_save
160     try:
161       self._acquire_restore = lock._acquire_restore
162     except AttributeError:
163       self._acquire_restore = self._base_acquire_restore
164     try:
165       self._is_owned = lock._is_owned
166     except AttributeError:
167       self._is_owned = self._base_is_owned
168
169     self._lock = lock
170
171     # Export the lock's acquire() and release() methods
172     self.acquire = lock.acquire
173     self.release = lock.release
174
175   def _base_is_owned(self):
176     """Check whether lock is owned by current thread.
177
178     """
179     if self._lock.acquire(0):
180       self._lock.release()
181       return False
182     return True
183
184   def _base_release_save(self):
185     self._lock.release()
186
187   def _base_acquire_restore(self, _):
188     self._lock.acquire()
189
190   def _check_owned(self):
191     """Raise an exception if the current thread doesn't own the lock.
192
193     """
194     if not self._is_owned():
195       raise RuntimeError("cannot work with un-aquired lock")
196
197
198 class SingleNotifyPipeCondition(_BaseCondition):
199   """Condition which can only be notified once.
200
201   This condition class uses pipes and poll, internally, to be able to wait for
202   notification with a timeout, without resorting to polling. It is almost
203   compatible with Python's threading.Condition, with the following differences:
204     - notifyAll can only be called once, and no wait can happen after that
205     - notify is not supported, only notifyAll
206
207   """
208
209   __slots__ = [
210     "_poller",
211     "_read_fd",
212     "_write_fd",
213     "_nwaiters",
214     "_notified",
215     ]
216
217   _waiter_class = _SingleNotifyPipeConditionWaiter
218
219   def __init__(self, lock):
220     """Constructor for SingleNotifyPipeCondition
221
222     """
223     _BaseCondition.__init__(self, lock)
224     self._nwaiters = 0
225     self._notified = False
226     self._read_fd = None
227     self._write_fd = None
228     self._poller = None
229
230   def _check_unnotified(self):
231     """Throws an exception if already notified.
232
233     """
234     if self._notified:
235       raise RuntimeError("cannot use already notified condition")
236
237   def _Cleanup(self):
238     """Cleanup open file descriptors, if any.
239
240     """
241     if self._read_fd is not None:
242       os.close(self._read_fd)
243       self._read_fd = None
244
245     if self._write_fd is not None:
246       os.close(self._write_fd)
247       self._write_fd = None
248     self._poller = None
249
250   def wait(self, timeout=None):
251     """Wait for a notification.
252
253     @type timeout: float or None
254     @param timeout: Waiting timeout (can be None)
255
256     """
257     self._check_owned()
258     self._check_unnotified()
259
260     self._nwaiters += 1
261     try:
262       if self._poller is None:
263         (self._read_fd, self._write_fd) = os.pipe()
264         self._poller = select.poll()
265         self._poller.register(self._read_fd, select.POLLHUP)
266
267       wait_fn = self._waiter_class(self._poller, self._read_fd)
268       state = self._release_save()
269       try:
270         # Wait for notification
271         wait_fn(timeout)
272       finally:
273         # Re-acquire lock
274         self._acquire_restore(state)
275     finally:
276       self._nwaiters -= 1
277       if self._nwaiters == 0:
278         self._Cleanup()
279
280   def notifyAll(self): # pylint: disable-msg=C0103
281     """Close the writing side of the pipe to notify all waiters.
282
283     """
284     self._check_owned()
285     self._check_unnotified()
286     self._notified = True
287     if self._write_fd is not None:
288       os.close(self._write_fd)
289       self._write_fd = None
290
291
292 class PipeCondition(_BaseCondition):
293   """Group-only non-polling condition with counters.
294
295   This condition class uses pipes and poll, internally, to be able to wait for
296   notification with a timeout, without resorting to polling. It is almost
297   compatible with Python's threading.Condition, but only supports notifyAll and
298   non-recursive locks. As an additional features it's able to report whether
299   there are any waiting threads.
300
301   """
302   __slots__ = [
303     "_waiters",
304     "_single_condition",
305     ]
306
307   _single_condition_class = SingleNotifyPipeCondition
308
309   def __init__(self, lock):
310     """Initializes this class.
311
312     """
313     _BaseCondition.__init__(self, lock)
314     self._waiters = set()
315     self._single_condition = self._single_condition_class(self._lock)
316
317   def wait(self, timeout=None):
318     """Wait for a notification.
319
320     @type timeout: float or None
321     @param timeout: Waiting timeout (can be None)
322
323     """
324     self._check_owned()
325
326     # Keep local reference to the pipe. It could be replaced by another thread
327     # notifying while we're waiting.
328     cond = self._single_condition
329
330     self._waiters.add(threading.currentThread())
331     try:
332       cond.wait(timeout)
333     finally:
334       self._check_owned()
335       self._waiters.remove(threading.currentThread())
336
337   def notifyAll(self): # pylint: disable-msg=C0103
338     """Notify all currently waiting threads.
339
340     """
341     self._check_owned()
342     self._single_condition.notifyAll()
343     self._single_condition = self._single_condition_class(self._lock)
344
345   def get_waiting(self):
346     """Returns a list of all waiting threads.
347
348     """
349     self._check_owned()
350
351     return self._waiters
352
353   def has_waiting(self):
354     """Returns whether there are active waiters.
355
356     """
357     self._check_owned()
358
359     return bool(self._waiters)
360
361
362 class _PipeConditionWithMode(PipeCondition):
363   __slots__ = [
364     "shared",
365     ]
366
367   def __init__(self, lock, shared):
368     """Initializes this class.
369
370     """
371     self.shared = shared
372     PipeCondition.__init__(self, lock)
373
374
375 class SharedLock(object):
376   """Implements a shared lock.
377
378   Multiple threads can acquire the lock in a shared way by calling
379   C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
380   threads can call C{acquire(shared=0)}.
381
382   Notes on data structures: C{__pending} contains a priority queue (heapq) of
383   all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
384   ...]}. Each per-priority queue contains a normal in-order list of conditions
385   to be notified when the lock can be acquired. Shared locks are grouped
386   together by priority and the condition for them is stored in
387   C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
388   references for the per-priority queues indexed by priority for faster access.
389
390   @type name: string
391   @ivar name: the name of the lock
392
393   """
394   __slots__ = [
395     "__weakref__",
396     "__deleted",
397     "__exc",
398     "__lock",
399     "__pending",
400     "__pending_by_prio",
401     "__pending_shared",
402     "__shr",
403     "name",
404     ]
405
406   __condition_class = _PipeConditionWithMode
407
408   def __init__(self, name, monitor=None):
409     """Construct a new SharedLock.
410
411     @param name: the name of the lock
412     @type monitor: L{LockMonitor}
413     @param monitor: Lock monitor with which to register
414
415     """
416     object.__init__(self)
417
418     self.name = name
419
420     # Internal lock
421     self.__lock = threading.Lock()
422
423     # Queue containing waiting acquires
424     self.__pending = []
425     self.__pending_by_prio = {}
426     self.__pending_shared = {}
427
428     # Current lock holders
429     self.__shr = set()
430     self.__exc = None
431
432     # is this lock in the deleted state?
433     self.__deleted = False
434
435     # Register with lock monitor
436     if monitor:
437       monitor.RegisterLock(self)
438
439   def GetInfo(self, requested):
440     """Retrieves information for querying locks.
441
442     @type requested: set
443     @param requested: Requested information, see C{query.LQ_*}
444
445     """
446     self.__lock.acquire()
447     try:
448       # Note: to avoid unintentional race conditions, no references to
449       # modifiable objects should be returned unless they were created in this
450       # function.
451       mode = None
452       owner_names = None
453
454       if query.LQ_MODE in requested:
455         if self.__deleted:
456           mode = _DELETED_TEXT
457           assert not (self.__exc or self.__shr)
458         elif self.__exc:
459           mode = _EXCLUSIVE_TEXT
460         elif self.__shr:
461           mode = _SHARED_TEXT
462
463       # Current owner(s) are wanted
464       if query.LQ_OWNER in requested:
465         if self.__exc:
466           owner = [self.__exc]
467         else:
468           owner = self.__shr
469
470         if owner:
471           assert not self.__deleted
472           owner_names = [i.getName() for i in owner]
473
474       # Pending acquires are wanted
475       if query.LQ_PENDING in requested:
476         pending = []
477
478         # Sorting instead of copying and using heaq functions for simplicity
479         for (_, prioqueue) in sorted(self.__pending):
480           for cond in prioqueue:
481             if cond.shared:
482               pendmode = _SHARED_TEXT
483             else:
484               pendmode = _EXCLUSIVE_TEXT
485
486             # List of names will be sorted in L{query._GetLockPending}
487             pending.append((pendmode, [i.getName()
488                                        for i in cond.get_waiting()]))
489       else:
490         pending = None
491
492       return (self.name, mode, owner_names, pending)
493     finally:
494       self.__lock.release()
495
496   def __check_deleted(self):
497     """Raises an exception if the lock has been deleted.
498
499     """
500     if self.__deleted:
501       raise errors.LockError("Deleted lock %s" % self.name)
502
503   def __is_sharer(self):
504     """Is the current thread sharing the lock at this time?
505
506     """
507     return threading.currentThread() in self.__shr
508
509   def __is_exclusive(self):
510     """Is the current thread holding the lock exclusively at this time?
511
512     """
513     return threading.currentThread() == self.__exc
514
515   def __is_owned(self, shared=-1):
516     """Is the current thread somehow owning the lock at this time?
517
518     This is a private version of the function, which presumes you're holding
519     the internal lock.
520
521     """
522     if shared < 0:
523       return self.__is_sharer() or self.__is_exclusive()
524     elif shared:
525       return self.__is_sharer()
526     else:
527       return self.__is_exclusive()
528
529   def _is_owned(self, shared=-1):
530     """Is the current thread somehow owning the lock at this time?
531
532     @param shared:
533         - < 0: check for any type of ownership (default)
534         - 0: check for exclusive ownership
535         - > 0: check for shared ownership
536
537     """
538     self.__lock.acquire()
539     try:
540       return self.__is_owned(shared=shared)
541     finally:
542       self.__lock.release()
543
544   def _count_pending(self):
545     """Returns the number of pending acquires.
546
547     @rtype: int
548
549     """
550     self.__lock.acquire()
551     try:
552       return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
553     finally:
554       self.__lock.release()
555
556   def _check_empty(self):
557     """Checks whether there are any pending acquires.
558
559     @rtype: bool
560
561     """
562     self.__lock.acquire()
563     try:
564       # Order is important: __find_first_pending_queue modifies __pending
565       return not (self.__find_first_pending_queue() or
566                   self.__pending or
567                   self.__pending_by_prio or
568                   self.__pending_shared)
569     finally:
570       self.__lock.release()
571
572   def __do_acquire(self, shared):
573     """Actually acquire the lock.
574
575     """
576     if shared:
577       self.__shr.add(threading.currentThread())
578     else:
579       self.__exc = threading.currentThread()
580
581   def __can_acquire(self, shared):
582     """Determine whether lock can be acquired.
583
584     """
585     if shared:
586       return self.__exc is None
587     else:
588       return len(self.__shr) == 0 and self.__exc is None
589
590   def __find_first_pending_queue(self):
591     """Tries to find the topmost queued entry with pending acquires.
592
593     Removes empty entries while going through the list.
594
595     """
596     while self.__pending:
597       (priority, prioqueue) = self.__pending[0]
598
599       if not prioqueue:
600         heapq.heappop(self.__pending)
601         del self.__pending_by_prio[priority]
602         assert priority not in self.__pending_shared
603         continue
604
605       if prioqueue:
606         return prioqueue
607
608     return None
609
610   def __is_on_top(self, cond):
611     """Checks whether the passed condition is on top of the queue.
612
613     The caller must make sure the queue isn't empty.
614
615     """
616     return cond == self.__find_first_pending_queue()[0]
617
618   def __acquire_unlocked(self, shared, timeout, priority):
619     """Acquire a shared lock.
620
621     @param shared: whether to acquire in shared mode; by default an
622         exclusive lock will be acquired
623     @param timeout: maximum waiting time before giving up
624     @type priority: integer
625     @param priority: Priority for acquiring lock
626
627     """
628     self.__check_deleted()
629
630     # We cannot acquire the lock if we already have it
631     assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
632                                    " %s" % self.name)
633
634     # Remove empty entries from queue
635     self.__find_first_pending_queue()
636
637     # Check whether someone else holds the lock or there are pending acquires.
638     if not self.__pending and self.__can_acquire(shared):
639       # Apparently not, can acquire lock directly.
640       self.__do_acquire(shared)
641       return True
642
643     prioqueue = self.__pending_by_prio.get(priority, None)
644
645     if shared:
646       # Try to re-use condition for shared acquire
647       wait_condition = self.__pending_shared.get(priority, None)
648       assert (wait_condition is None or
649               (wait_condition.shared and wait_condition in prioqueue))
650     else:
651       wait_condition = None
652
653     if wait_condition is None:
654       if prioqueue is None:
655         assert priority not in self.__pending_by_prio
656
657         prioqueue = []
658         heapq.heappush(self.__pending, (priority, prioqueue))
659         self.__pending_by_prio[priority] = prioqueue
660
661       wait_condition = self.__condition_class(self.__lock, shared)
662       prioqueue.append(wait_condition)
663
664       if shared:
665         # Keep reference for further shared acquires on same priority. This is
666         # better than trying to find it in the list of pending acquires.
667         assert priority not in self.__pending_shared
668         self.__pending_shared[priority] = wait_condition
669
670     try:
671       # Wait until we become the topmost acquire in the queue or the timeout
672       # expires.
673       # TODO: Decrease timeout with spurious notifications
674       while not (self.__is_on_top(wait_condition) and
675                  self.__can_acquire(shared)):
676         # Wait for notification
677         wait_condition.wait(timeout)
678         self.__check_deleted()
679
680         # A lot of code assumes blocking acquires always succeed. Loop
681         # internally for that case.
682         if timeout is not None:
683           break
684
685       if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
686         self.__do_acquire(shared)
687         return True
688     finally:
689       # Remove condition from queue if there are no more waiters
690       if not wait_condition.has_waiting():
691         prioqueue.remove(wait_condition)
692         if wait_condition.shared:
693           del self.__pending_shared[priority]
694
695     return False
696
697   def acquire(self, shared=0, timeout=None, priority=None,
698               test_notify=None):
699     """Acquire a shared lock.
700
701     @type shared: integer (0/1) used as a boolean
702     @param shared: whether to acquire in shared mode; by default an
703         exclusive lock will be acquired
704     @type timeout: float
705     @param timeout: maximum waiting time before giving up
706     @type priority: integer
707     @param priority: Priority for acquiring lock
708     @type test_notify: callable or None
709     @param test_notify: Special callback function for unittesting
710
711     """
712     if priority is None:
713       priority = _DEFAULT_PRIORITY
714
715     self.__lock.acquire()
716     try:
717       # We already got the lock, notify now
718       if __debug__ and callable(test_notify):
719         test_notify()
720
721       return self.__acquire_unlocked(shared, timeout, priority)
722     finally:
723       self.__lock.release()
724
725   def release(self):
726     """Release a Shared Lock.
727
728     You must have acquired the lock, either in shared or in exclusive mode,
729     before calling this function.
730
731     """
732     self.__lock.acquire()
733     try:
734       assert self.__is_exclusive() or self.__is_sharer(), \
735         "Cannot release non-owned lock"
736
737       # Autodetect release type
738       if self.__is_exclusive():
739         self.__exc = None
740       else:
741         self.__shr.remove(threading.currentThread())
742
743       # Notify topmost condition in queue
744       prioqueue = self.__find_first_pending_queue()
745       if prioqueue:
746         prioqueue[0].notifyAll()
747
748     finally:
749       self.__lock.release()
750
751   def delete(self, timeout=None, priority=None):
752     """Delete a Shared Lock.
753
754     This operation will declare the lock for removal. First the lock will be
755     acquired in exclusive mode if you don't already own it, then the lock
756     will be put in a state where any future and pending acquire() fail.
757
758     @type timeout: float
759     @param timeout: maximum waiting time before giving up
760     @type priority: integer
761     @param priority: Priority for acquiring lock
762
763     """
764     if priority is None:
765       priority = _DEFAULT_PRIORITY
766
767     self.__lock.acquire()
768     try:
769       assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"
770
771       self.__check_deleted()
772
773       # The caller is allowed to hold the lock exclusively already.
774       acquired = self.__is_exclusive()
775
776       if not acquired:
777         acquired = self.__acquire_unlocked(0, timeout, priority)
778
779         assert self.__is_exclusive() and not self.__is_sharer(), \
780           "Lock wasn't acquired in exclusive mode"
781
782       if acquired:
783         self.__deleted = True
784         self.__exc = None
785
786         assert not (self.__exc or self.__shr), "Found owner during deletion"
787
788         # Notify all acquires. They'll throw an error.
789         for (_, prioqueue) in self.__pending:
790           for cond in prioqueue:
791             cond.notifyAll()
792
793         assert self.__deleted
794
795       return acquired
796     finally:
797       self.__lock.release()
798
799   def _release_save(self):
800     shared = self.__is_sharer()
801     self.release()
802     return shared
803
804   def _acquire_restore(self, shared):
805     self.acquire(shared=shared)
806
807
808 # Whenever we want to acquire a full LockSet we pass None as the value
809 # to acquire.  Hide this behind this nicely named constant.
810 ALL_SET = None
811
812
813 class _AcquireTimeout(Exception):
814   """Internal exception to abort an acquire on a timeout.
815
816   """
817
818
819 class LockSet:
820   """Implements a set of locks.
821
822   This abstraction implements a set of shared locks for the same resource type,
823   distinguished by name. The user can lock a subset of the resources and the
824   LockSet will take care of acquiring the locks always in the same order, thus
825   preventing deadlock.
826
827   All the locks needed in the same set must be acquired together, though.
828
829   @type name: string
830   @ivar name: the name of the lockset
831
832   """
833   def __init__(self, members, name, monitor=None):
834     """Constructs a new LockSet.
835
836     @type members: list of strings
837     @param members: initial members of the set
838     @type monitor: L{LockMonitor}
839     @param monitor: Lock monitor with which to register member locks
840
841     """
842     assert members is not None, "members parameter is not a list"
843     self.name = name
844
845     # Lock monitor
846     self.__monitor = monitor
847
848     # Used internally to guarantee coherency.
849     self.__lock = SharedLock(name)
850
851     # The lockdict indexes the relationship name -> lock
852     # The order-of-locking is implied by the alphabetical order of names
853     self.__lockdict = {}
854
855     for mname in members:
856       self.__lockdict[mname] = SharedLock(self._GetLockName(mname),
857                                           monitor=monitor)
858
859     # The owner dict contains the set of locks each thread owns. For
860     # performance each thread can access its own key without a global lock on
861     # this structure. It is paramount though that *no* other type of access is
862     # done to this structure (eg. no looping over its keys). *_owner helper
863     # function are defined to guarantee access is correct, but in general never
864     # do anything different than __owners[threading.currentThread()], or there
865     # will be trouble.
866     self.__owners = {}
867
868   def _GetLockName(self, mname):
869     """Returns the name for a member lock.
870
871     """
872     return "%s/%s" % (self.name, mname)
873
874   def _is_owned(self):
875     """Is the current thread a current level owner?"""
876     return threading.currentThread() in self.__owners
877
878   def _add_owned(self, name=None):
879     """Note the current thread owns the given lock"""
880     if name is None:
881       if not self._is_owned():
882         self.__owners[threading.currentThread()] = set()
883     else:
884       if self._is_owned():
885         self.__owners[threading.currentThread()].add(name)
886       else:
887         self.__owners[threading.currentThread()] = set([name])
888
889   def _del_owned(self, name=None):
890     """Note the current thread owns the given lock"""
891
892     assert not (name is None and self.__lock._is_owned()), \
893            "Cannot hold internal lock when deleting owner status"
894
895     if name is not None:
896       self.__owners[threading.currentThread()].remove(name)
897
898     # Only remove the key if we don't hold the set-lock as well
899     if (not self.__lock._is_owned() and
900         not self.__owners[threading.currentThread()]):
901       del self.__owners[threading.currentThread()]
902
903   def _list_owned(self):
904     """Get the set of resource names owned by the current thread"""
905     if self._is_owned():
906       return self.__owners[threading.currentThread()].copy()
907     else:
908       return set()
909
910   def _release_and_delete_owned(self):
911     """Release and delete all resources owned by the current thread"""
912     for lname in self._list_owned():
913       lock = self.__lockdict[lname]
914       if lock._is_owned():
915         lock.release()
916       self._del_owned(name=lname)
917
918   def __names(self):
919     """Return the current set of names.
920
921     Only call this function while holding __lock and don't iterate on the
922     result after releasing the lock.
923
924     """
925     return self.__lockdict.keys()
926
927   def _names(self):
928     """Return a copy of the current set of elements.
929
930     Used only for debugging purposes.
931
932     """
933     # If we don't already own the set-level lock acquired
934     # we'll get it and note we need to release it later.
935     release_lock = False
936     if not self.__lock._is_owned():
937       release_lock = True
938       self.__lock.acquire(shared=1)
939     try:
940       result = self.__names()
941     finally:
942       if release_lock:
943         self.__lock.release()
944     return set(result)
945
946   def acquire(self, names, timeout=None, shared=0, priority=None,
947               test_notify=None):
948     """Acquire a set of resource locks.
949
950     @type names: list of strings (or string)
951     @param names: the names of the locks which shall be acquired
952         (special lock names, or instance/node names)
953     @type shared: integer (0/1) used as a boolean
954     @param shared: whether to acquire in shared mode; by default an
955         exclusive lock will be acquired
956     @type timeout: float or None
957     @param timeout: Maximum time to acquire all locks
958     @type priority: integer
959     @param priority: Priority for acquiring locks
960     @type test_notify: callable or None
961     @param test_notify: Special callback function for unittesting
962
963     @return: Set of all locks successfully acquired or None in case of timeout
964
965     @raise errors.LockError: when any lock we try to acquire has
966         been deleted before we succeed. In this case none of the
967         locks requested will be acquired.
968
969     """
970     assert timeout is None or timeout >= 0.0
971
972     # Check we don't already own locks at this level
973     assert not self._is_owned(), ("Cannot acquire locks in the same set twice"
974                                   " (lockset %s)" % self.name)
975
976     if priority is None:
977       priority = _DEFAULT_PRIORITY
978
979     # We need to keep track of how long we spent waiting for a lock. The
980     # timeout passed to this function is over all lock acquires.
981     running_timeout = utils.RunningTimeout(timeout, False)
982
983     try:
984       if names is not None:
985         # Support passing in a single resource to acquire rather than many
986         if isinstance(names, basestring):
987           names = [names]
988
989         return self.__acquire_inner(names, False, shared, priority,
990                                     running_timeout.Remaining, test_notify)
991
992       else:
993         # If no names are given acquire the whole set by not letting new names
994         # being added before we release, and getting the current list of names.
995         # Some of them may then be deleted later, but we'll cope with this.
996         #
997         # We'd like to acquire this lock in a shared way, as it's nice if
998         # everybody else can use the instances at the same time. If we are
999         # acquiring them exclusively though they won't be able to do this
1000         # anyway, though, so we'll get the list lock exclusively as well in
1001         # order to be able to do add() on the set while owning it.
1002         if not self.__lock.acquire(shared=shared, priority=priority,
1003                                    timeout=running_timeout.Remaining()):
1004           raise _AcquireTimeout()
1005         try:
1006           # note we own the set-lock
1007           self._add_owned()
1008
1009           return self.__acquire_inner(self.__names(), True, shared, priority,
1010                                       running_timeout.Remaining, test_notify)
1011         except:
1012           # We shouldn't have problems adding the lock to the owners list, but
1013           # if we did we'll try to release this lock and re-raise exception.
1014           # Of course something is going to be really wrong, after this.
1015           self.__lock.release()
1016           self._del_owned()
1017           raise
1018
1019     except _AcquireTimeout:
1020       return None
1021
1022   def __acquire_inner(self, names, want_all, shared, priority,
1023                       timeout_fn, test_notify):
1024     """Inner logic for acquiring a number of locks.
1025
1026     @param names: Names of the locks to be acquired
1027     @param want_all: Whether all locks in the set should be acquired
1028     @param shared: Whether to acquire in shared mode
1029     @param timeout_fn: Function returning remaining timeout
1030     @param priority: Priority for acquiring locks
1031     @param test_notify: Special callback function for unittesting
1032
1033     """
1034     acquire_list = []
1035
1036     # First we look the locks up on __lockdict. We have no way of being sure
1037     # they will still be there after, but this makes it a lot faster should
1038     # just one of them be the already wrong. Using a sorted sequence to prevent
1039     # deadlocks.
1040     for lname in sorted(utils.UniqueSequence(names)):
1041       try:
1042         lock = self.__lockdict[lname] # raises KeyError if lock is not there
1043       except KeyError:
1044         if want_all:
1045           # We are acquiring all the set, it doesn't matter if this particular
1046           # element is not there anymore.
1047           continue
1048
1049         raise errors.LockError("Non-existing lock %s in set %s (it may have"
1050                                " been removed)" % (lname, self.name))
1051
1052       acquire_list.append((lname, lock))
1053
1054     # This will hold the locknames we effectively acquired.
1055     acquired = set()
1056
1057     try:
1058       # Now acquire_list contains a sorted list of resources and locks we
1059       # want.  In order to get them we loop on this (private) list and
1060       # acquire() them.  We gave no real guarantee they will still exist till
1061       # this is done but .acquire() itself is safe and will alert us if the
1062       # lock gets deleted.
1063       for (lname, lock) in acquire_list:
1064         if __debug__ and callable(test_notify):
1065           test_notify_fn = lambda: test_notify(lname)
1066         else:
1067           test_notify_fn = None
1068
1069         timeout = timeout_fn()
1070
1071         try:
1072           # raises LockError if the lock was deleted
1073           acq_success = lock.acquire(shared=shared, timeout=timeout,
1074                                      priority=priority,
1075                                      test_notify=test_notify_fn)
1076         except errors.LockError:
1077           if want_all:
1078             # We are acquiring all the set, it doesn't matter if this
1079             # particular element is not there anymore.
1080             continue
1081
1082           raise errors.LockError("Non-existing lock %s in set %s (it may"
1083                                  " have been removed)" % (lname, self.name))
1084
1085         if not acq_success:
1086           # Couldn't get lock or timeout occurred
1087           if timeout is None:
1088             # This shouldn't happen as SharedLock.acquire(timeout=None) is
1089             # blocking.
1090             raise errors.LockError("Failed to get lock %s (set %s)" %
1091                                    (lname, self.name))
1092
1093           raise _AcquireTimeout()
1094
1095         try:
1096           # now the lock cannot be deleted, we have it!
1097           self._add_owned(name=lname)
1098           acquired.add(lname)
1099
1100         except:
1101           # We shouldn't have problems adding the lock to the owners list, but
1102           # if we did we'll try to release this lock and re-raise exception.
1103           # Of course something is going to be really wrong after this.
1104           if lock._is_owned():
1105             lock.release()
1106           raise
1107
1108     except:
1109       # Release all owned locks
1110       self._release_and_delete_owned()
1111       raise
1112
1113     return acquired
1114
1115   def release(self, names=None):
1116     """Release a set of resource locks, at the same level.
1117
1118     You must have acquired the locks, either in shared or in exclusive mode,
1119     before releasing them.
1120
1121     @type names: list of strings, or None
1122     @param names: the names of the locks which shall be released
1123         (defaults to all the locks acquired at that level).
1124
1125     """
1126     assert self._is_owned(), ("release() on lock set %s while not owner" %
1127                               self.name)
1128
1129     # Support passing in a single resource to release rather than many
1130     if isinstance(names, basestring):
1131       names = [names]
1132
1133     if names is None:
1134       names = self._list_owned()
1135     else:
1136       names = set(names)
1137       assert self._list_owned().issuperset(names), (
1138                "release() on unheld resources %s (set %s)" %
1139                (names.difference(self._list_owned()), self.name))
1140
1141     # First of all let's release the "all elements" lock, if set.
1142     # After this 'add' can work again
1143     if self.__lock._is_owned():
1144       self.__lock.release()
1145       self._del_owned()
1146
1147     for lockname in names:
1148       # If we are sure the lock doesn't leave __lockdict without being
1149       # exclusively held we can do this...
1150       self.__lockdict[lockname].release()
1151       self._del_owned(name=lockname)
1152
1153   def add(self, names, acquired=0, shared=0):
1154     """Add a new set of elements to the set
1155
1156     @type names: list of strings
1157     @param names: names of the new elements to add
1158     @type acquired: integer (0/1) used as a boolean
1159     @param acquired: pre-acquire the new resource?
1160     @type shared: integer (0/1) used as a boolean
1161     @param shared: is the pre-acquisition shared?
1162
1163     """
1164     # Check we don't already own locks at this level
1165     assert not self._is_owned() or self.__lock._is_owned(shared=0), \
1166       ("Cannot add locks if the set %s is only partially owned, or shared" %
1167        self.name)
1168
1169     # Support passing in a single resource to add rather than many
1170     if isinstance(names, basestring):
1171       names = [names]
1172
1173     # If we don't already own the set-level lock acquired in an exclusive way
1174     # we'll get it and note we need to release it later.
1175     release_lock = False
1176     if not self.__lock._is_owned():
1177       release_lock = True
1178       self.__lock.acquire()
1179
1180     try:
1181       invalid_names = set(self.__names()).intersection(names)
1182       if invalid_names:
1183         # This must be an explicit raise, not an assert, because assert is
1184         # turned off when using optimization, and this can happen because of
1185         # concurrency even if the user doesn't want it.
1186         raise errors.LockError("duplicate add(%s) on lockset %s" %
1187                                (invalid_names, self.name))
1188
1189       for lockname in names:
1190         lock = SharedLock(self._GetLockName(lockname), monitor=self.__monitor)
1191
1192         if acquired:
1193           # No need for priority or timeout here as this lock has just been
1194           # created
1195           lock.acquire(shared=shared)
1196           # now the lock cannot be deleted, we have it!
1197           try:
1198             self._add_owned(name=lockname)
1199           except:
1200             # We shouldn't have problems adding the lock to the owners list,
1201             # but if we did we'll try to release this lock and re-raise
1202             # exception.  Of course something is going to be really wrong,
1203             # after this.  On the other hand the lock hasn't been added to the
1204             # __lockdict yet so no other threads should be pending on it. This
1205             # release is just a safety measure.
1206             lock.release()
1207             raise
1208
1209         self.__lockdict[lockname] = lock
1210
1211     finally:
1212       # Only release __lock if we were not holding it previously.
1213       if release_lock:
1214         self.__lock.release()
1215
1216     return True
1217
1218   def remove(self, names):
1219     """Remove elements from the lock set.
1220
1221     You can either not hold anything in the lockset or already hold a superset
1222     of the elements you want to delete, exclusively.
1223
1224     @type names: list of strings
1225     @param names: names of the resource to remove.
1226
1227     @return: a list of locks which we removed; the list is always
1228         equal to the names list if we were holding all the locks
1229         exclusively
1230
1231     """
1232     # Support passing in a single resource to remove rather than many
1233     if isinstance(names, basestring):
1234       names = [names]
1235
1236     # If we own any subset of this lock it must be a superset of what we want
1237     # to delete. The ownership must also be exclusive, but that will be checked
1238     # by the lock itself.
1239     assert not self._is_owned() or self._list_owned().issuperset(names), (
1240       "remove() on acquired lockset %s while not owning all elements" %
1241       self.name)
1242
1243     removed = []
1244
1245     for lname in names:
1246       # Calling delete() acquires the lock exclusively if we don't already own
1247       # it, and causes all pending and subsequent lock acquires to fail. It's
1248       # fine to call it out of order because delete() also implies release(),
1249       # and the assertion above guarantees that if we either already hold
1250       # everything we want to delete, or we hold none.
1251       try:
1252         self.__lockdict[lname].delete()
1253         removed.append(lname)
1254       except (KeyError, errors.LockError):
1255         # This cannot happen if we were already holding it, verify:
1256         assert not self._is_owned(), ("remove failed while holding lockset %s"
1257                                       % self.name)
1258       else:
1259         # If no LockError was raised we are the ones who deleted the lock.
1260         # This means we can safely remove it from lockdict, as any further or
1261         # pending delete() or acquire() will fail (and nobody can have the lock
1262         # since before our call to delete()).
1263         #
1264         # This is done in an else clause because if the exception was thrown
1265         # it's the job of the one who actually deleted it.
1266         del self.__lockdict[lname]
1267         # And let's remove it from our private list if we owned it.
1268         if self._is_owned():
1269           self._del_owned(name=lname)
1270
1271     return removed
1272
1273
1274 # Locking levels, must be acquired in increasing order.
1275 # Current rules are:
1276 #   - at level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
1277 #   acquired before performing any operation, either in shared or in exclusive
1278 #   mode. acquiring the BGL in exclusive mode is discouraged and should be
1279 #   avoided.
1280 #   - at levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks.
1281 #   If you need more than one node, or more than one instance, acquire them at
1282 #   the same time.
1283 LEVEL_CLUSTER = 0
1284 LEVEL_INSTANCE = 1
1285 LEVEL_NODEGROUP = 2
1286 LEVEL_NODE = 3
1287
1288 LEVELS = [LEVEL_CLUSTER,
1289           LEVEL_INSTANCE,
1290           LEVEL_NODEGROUP,
1291           LEVEL_NODE]
1292
1293 # Lock levels which are modifiable
1294 LEVELS_MOD = [LEVEL_NODE, LEVEL_NODEGROUP, LEVEL_INSTANCE]
1295
1296 LEVEL_NAMES = {
1297   LEVEL_CLUSTER: "cluster",
1298   LEVEL_INSTANCE: "instance",
1299   LEVEL_NODEGROUP: "nodegroup",
1300   LEVEL_NODE: "node",
1301   }
1302
1303 # Constant for the big ganeti lock
1304 BGL = 'BGL'
1305
1306
1307 class GanetiLockManager:
1308   """The Ganeti Locking Library
1309
1310   The purpose of this small library is to manage locking for ganeti clusters
1311   in a central place, while at the same time doing dynamic checks against
1312   possible deadlocks. It will also make it easier to transition to a different
1313   lock type should we migrate away from python threads.
1314
1315   """
1316   _instance = None
1317
1318   def __init__(self, nodes, nodegroups, instances):
1319     """Constructs a new GanetiLockManager object.
1320
1321     There should be only a GanetiLockManager object at any time, so this
1322     function raises an error if this is not the case.
1323
1324     @param nodes: list of node names
1325     @param nodegroups: list of nodegroup uuids
1326     @param instances: list of instance names
1327
1328     """
1329     assert self.__class__._instance is None, \
1330            "double GanetiLockManager instance"
1331
1332     self.__class__._instance = self
1333
1334     self._monitor = LockMonitor()
1335
1336     # The keyring contains all the locks, at their level and in the correct
1337     # locking order.
1338     self.__keyring = {
1339       LEVEL_CLUSTER: LockSet([BGL], "BGL", monitor=self._monitor),
1340       LEVEL_NODE: LockSet(nodes, "nodes", monitor=self._monitor),
1341       LEVEL_NODEGROUP: LockSet(nodegroups, "nodegroups", monitor=self._monitor),
1342       LEVEL_INSTANCE: LockSet(instances, "instances",
1343                               monitor=self._monitor),
1344       }
1345
1346   def QueryLocks(self, fields):
1347     """Queries information from all locks.
1348
1349     See L{LockMonitor.QueryLocks}.
1350
1351     """
1352     return self._monitor.QueryLocks(fields)
1353
1354   def OldStyleQueryLocks(self, fields):
1355     """Queries information from all locks, returning old-style data.
1356
1357     See L{LockMonitor.OldStyleQueryLocks}.
1358
1359     """
1360     return self._monitor.OldStyleQueryLocks(fields)
1361
1362   def _names(self, level):
1363     """List the lock names at the given level.
1364
1365     This can be used for debugging/testing purposes.
1366
1367     @param level: the level whose list of locks to get
1368
1369     """
1370     assert level in LEVELS, "Invalid locking level %s" % level
1371     return self.__keyring[level]._names()
1372
1373   def _is_owned(self, level):
1374     """Check whether we are owning locks at the given level
1375
1376     """
1377     return self.__keyring[level]._is_owned()
1378
1379   is_owned = _is_owned
1380
1381   def _list_owned(self, level):
1382     """Get the set of owned locks at the given level
1383
1384     """
1385     return self.__keyring[level]._list_owned()
1386
1387   def _upper_owned(self, level):
1388     """Check that we don't own any lock at a level greater than the given one.
1389
1390     """
1391     # This way of checking only works if LEVELS[i] = i, which we check for in
1392     # the test cases.
1393     return compat.any((self._is_owned(l) for l in LEVELS[level + 1:]))
1394
1395   def _BGL_owned(self): # pylint: disable-msg=C0103
1396     """Check if the current thread owns the BGL.
1397
1398     Both an exclusive or a shared acquisition work.
1399
1400     """
1401     return BGL in self.__keyring[LEVEL_CLUSTER]._list_owned()
1402
1403   @staticmethod
1404   def _contains_BGL(level, names): # pylint: disable-msg=C0103
1405     """Check if the level contains the BGL.
1406
1407     Check if acting on the given level and set of names will change
1408     the status of the Big Ganeti Lock.
1409
1410     """
1411     return level == LEVEL_CLUSTER and (names is None or BGL in names)
1412
1413   def acquire(self, level, names, timeout=None, shared=0, priority=None):
1414     """Acquire a set of resource locks, at the same level.
1415
1416     @type level: member of locking.LEVELS
1417     @param level: the level at which the locks shall be acquired
1418     @type names: list of strings (or string)
1419     @param names: the names of the locks which shall be acquired
1420         (special lock names, or instance/node names)
1421     @type shared: integer (0/1) used as a boolean
1422     @param shared: whether to acquire in shared mode; by default
1423         an exclusive lock will be acquired
1424     @type timeout: float
1425     @param timeout: Maximum time to acquire all locks
1426     @type priority: integer
1427     @param priority: Priority for acquiring lock
1428
1429     """
1430     assert level in LEVELS, "Invalid locking level %s" % level
1431
1432     # Check that we are either acquiring the Big Ganeti Lock or we already own
1433     # it. Some "legacy" opcodes need to be sure they are run non-concurrently
1434     # so even if we've migrated we need to at least share the BGL to be
1435     # compatible with them. Of course if we own the BGL exclusively there's no
1436     # point in acquiring any other lock, unless perhaps we are half way through
1437     # the migration of the current opcode.
1438     assert (self._contains_BGL(level, names) or self._BGL_owned()), (
1439             "You must own the Big Ganeti Lock before acquiring any other")
1440
1441     # Check we don't own locks at the same or upper levels.
1442     assert not self._upper_owned(level), ("Cannot acquire locks at a level"
1443            " while owning some at a greater one")
1444
1445     # Acquire the locks in the set.
1446     return self.__keyring[level].acquire(names, shared=shared, timeout=timeout,
1447                                          priority=priority)
1448
1449   def release(self, level, names=None):
1450     """Release a set of resource locks, at the same level.
1451
1452     You must have acquired the locks, either in shared or in exclusive
1453     mode, before releasing them.
1454
1455     @type level: member of locking.LEVELS
1456     @param level: the level at which the locks shall be released
1457     @type names: list of strings, or None
1458     @param names: the names of the locks which shall be released
1459         (defaults to all the locks acquired at that level)
1460
1461     """
1462     assert level in LEVELS, "Invalid locking level %s" % level
1463     assert (not self._contains_BGL(level, names) or
1464             not self._upper_owned(LEVEL_CLUSTER)), (
1465             "Cannot release the Big Ganeti Lock while holding something"
1466             " at upper levels (%r)" %
1467             (utils.CommaJoin(["%s=%r" % (LEVEL_NAMES[i], self._list_owned(i))
1468                               for i in self.__keyring.keys()]), ))
1469
1470     # Release will complain if we don't own the locks already
1471     return self.__keyring[level].release(names)
1472
1473   def add(self, level, names, acquired=0, shared=0):
1474     """Add locks at the specified level.
1475
1476     @type level: member of locking.LEVELS_MOD
1477     @param level: the level at which the locks shall be added
1478     @type names: list of strings
1479     @param names: names of the locks to acquire
1480     @type acquired: integer (0/1) used as a boolean
1481     @param acquired: whether to acquire the newly added locks
1482     @type shared: integer (0/1) used as a boolean
1483     @param shared: whether the acquisition will be shared
1484
1485     """
1486     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1487     assert self._BGL_owned(), ("You must own the BGL before performing other"
1488            " operations")
1489     assert not self._upper_owned(level), ("Cannot add locks at a level"
1490            " while owning some at a greater one")
1491     return self.__keyring[level].add(names, acquired=acquired, shared=shared)
1492
1493   def remove(self, level, names):
1494     """Remove locks from the specified level.
1495
1496     You must either already own the locks you are trying to remove
1497     exclusively or not own any lock at an upper level.
1498
1499     @type level: member of locking.LEVELS_MOD
1500     @param level: the level at which the locks shall be removed
1501     @type names: list of strings
1502     @param names: the names of the locks which shall be removed
1503         (special lock names, or instance/node names)
1504
1505     """
1506     assert level in LEVELS_MOD, "Invalid or immutable level %s" % level
1507     assert self._BGL_owned(), ("You must own the BGL before performing other"
1508            " operations")
1509     # Check we either own the level or don't own anything from here
1510     # up. LockSet.remove() will check the case in which we don't own
1511     # all the needed resources, or we have a shared ownership.
1512     assert self._is_owned(level) or not self._upper_owned(level), (
1513            "Cannot remove locks at a level while not owning it or"
1514            " owning some at a greater one")
1515     return self.__keyring[level].remove(names)
1516
1517
1518 def _MonitorSortKey((num, item)):
1519   """Sorting key function.
1520
1521   Sort by name, then by incoming order.
1522
1523   """
1524   (name, _, _, _) = item
1525
1526   return (utils.NiceSortKey(name), num)
1527
1528
1529 class LockMonitor(object):
1530   _LOCK_ATTR = "_lock"
1531
1532   def __init__(self):
1533     """Initializes this class.
1534
1535     """
1536     self._lock = SharedLock("LockMonitor")
1537
1538     # Counter for stable sorting
1539     self._counter = itertools.count(0)
1540
1541     # Tracked locks. Weak references are used to avoid issues with circular
1542     # references and deletion.
1543     self._locks = weakref.WeakKeyDictionary()
1544
1545   @ssynchronized(_LOCK_ATTR)
1546   def RegisterLock(self, lock):
1547     """Registers a new lock.
1548
1549     """
1550     logging.debug("Registering lock %s", lock.name)
1551     assert lock not in self._locks, "Duplicate lock registration"
1552
1553     # There used to be a check for duplicate names here. As it turned out, when
1554     # a lock is re-created with the same name in a very short timeframe, the
1555     # previous instance might not yet be removed from the weakref dictionary.
1556     # By keeping track of the order of incoming registrations, a stable sort
1557     # ordering can still be guaranteed.
1558
1559     self._locks[lock] = self._counter.next()
1560
1561   @ssynchronized(_LOCK_ATTR)
1562   def _GetLockInfo(self, requested):
1563     """Get information from all locks while the monitor lock is held.
1564
1565     """
1566     return [(num, lock.GetInfo(requested)) for lock, num in self._locks.items()]
1567
1568   def _Query(self, fields):
1569     """Queries information from all locks.
1570
1571     @type fields: list of strings
1572     @param fields: List of fields to return
1573
1574     """
1575     qobj = query.Query(query.LOCK_FIELDS, fields)
1576
1577     # Get all data with internal lock held and then sort by name and incoming
1578     # order
1579     lockinfo = sorted(self._GetLockInfo(qobj.RequestedData()),
1580                       key=_MonitorSortKey)
1581
1582     # Extract lock information and build query data
1583     return (qobj, query.LockQueryData(map(operator.itemgetter(1), lockinfo)))
1584
1585   def QueryLocks(self, fields):
1586     """Queries information from all locks.
1587
1588     @type fields: list of strings
1589     @param fields: List of fields to return
1590
1591     """
1592     (qobj, ctx) = self._Query(fields)
1593
1594     # Prepare query response
1595     return query.GetQueryResponse(qobj, ctx)
1596
1597   def OldStyleQueryLocks(self, fields):
1598     """Queries information from all locks, returning old-style data.
1599
1600     @type fields: list of strings
1601     @param fields: List of fields to return
1602
1603     """
1604     (qobj, ctx) = self._Query(fields)
1605
1606     return qobj.OldStyleQuery(ctx)