Merge branch 'devel-2.1'
authorGuido Trotter <ultrotter@google.com>
Thu, 10 Dec 2009 14:49:49 +0000 (14:49 +0000)
committerGuido Trotter <ultrotter@google.com>
Thu, 10 Dec 2009 14:49:49 +0000 (14:49 +0000)
* devel-2.1:
  Add disk cache control parameter for KVM
  Change pyinotify import for broader compatibility
  ClusterMasterQuery: add primary ip field
  confd ClusterMasterQuery: allow fields request
  Simplify utils.ReadFile
  DRBD: ignore unreadable meta devices
  gnt-cluster verify: Warn if node time diverges too far
  KVM: fail when a routed nic has no ip
  Enable batch mode for devel/upload
  cmdlib: Work around race condition in DRBD before version 8.0.13

daemons/ganeti-confd
devel/upload.in
lib/asyncnotifier.py
lib/backend.py
lib/bdev.py
lib/cmdlib.py
lib/confd/querylib.py
lib/constants.py
lib/hypervisor/hv_kvm.py
lib/utils.py
test/ganeti.constants_unittest.py

index a3b51f8..0454da7 100755 (executable)
@@ -29,9 +29,13 @@ It uses UDP+HMAC for authentication with a global cluster key.
 import os
 import sys
 import logging
-import pyinotify
 import time
 
+try:
+  from pyinotify import pyinotify
+except ImportError:
+  import pyinotify
+
 from optparse import OptionParser
 
 from ganeti import asyncnotifier
index 6c19862..8f31c3b 100644 (file)
@@ -100,7 +100,9 @@ echo ---
 # and now put it under $prefix on the target node(s)
 for host; do
   echo Uploading code to ${host}...
-  rsync -v -rlDc --exclude="*.py[oc]" --exclude="*.pdf" --exclude="*.html" \
+  rsync -v -rlDc \
+    -e "ssh -oBatchMode=yes" \
+    --exclude="*.py[oc]" --exclude="*.pdf" --exclude="*.html" \
     "$TXD/" \
     root@${host}:/ &
 done
@@ -109,7 +111,7 @@ wait
 if test -z "${NO_RESTART}"; then
   for host; do
     echo Restarting ganeti-noded on ${host}...
-    ssh root@${host} /etc/init.d/ganeti restart &
+    ssh -oBatchMode=yes root@${host} /etc/init.d/ganeti restart &
   done
   wait
 fi
index 1498ad0..a0fb2d0 100644 (file)
 """Asynchronous pyinotify implementation"""
 
 
-import pyinotify
 import asyncore
 
+try:
+    from pyinotify import pyinotify
+except ImportError:
+    import pyinotify
+
 
 class AsyncNotifier(asyncore.file_dispatcher):
   """An asyncore dispatcher for inotify events.
index f5f258b..cce99ab 100644 (file)
@@ -551,6 +551,10 @@ def VerifyNode(what, cluster_name):
       tmpr.append("The procfs filesystem doesn't seem to be mounted"
                   " under /proc, missing required directory /proc/sys and"
                   " the file /proc/sysrq-trigger")
+
+  if constants.NV_TIME in what:
+    result[constants.NV_TIME] = utils.SplitTime(time.time())
+
   return result
 
 
index 7991345..86f27d1 100644 (file)
@@ -34,6 +34,10 @@ from ganeti import constants
 from ganeti import objects
 
 
+# Size of reads in _CanReadDevice
+_DEVICE_READ_SIZE = 128 * 1024
+
+
 def _IgnoreError(fn, *args, **kwargs):
   """Executes the given function, ignoring BlockDeviceErrors.
 
@@ -66,6 +70,20 @@ def _ThrowError(msg, *args):
   raise errors.BlockDeviceError(msg)
 
 
