Move _TimeoutExpired to utils
[ganeti-local] / lib / utils / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti utility module.
23
24 This module holds functions that can be used in both daemons (all) and
25 the command line scripts.
26
27 """
28
29 # Allow wildcard import in pylint: disable=W0401
30
31 import os
32 import re
33 import errno
34 import pwd
35 import time
36 import itertools
37 import select
38 import logging
39 import signal
40
41 from ganeti import errors
42 from ganeti import constants
43 from ganeti import compat
44
45 from ganeti.utils.algo import *
46 from ganeti.utils.filelock import *
47 from ganeti.utils.hash import *
48 from ganeti.utils.io import *
49 from ganeti.utils.log import *
50 from ganeti.utils.mlock import *
51 from ganeti.utils.nodesetup import *
52 from ganeti.utils.process import *
53 from ganeti.utils.retry import *
54 from ganeti.utils.text import *
55 from ganeti.utils.wrapper import *
56 from ganeti.utils.x509 import *
57
58
59 _VALID_SERVICE_NAME_RE = re.compile("^[-_.a-zA-Z0-9]{1,128}$")
60
61 UUID_RE = re.compile("^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-"
62                      "[a-f0-9]{4}-[a-f0-9]{12}$")
63
64
65 def ForceDictType(target, key_types, allowed_values=None):
66   """Force the values of a dict to have certain types.
67
68   @type target: dict
69   @param target: the dict to update
70   @type key_types: dict
71   @param key_types: dict mapping target dict keys to types
72                     in constants.ENFORCEABLE_TYPES
73   @type allowed_values: list
74   @keyword allowed_values: list of specially allowed values
75
76   """
77   if allowed_values is None:
78     allowed_values = []
79
80   if not isinstance(target, dict):
81     msg = "Expected dictionary, got '%s'" % target
82     raise errors.TypeEnforcementError(msg)
83
84   for key in target:
85     if key not in key_types:
86       msg = "Unknown parameter '%s'" % key
87       raise errors.TypeEnforcementError(msg)
88
89     if target[key] in allowed_values:
90       continue
91
92     ktype = key_types[key]
93     if ktype not in constants.ENFORCEABLE_TYPES:
94       msg = "'%s' has non-enforceable type %s" % (key, ktype)
95       raise errors.ProgrammerError(msg)
96
97     if ktype in (constants.VTYPE_STRING, constants.VTYPE_MAYBE_STRING):
98       if target[key] is None and ktype == constants.VTYPE_MAYBE_STRING:
99         pass
100       elif not isinstance(target[key], basestring):
101         if isinstance(target[key], bool) and not target[key]:
102           target[key] = ""
103         else:
104           msg = "'%s' (value %s) is not a valid string" % (key, target[key])
105           raise errors.TypeEnforcementError(msg)
106     elif ktype == constants.VTYPE_BOOL:
107       if isinstance(target[key], basestring) and target[key]:
108         if target[key].lower() == constants.VALUE_FALSE:
109           target[key] = False
110         elif target[key].lower() == constants.VALUE_TRUE:
111           target[key] = True
112         else:
113           msg = "'%s' (value %s) is not a valid boolean" % (key, target[key])
114           raise errors.TypeEnforcementError(msg)
115       elif target[key]:
116         target[key] = True
117       else:
118         target[key] = False
119     elif ktype == constants.VTYPE_SIZE:
120       try:
121         target[key] = ParseUnit(target[key])
122       except errors.UnitParseError, err:
123         msg = "'%s' (value %s) is not a valid size. error: %s" % \
124               (key, target[key], err)
125         raise errors.TypeEnforcementError(msg)
126     elif ktype == constants.VTYPE_INT:
127       try:
128         target[key] = int(target[key])
129       except (ValueError, TypeError):
130         msg = "'%s' (value %s) is not a valid integer" % (key, target[key])
131         raise errors.TypeEnforcementError(msg)
132
133
134 def ValidateServiceName(name):
135   """Validate the given service name.
136
137   @type name: number or string
138   @param name: Service name or port specification
139
140   """
141   try:
142     numport = int(name)
143   except (ValueError, TypeError):
144     # Non-numeric service name
145     valid = _VALID_SERVICE_NAME_RE.match(name)
146   else:
147     # Numeric port (protocols other than TCP or UDP might need adjustments
148     # here)
149     valid = (numport >= 0 and numport < (1 << 16))
150
151   if not valid:
152     raise errors.OpPrereqError("Invalid service name '%s'" % name,
153                                errors.ECODE_INVAL)
154
155   return name
156
157
158 def ListVolumeGroups():
159   """List volume groups and their size
160
161   @rtype: dict
162   @return:
163        Dictionary with keys volume name and values
164        the size of the volume
165
166   """
167   command = "vgs --noheadings --units m --nosuffix -o name,size"
168   result = RunCmd(command)
169   retval = {}
170   if result.failed:
171     return retval
172
173   for line in result.stdout.splitlines():
174     try:
175       name, size = line.split()
176       size = int(float(size))
177     except (IndexError, ValueError), err:
178       logging.error("Invalid output from vgs (%s): %s", err, line)
179       continue
180
181     retval[name] = size
182
183   return retval
184
185
186 def BridgeExists(bridge):
187   """Check whether the given bridge exists in the system
188
189   @type bridge: str
190   @param bridge: the bridge name to check
191   @rtype: boolean
192   @return: True if it does
193
194   """
195   return os.path.isdir("/sys/class/net/%s/bridge" % bridge)
196
197
198 def TryConvert(fn, val):
199   """Try to convert a value ignoring errors.
200
201   This function tries to apply function I{fn} to I{val}. If no
202   C{ValueError} or C{TypeError} exceptions are raised, it will return
203   the result, else it will return the original value. Any other
204   exceptions are propagated to the caller.
205
206   @type fn: callable
207   @param fn: function to apply to the value
208   @param val: the value to be converted
209   @return: The converted value if the conversion was successful,
210       otherwise the original value.
211
212   """
213   try:
214     nv = fn(val)
215   except (ValueError, TypeError):
216     nv = val
217   return nv
218
219
220 def ParseCpuMask(cpu_mask):
221   """Parse a CPU mask definition and return the list of CPU IDs.
222
223   CPU mask format: comma-separated list of CPU IDs
224   or dash-separated ID ranges
225   Example: "0-2,5" -> "0,1,2,5"
226
227   @type cpu_mask: str
228   @param cpu_mask: CPU mask definition
229   @rtype: list of int
230   @return: list of CPU IDs
231
232   """
233   if not cpu_mask:
234     return []
235   cpu_list = []
236   for range_def in cpu_mask.split(","):
237     boundaries = range_def.split("-")
238     n_elements = len(boundaries)
239     if n_elements > 2:
240       raise errors.ParseError("Invalid CPU ID range definition"
241                               " (only one hyphen allowed): %s" % range_def)
242     try:
243       lower = int(boundaries[0])
244     except (ValueError, TypeError), err:
245       raise errors.ParseError("Invalid CPU ID value for lower boundary of"
246                               " CPU ID range: %s" % str(err))
247     try:
248       higher = int(boundaries[-1])
249     except (ValueError, TypeError), err:
250       raise errors.ParseError("Invalid CPU ID value for higher boundary of"
251                               " CPU ID range: %s" % str(err))
252     if lower > higher:
253       raise errors.ParseError("Invalid CPU ID range definition"
254                               " (%d > %d): %s" % (lower, higher, range_def))
255     cpu_list.extend(range(lower, higher + 1))
256   return cpu_list
257
258
259 def GetHomeDir(user, default=None):
260   """Try to get the homedir of the given user.
261
262   The user can be passed either as a string (denoting the name) or as
263   an integer (denoting the user id). If the user is not found, the
264   'default' argument is returned, which defaults to None.
265
266   """
267   try:
268     if isinstance(user, basestring):
269       result = pwd.getpwnam(user)
270     elif isinstance(user, (int, long)):
271       result = pwd.getpwuid(user)
272     else:
273       raise errors.ProgrammerError("Invalid type passed to GetHomeDir (%s)" %
274                                    type(user))
275   except KeyError:
276     return default
277   return result.pw_dir
278
279
280 def FirstFree(seq, base=0):
281   """Returns the first non-existing integer from seq.
282
283   The seq argument should be a sorted list of positive integers. The
284   first time the index of an element is smaller than the element
285   value, the index will be returned.
286
287   The base argument is used to start at a different offset,
288   i.e. C{[3, 4, 6]} with I{offset=3} will return 5.
289
290   Example: C{[0, 1, 3]} will return I{2}.
291
292   @type seq: sequence
293   @param seq: the sequence to be analyzed.
294   @type base: int
295   @param base: use this value as the base index of the sequence
296   @rtype: int
297   @return: the first non-used index in the sequence
298
299   """
300   for idx, elem in enumerate(seq):
301     assert elem >= base, "Passed element is higher than base offset"
302     if elem > idx + base:
303       # idx is not used
304       return idx + base
305   return None
306
307
308 def SingleWaitForFdCondition(fdobj, event, timeout):
309   """Waits for a condition to occur on the socket.
310
311   Immediately returns at the first interruption.
312
313   @type fdobj: integer or object supporting a fileno() method
314   @param fdobj: entity to wait for events on
315   @type event: integer
316   @param event: ORed condition (see select module)
317   @type timeout: float or None
318   @param timeout: Timeout in seconds
319   @rtype: int or None
320   @return: None for timeout, otherwise occured conditions
321
322   """
323   check = (event | select.POLLPRI |
324            select.POLLNVAL | select.POLLHUP | select.POLLERR)
325
326   if timeout is not None:
327     # Poller object expects milliseconds
328     timeout *= 1000
329
330   poller = select.poll()
331   poller.register(fdobj, event)
332   try:
333     # TODO: If the main thread receives a signal and we have no timeout, we
334     # could wait forever. This should check a global "quit" flag or something
335     # every so often.
336     io_events = poller.poll(timeout)
337   except select.error, err:
338     if err[0] != errno.EINTR:
339       raise
340     io_events = []
341   if io_events and io_events[0][1] & check:
342     return io_events[0][1]
343   else:
344     return None
345
346
347 class FdConditionWaiterHelper(object):
348   """Retry helper for WaitForFdCondition.
349
350   This class contains the retried and wait functions that make sure
351   WaitForFdCondition can continue waiting until the timeout is actually
352   expired.
353
354   """
355
356   def __init__(self, timeout):
357     self.timeout = timeout
358
359   def Poll(self, fdobj, event):
360     result = SingleWaitForFdCondition(fdobj, event, self.timeout)
361     if result is None:
362       raise RetryAgain()
363     else:
364       return result
365
366   def UpdateTimeout(self, timeout):
367     self.timeout = timeout
368
369
370 def WaitForFdCondition(fdobj, event, timeout):
371   """Waits for a condition to occur on the socket.
372
373   Retries until the timeout is expired, even if interrupted.
374
375   @type fdobj: integer or object supporting a fileno() method
376   @param fdobj: entity to wait for events on
377   @type event: integer
378   @param event: ORed condition (see select module)
379   @type timeout: float or None
380   @param timeout: Timeout in seconds
381   @rtype: int or None
382   @return: None for timeout, otherwise occured conditions
383
384   """
385   if timeout is not None:
386     retrywaiter = FdConditionWaiterHelper(timeout)
387     try:
388       result = Retry(retrywaiter.Poll, RETRY_REMAINING_TIME, timeout,
389                      args=(fdobj, event), wait_fn=retrywaiter.UpdateTimeout)
390     except RetryTimeout:
391       result = None
392   else:
393     result = None
394     while result is None:
395       result = SingleWaitForFdCondition(fdobj, event, timeout)
396   return result
397
398
399 def EnsureDaemon(name):
400   """Check for and start daemon if not alive.
401
402   """
403   result = RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
404   if result.failed:
405     logging.error("Can't start daemon '%s', failure %s, output: %s",
406                   name, result.fail_reason, result.output)
407     return False
408
409   return True
410
411
412 def StopDaemon(name):
413   """Stop daemon
414
415   """
416   result = RunCmd([constants.DAEMON_UTIL, "stop", name])
417   if result.failed:
418     logging.error("Can't stop daemon '%s', failure %s, output: %s",
419                   name, result.fail_reason, result.output)
420     return False
421
422   return True
423
424
425 def CheckVolumeGroupSize(vglist, vgname, minsize):
426   """Checks if the volume group list is valid.
427
428   The function will check if a given volume group is in the list of
429   volume groups and has a minimum size.
430
431   @type vglist: dict
432   @param vglist: dictionary of volume group names and their size
433   @type vgname: str
434   @param vgname: the volume group we should check
435   @type minsize: int
436   @param minsize: the minimum size we accept
437   @rtype: None or str
438   @return: None for success, otherwise the error message
439
440   """
441   vgsize = vglist.get(vgname, None)
442   if vgsize is None:
443     return "volume group '%s' missing" % vgname
444   elif vgsize < minsize:
445     return ("volume group '%s' too small (%s MiB required, %d MiB found)" %
446             (vgname, minsize, vgsize))
447   return None
448
449
450 def SplitTime(value):
451   """Splits time as floating point number into a tuple.
452
453   @param value: Time in seconds
454   @type value: int or float
455   @return: Tuple containing (seconds, microseconds)
456
457   """
458   (seconds, microseconds) = divmod(int(value * 1000000), 1000000)
459
460   assert 0 <= seconds, \
461     "Seconds must be larger than or equal to 0, but are %s" % seconds
462   assert 0 <= microseconds <= 999999, \
463     "Microseconds must be 0-999999, but are %s" % microseconds
464
465   return (int(seconds), int(microseconds))
466
467
468 def MergeTime(timetuple):
469   """Merges a tuple into time as a floating point number.
470
471   @param timetuple: Time as tuple, (seconds, microseconds)
472   @type timetuple: tuple
473   @return: Time as a floating point number expressed in seconds
474
475   """
476   (seconds, microseconds) = timetuple
477
478   assert 0 <= seconds, \
479     "Seconds must be larger than or equal to 0, but are %s" % seconds
480   assert 0 <= microseconds <= 999999, \
481     "Microseconds must be 0-999999, but are %s" % microseconds
482
483   return float(seconds) + (float(microseconds) * 0.000001)
484
485
486 def FindMatch(data, name):
487   """Tries to find an item in a dictionary matching a name.
488
489   Callers have to ensure the data names aren't contradictory (e.g. a regexp
490   that matches a string). If the name isn't a direct key, all regular
491   expression objects in the dictionary are matched against it.
492
493   @type data: dict
494   @param data: Dictionary containing data
495   @type name: string
496   @param name: Name to look for
497   @rtype: tuple; (value in dictionary, matched groups as list)
498
499   """
500   if name in data:
501     return (data[name], [])
502
503   for key, value in data.items():
504     # Regex objects
505     if hasattr(key, "match"):
506       m = key.match(name)
507       if m:
508         return (value, list(m.groups()))
509
510   return None
511
512
513 def GetMounts(filename=constants.PROC_MOUNTS):
514   """Returns the list of mounted filesystems.
515
516   This function is Linux-specific.
517
518   @param filename: path of mounts file (/proc/mounts by default)
519   @rtype: list of tuples
520   @return: list of mount entries (device, mountpoint, fstype, options)
521
522   """
523   # TODO(iustin): investigate non-Linux options (e.g. via mount output)
524   data = []
525   mountlines = ReadFile(filename).splitlines()
526   for line in mountlines:
527     device, mountpoint, fstype, options, _ = line.split(None, 4)
528     data.append((device, mountpoint, fstype, options))
529
530   return data
531
532
533 def SignalHandled(signums):
534   """Signal Handled decoration.
535
536   This special decorator installs a signal handler and then calls the target
537   function. The function must accept a 'signal_handlers' keyword argument,
538   which will contain a dict indexed by signal number, with SignalHandler
539   objects as values.
540
541   The decorator can be safely stacked with iself, to handle multiple signals
542   with different handlers.
543
544   @type signums: list
545   @param signums: signals to intercept
546
547   """
548   def wrap(fn):
549     def sig_function(*args, **kwargs):
550       assert "signal_handlers" not in kwargs or \
551              kwargs["signal_handlers"] is None or \
552              isinstance(kwargs["signal_handlers"], dict), \
553              "Wrong signal_handlers parameter in original function call"
554       if "signal_handlers" in kwargs and kwargs["signal_handlers"] is not None:
555         signal_handlers = kwargs["signal_handlers"]
556       else:
557         signal_handlers = {}
558         kwargs["signal_handlers"] = signal_handlers
559       sighandler = SignalHandler(signums)
560       try:
561         for sig in signums:
562           signal_handlers[sig] = sighandler
563         return fn(*args, **kwargs)
564       finally:
565         sighandler.Reset()
566     return sig_function
567   return wrap
568
569
570 def TimeoutExpired(epoch, timeout, _time_fn=time.time):
571   """Checks whether a timeout has expired.
572
573   """
574   return _time_fn() > (epoch + timeout)
575
576
577 class SignalWakeupFd(object):
578   try:
579     # This is only supported in Python 2.5 and above (some distributions
580     # backported it to Python 2.4)
581     _set_wakeup_fd_fn = signal.set_wakeup_fd
582   except AttributeError:
583     # Not supported
584     def _SetWakeupFd(self, _): # pylint: disable=R0201
585       return -1
586   else:
587     def _SetWakeupFd(self, fd):
588       return self._set_wakeup_fd_fn(fd)
589
590   def __init__(self):
591     """Initializes this class.
592
593     """
594     (read_fd, write_fd) = os.pipe()
595
596     # Once these succeeded, the file descriptors will be closed automatically.
597     # Buffer size 0 is important, otherwise .read() with a specified length
598     # might buffer data and the file descriptors won't be marked readable.
599     self._read_fh = os.fdopen(read_fd, "r", 0)
600     self._write_fh = os.fdopen(write_fd, "w", 0)
601
602     self._previous = self._SetWakeupFd(self._write_fh.fileno())
603
604     # Utility functions
605     self.fileno = self._read_fh.fileno
606     self.read = self._read_fh.read
607
608   def Reset(self):
609     """Restores the previous wakeup file descriptor.
610
611     """
612     if hasattr(self, "_previous") and self._previous is not None:
613       self._SetWakeupFd(self._previous)
614       self._previous = None
615
616   def Notify(self):
617     """Notifies the wakeup file descriptor.
618
619     """
620     self._write_fh.write("\0")
621
622   def __del__(self):
623     """Called before object deletion.
624
625     """
626     self.Reset()
627
628
629 class SignalHandler(object):
630   """Generic signal handler class.
631
632   It automatically restores the original handler when deconstructed or
633   when L{Reset} is called. You can either pass your own handler
634   function in or query the L{called} attribute to detect whether the
635   signal was sent.
636
637   @type signum: list
638   @ivar signum: the signals we handle
639   @type called: boolean
640   @ivar called: tracks whether any of the signals have been raised
641
642   """
643   def __init__(self, signum, handler_fn=None, wakeup=None):
644     """Constructs a new SignalHandler instance.
645
646     @type signum: int or list of ints
647     @param signum: Single signal number or set of signal numbers
648     @type handler_fn: callable
649     @param handler_fn: Signal handling function
650
651     """
652     assert handler_fn is None or callable(handler_fn)
653
654     self.signum = set(signum)
655     self.called = False
656
657     self._handler_fn = handler_fn
658     self._wakeup = wakeup
659
660     self._previous = {}
661     try:
662       for signum in self.signum:
663         # Setup handler
664         prev_handler = signal.signal(signum, self._HandleSignal)
665         try:
666           self._previous[signum] = prev_handler
667         except:
668           # Restore previous handler
669           signal.signal(signum, prev_handler)
670           raise
671     except:
672       # Reset all handlers
673       self.Reset()
674       # Here we have a race condition: a handler may have already been called,
675       # but there's not much we can do about it at this point.
676       raise
677
678   def __del__(self):
679     self.Reset()
680
681   def Reset(self):
682     """Restore previous handler.
683
684     This will reset all the signals to their previous handlers.
685
686     """
687     for signum, prev_handler in self._previous.items():
688       signal.signal(signum, prev_handler)
689       # If successful, remove from dict
690       del self._previous[signum]
691
692   def Clear(self):
693     """Unsets the L{called} flag.
694
695     This function can be used in case a signal may arrive several times.
696
697     """
698     self.called = False
699
700   def _HandleSignal(self, signum, frame):
701     """Actual signal handling function.
702
703     """
704     # This is not nice and not absolutely atomic, but it appears to be the only
705     # solution in Python -- there are no atomic types.
706     self.called = True
707
708     if self._wakeup:
709       # Notify whoever is interested in signals
710       self._wakeup.Notify()
711
712     if self._handler_fn:
713       self._handler_fn(signum, frame)
714
715
716 class FieldSet(object):
717   """A simple field set.
718
719   Among the features are:
720     - checking if a string is among a list of static string or regex objects
721     - checking if a whole list of string matches
722     - returning the matching groups from a regex match
723
724   Internally, all fields are held as regular expression objects.
725
726   """
727   def __init__(self, *items):
728     self.items = [re.compile("^%s$" % value) for value in items]
729
730   def Extend(self, other_set):
731     """Extend the field set with the items from another one"""
732     self.items.extend(other_set.items)
733
734   def Matches(self, field):
735     """Checks if a field matches the current set
736
737     @type field: str
738     @param field: the string to match
739     @return: either None or a regular expression match object
740
741     """
742     for m in itertools.ifilter(None, (val.match(field) for val in self.items)):
743       return m
744     return None
745
746   def NonMatching(self, items):
747     """Returns the list of fields not matching the current set
748
749     @type items: list
750     @param items: the list of fields to check
751     @rtype: list
752     @return: list of non-matching fields
753
754     """
755     return [val for val in items if not self.Matches(val)]