+def _CanReadDevice(path):
+  """Check if we can read from the given device.
+
+  This tries to read the first 128k of the device.
+
+  """
+  try:
+    utils.ReadFile(path, size=_DEVICE_READ_SIZE)
+    return True
+  except EnvironmentError, err:
+    logging.warning("Can't read from device %s", path, exc_info=True)
+    return False
+
+
 class BlockDev(object):
   """Block device abstract class.
 
@@ -960,6 +978,17 @@ class DRBD8(BaseDRBD):
   def __init__(self, unique_id, children, size):
     if children and children.count(None) > 0:
       children = []
+    if len(children) not in (0, 2):
+      raise ValueError("Invalid configuration data %s" % str(children))
+    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
+      raise ValueError("Invalid configuration data %s" % str(unique_id))
+    (self._lhost, self._lport,
+     self._rhost, self._rport,
+     self._aminor, self._secret) = unique_id
+    if children:
+      if not _CanReadDevice(children[1].dev_path):
+        logging.info("drbd%s: Ignoring unreadable meta device", self._aminor)
+        children = []
     super(DRBD8, self).__init__(unique_id, children, size)
     self.major = self._DRBD_MAJOR
     version = self._GetVersion()
@@ -968,13 +997,6 @@ class DRBD8(BaseDRBD):
                   " usage: kernel is %s.%s, ganeti wants 8.x",
                   version['k_major'], version['k_minor'])
 
-    if len(children) not in (0, 2):
-      raise ValueError("Invalid configuration data %s" % str(children))
-    if not isinstance(unique_id, (tuple, list)) or len(unique_id) != 6:
-      raise ValueError("Invalid configuration data %s" % str(unique_id))
-    (self._lhost, self._lport,
-     self._rhost, self._rport,
-     self._aminor, self._secret) = unique_id
     if (self._lhost is not None and self._lhost == self._rhost and
         self._lport == self._rport):
       raise ValueError("Invalid configuration data, same local/remote %s" %
index 102816f..7260a46 100644 (file)
@@ -944,6 +944,7 @@ class LUVerifyCluster(LogicalUnit):
   ENODESSH = (TNODE, "ENODESSH")
   ENODEVERSION = (TNODE, "ENODEVERSION")
   ENODESETUP = (TNODE, "ENODESETUP")
+  ENODETIME = (TNODE, "ENODETIME")
 
   ETYPE_FIELD = "code"
   ETYPE_ERROR = "ERROR"
@@ -1326,14 +1327,23 @@ class LUVerifyCluster(LogicalUnit):
       constants.NV_VERSION: None,
       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
       constants.NV_NODESETUP: None,
+      constants.NV_TIME: None,
       }
+
     if vg_name is not None:
       node_verify_param[constants.NV_VGLIST] = None
       node_verify_param[constants.NV_LVLIST] = vg_name
       node_verify_param[constants.NV_PVLIST] = [vg_name]
       node_verify_param[constants.NV_DRBDLIST] = None
+
+    # Due to the way our RPC system works, exact response times cannot be
+    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
+    # time before and after executing the request, we can at least have a time
+    # window.
+    nvinfo_starttime = time.time()
     all_nvinfo = self.rpc.call_node_verify(nodelist, node_verify_param,
                                            self.cfg.GetClusterName())
+    nvinfo_endtime = time.time()
 
     cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
@@ -1380,6 +1390,7 @@ class LUVerifyCluster(LogicalUnit):
         else:
           instance = instanceinfo[instance]
           node_drbd[minor] = (instance.name, instance.admin_up)
+
       self._VerifyNode(node_i, file_names, local_checksums,
                        nresult, master_files, node_drbd, vg_name)
 
@@ -1413,6 +1424,27 @@ class LUVerifyCluster(LogicalUnit):
       if test:
         continue
 
+      # Node time
+      ntime = nresult.get(constants.NV_TIME, None)
+      try:
+        ntime_merged = utils.MergeTime(ntime)
+      except (ValueError, TypeError):
+        _ErrorIf(test, self.ENODETIME, node, "Node returned invalid time")
+
+      if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
+        ntime_diff = abs(nvinfo_starttime - ntime_merged)
+      elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
+        ntime_diff = abs(ntime_merged - nvinfo_endtime)
+      else:
+        ntime_diff = None
+
+      _ErrorIf(ntime_diff is not None, self.ENODETIME, node,
+               "Node time diverges by at least %0.1fs from master node time",
+               ntime_diff)
+
+      if ntime_diff is not None:
+        continue
+
       try:
         node_info[node] = {
           "mfree": int(nodeinfo['memory_free']),
@@ -7179,6 +7211,14 @@ class LUGrowDisk(LogicalUnit):
       self.cfg.SetDiskID(disk, node)
       result = self.rpc.call_blockdev_grow(node, disk, self.op.amount)
       result.Raise("Grow request failed to node %s" % node)
+
+      # TODO: Rewrite code to work properly
+      # DRBD goes into sync mode for a short amount of time after executing the
+      # "resize" command. DRBD 8.x below version 8.0.13 contains a bug whereby
+      # calling "resize" in sync mode fails. Sleeping for a short amount of
+      # time is a work-around.
+      time.sleep(5)
+
     disk.RecordGrow(self.op.amount)
     self.cfg.Update(instance, feedback_fn)
     if self.op.wait_for_sync:
index 17621dd..eda060a 100644 (file)
@@ -94,16 +94,38 @@ class ClusterMasterQuery(ConfdQuery):
   It accepts no arguments, and returns the current cluster master.
 
   """
+  def _GetMasterNode(self):
+    return self.reader.GetMasterNode()
+
   def Exec(self, query):
     """ClusterMasterQuery main execution
 
     """
-    if query is None:
+    if isinstance(query, dict):
+      if constants.CONFD_REQQ_FIELDS in query:
+        status = constants.CONFD_REPL_STATUS_OK
+        req_fields = query[constants.CONFD_REQQ_FIELDS]
+        if not isinstance(req_fields, (list, tuple)):
+          logging.debug("FIELDS request should be a list")
+          return QUERY_ARGUMENT_ERROR
+
+        answer = []
+        for field in req_fields:
+          if field == constants.CONFD_REQFIELD_NAME:
+            answer.append(self._GetMasterNode())
+          elif field == constants.CONFD_REQFIELD_IP:
+            answer.append(self.reader.GetMasterIP())
+          elif field == constants.CONFD_REQFIELD_MNODE_PIP:
+            answer.append(self.reader.GetNodePrimaryIp(self._GetMasterNode()))
+      else:
+        logging.debug("missing FIELDS in query dict")
+        return QUERY_ARGUMENT_ERROR
+    elif not query:
       status = constants.CONFD_REPL_STATUS_OK
       answer = self.reader.GetMasterNode()
     else:
-      status = constants.CONFD_REPL_STATUS_ERROR
-      answer = 'master query accepts no query argument'
+      logging.debug("Invalid master query argument: not dict or empty")
+      return QUERY_ARGUMENT_ERROR
 
     return status, answer
 
index c6e9168..c353878 100644 (file)
@@ -314,7 +314,7 @@ DEFAULT_MAC_PREFIX = "aa:00:00"
 LVM_STRIPECOUNT = _autoconf.LVM_STRIPECOUNT
 # default maximum instance wait time, in seconds.
 DEFAULT_SHUTDOWN_TIMEOUT = 120
-
+NODE_MAX_CLOCK_SKEW = 150
 
 # RPC constants
 (RPC_ENCODING_NONE,
@@ -531,6 +531,7 @@ NV_LVLIST = "lvlist"
 NV_PVLIST = "pvlist"
 NV_DRBDLIST = "drbd-list"
 NV_NODESETUP = "nodesetup"
+NV_TIME = "time"
 
 # Allocator framework constants
 IALLOCATOR_VERSION = 2
@@ -692,6 +693,11 @@ CONFD_REQ_INSTANCES_IPS_LIST = 6
 CONFD_REQQ_LINK = "0"
 CONFD_REQQ_IP = "1"
 CONFD_REQQ_IPLIST = "2"
+CONFD_REQQ_FIELDS = "3"
+
+CONFD_REQFIELD_NAME = "0"
+CONFD_REQFIELD_IP = "1"
+CONFD_REQFIELD_MNODE_PIP = "2"
 
 CONFD_REQS = frozenset([
   CONFD_REQ_PING,
@@ -728,7 +734,7 @@ CONFD_ERROR_ARGUMENT = 3
 # Each request is "salted" by the current timestamp.
 # This constants decides how many seconds of skew to accept.
 # TODO: make this a default and allow the value to be more configurable
-CONFD_MAX_CLOCK_SKEW = 300
+CONFD_MAX_CLOCK_SKEW = 2 * NODE_MAX_CLOCK_SKEW
 
 # When we haven't reloaded the config for more than this amount of seconds, we
 # force a test to see if inotify is betraying us.
index 94e5651..7caae96 100644 (file)
@@ -185,6 +185,8 @@ class KVMHypervisor(hv_base.BaseHypervisor):
       script.write("  # Connect the interface to the bridge\n")
       script.write("  /usr/sbin/brctl addif $BRIDGE $INTERFACE\n")
     elif nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_ROUTED:
+      if not nic.ip:
+        raise errors.HypervisorError("nic/%d is routed, but has no ip." % seq)
       script.write("  # Route traffic targeted at the IP to the interface\n")
       if nic.nicparams[constants.NIC_LINK]:
         script.write("  while /sbin/ip rule del dev $INTERFACE; do :; done\n")
index 91baa88..f08e75c 100644 (file)
@@ -1226,21 +1226,18 @@ def EnsureDirs(dirs):
       raise errors.GenericError("%s is not a directory" % dir_name)
 
 
-def ReadFile(file_name, size=None):
+def ReadFile(file_name, size=-1):
   """Reads a file.
 
-  @type size: None or int
-  @param size: Read at most size bytes
+  @type size: int
+  @param size: Read at most size bytes (if negative, entire file)
   @rtype: str
   @return: the (possibly partial) content of the file
 
   """
   f = open(file_name, "r")
   try:
-    if size is None:
-      return f.read()
-    else:
-      return f.read(size)
+    return f.read(size)
   finally:
     f.close()
 
index 219afee..7f1f0cb 100755 (executable)
@@ -60,6 +60,10 @@ class TestConstants(unittest.TestCase):
     self.failUnless(constants.LDS_OKAY < constants.LDS_UNKNOWN)
     self.failUnless(constants.LDS_UNKNOWN < constants.LDS_FAULTY)
 
+  def testClockSkew(self):
+    self.failUnless(constants.NODE_MAX_CLOCK_SKEW <
+                    (0.8 * constants.CONFD_MAX_CLOCK_SKEW))
+
 
 class TestParameterNames(unittest.TestCase):
   """HV/BE parameter tests"""
@@ -81,7 +85,7 @@ class TestConfdConstants(unittest.TestCase):
 
   def testFourCc(self):
     self.failUnlessEqual(len(constants.CONFD_MAGIC_FOURCC), 4,
-                    "Invalid fourcc len, should be 4")
+                         "Invalid fourcc len, should be 4")
 
   def _IsUniqueSequence(self, sequence):
     seen = set()