Merge branch 'devel-2.1'
[ganeti-local] / lib / cmdlib.py
index bc54084..9f8b1ff 100644 (file)
@@ -46,6 +46,9 @@ from ganeti import serializer
 from ganeti import ssconf
 from ganeti import uidpool
 from ganeti import compat
+from ganeti import masterd
+
+import ganeti.masterd.instance # pylint: disable-msg=W0611
 
 
 class LogicalUnit(object):
@@ -920,13 +923,6 @@ def _FindFaultyInstanceDisks(cfg, rpc, instance, node_name, prereq):
   return faulty
 
 
-def _FormatTimestamp(secs):
-  """Formats a Unix timestamp with the local timezone.
-
-  """
-  return time.strftime("%F %T %Z", time.gmtime(secs))
-
-
 class LUPostInitCluster(LogicalUnit):
   """Logical unit for running hooks after cluster initialization.
 
@@ -1018,45 +1014,6 @@ class LUDestroyCluster(LogicalUnit):
     return master
 
 
-def _VerifyCertificateInner(filename, expired, not_before, not_after, now,
-                            warn_days=constants.SSL_CERT_EXPIRATION_WARN,
-                            error_days=constants.SSL_CERT_EXPIRATION_ERROR):
-  """Verifies certificate details for LUVerifyCluster.
-
-  """
-  if expired:
-    msg = "Certificate %s is expired" % filename
-
-    if not_before is not None and not_after is not None:
-      msg += (" (valid from %s to %s)" %
-              (_FormatTimestamp(not_before),
-               _FormatTimestamp(not_after)))
-    elif not_before is not None:
-      msg += " (valid from %s)" % _FormatTimestamp(not_before)
-    elif not_after is not None:
-      msg += " (valid until %s)" % _FormatTimestamp(not_after)
-
-    return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-  elif not_before is not None and not_before > now:
-    return (LUVerifyCluster.ETYPE_WARNING,
-            "Certificate %s not yet valid (valid from %s)" %
-            (filename, _FormatTimestamp(not_before)))
-
-  elif not_after is not None:
-    remaining_days = int((not_after - now) / (24 * 3600))
-
-    msg = ("Certificate %s expires in %d days" % (filename, remaining_days))
-
-    if remaining_days <= error_days:
-      return (LUVerifyCluster.ETYPE_ERROR, msg)
-
-    if remaining_days <= warn_days:
-      return (LUVerifyCluster.ETYPE_WARNING, msg)
-
-  return (None, None)
-
-
 def _VerifyCertificate(filename):
   """Verifies a certificate for LUVerifyCluster.
 
@@ -1071,11 +1028,23 @@ def _VerifyCertificate(filename):
     return (LUVerifyCluster.ETYPE_ERROR,
             "Failed to load X509 certificate %s: %s" % (filename, err))
 
-  # Depending on the pyOpenSSL version, this can just return (None, None)
-  (not_before, not_after) = utils.GetX509CertValidity(cert)
+  (errcode, msg) = \
+    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
+                                constants.SSL_CERT_EXPIRATION_ERROR)
+
+  if msg:
+    fnamemsg = "While verifying %s: %s" % (filename, msg)
+  else:
+    fnamemsg = None
 
-  return _VerifyCertificateInner(filename, cert.has_expired(),
-                                 not_before, not_after, time.time())
+  if errcode is None:
+    return (None, fnamemsg)
+  elif errcode == utils.CERT_WARNING:
+    return (LUVerifyCluster.ETYPE_WARNING, fnamemsg)
+  elif errcode == utils.CERT_ERROR:
+    return (LUVerifyCluster.ETYPE_ERROR, fnamemsg)
+
+  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
 
 
 class LUVerifyCluster(LogicalUnit):
@@ -1689,6 +1658,7 @@ class LUVerifyCluster(LogicalUnit):
 
     vg_name = self.cfg.GetVGName()
     hypervisors = self.cfg.GetClusterInfo().enabled_hypervisors
+    cluster = self.cfg.GetClusterInfo()
     nodelist = utils.NiceSort(self.cfg.GetNodeList())
     nodeinfo = [self.cfg.GetNodeInfo(nname) for nname in nodelist]
     instancelist = utils.NiceSort(self.cfg.GetInstanceList())
@@ -1707,6 +1677,8 @@ class LUVerifyCluster(LogicalUnit):
     file_names = ssconf.SimpleStore().GetFileList()
     file_names.extend(constants.ALL_CERT_FILES)
     file_names.extend(master_files)
+    if cluster.modify_etc_hosts:
+      file_names.append(constants.ETC_HOSTS)
 
     local_checksums = utils.FingerprintFiles(file_names)
 
@@ -1770,7 +1742,6 @@ class LUVerifyCluster(LogicalUnit):
                                            self.cfg.GetClusterName())
     nvinfo_endtime = time.time()
 
-    cluster = self.cfg.GetClusterInfo()
     master_node = self.cfg.GetMasterNode()
     all_drbd_map = self.cfg.ComputeDRBDMap()
 
@@ -2881,6 +2852,12 @@ class LURemoveNode(LogicalUnit):
       self.LogWarning("Errors encountered on the remote node while leaving"
                       " the cluster: %s", msg)
 
+    # Remove node from our /etc/hosts
+    if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
+      utils.RemoveHostFromEtcHosts(node.name)
+      _RedistributeAncillaryFiles(self)
+
 
 class LUQueryNodes(NoHooksLU):
   """Logical unit for querying nodes.
@@ -3455,6 +3432,7 @@ class LUAddNode(LogicalUnit):
 
     # Add node to our /etc/hosts, and add key to known_hosts
     if self.cfg.GetClusterInfo().modify_etc_hosts:
+      # FIXME: this should be done via an rpc call to node daemon
       utils.AddHostToEtcHosts(new_node.name)
 
     if new_node.secondary_ip != new_node.primary_ip:
@@ -6806,18 +6784,30 @@ class LUCreateInstance(LogicalUnit):
 
       elif self.op.mode == constants.INSTANCE_IMPORT:
         feedback_fn("* running the instance OS import scripts...")
-        src_node = self.op.src_node
-        src_images = self.src_images
-        cluster_name = self.cfg.GetClusterName()
-        # FIXME: pass debug option from opcode to backend
-        import_result = self.rpc.call_instance_os_import(pnode_name, iobj,
-                                                         src_node, src_images,
-                                                         cluster_name,
-                                                         self.op.debug_level)
-        msg = import_result.fail_msg
-        if msg:
-          self.LogWarning("Error while importing the disk images for instance"
-                          " %s on node %s: %s" % (instance, pnode_name, msg))
+
+        transfers = []
+
+        for idx, image in enumerate(self.src_images):
+          if not image:
+            continue
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("disk/%s" % idx,
+                                             constants.IEIO_FILE, (image, ),
+                                             constants.IEIO_SCRIPT,
+                                             (iobj.disks[idx], idx),
+                                             None)
+          transfers.append(dt)
+
+        import_result = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                self.op.src_node, pnode_name,
+                                                self.pnode.secondary_ip,
+                                                iobj, transfers)
+        if not compat.all(import_result):
+          self.LogWarning("Some disks for instance %s on node %s were not"
+                          " imported successfully" % (instance, pnode_name))
+
       else:
         # also checked in the prereq part
         raise errors.ProgrammerError("Unknown OS initialization mode '%s'"
@@ -9015,7 +9005,6 @@ class LUExportInstance(LogicalUnit):
 
     try:
       # per-disk results
-      dresults = []
       removed_snaps = [False] * len(instance.disks)
 
       snap_disks = None
@@ -9038,28 +9027,36 @@ class LUExportInstance(LogicalUnit):
 
         # TODO: check for size
 
-        cluster_name = self.cfg.GetClusterName()
+        def _TransferFinished(idx):
+          logging.debug("Transfer %s finished", idx)
+          if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
+            removed_snaps[idx] = True
+
+        transfers = []
+
         for idx, dev in enumerate(snap_disks):
-          feedback_fn("Exporting snapshot %s from %s to %s" %
-                      (idx, src_node, dst_node.name))
-          if dev:
-            # FIXME: pass debug from opcode to backend
-            result = self.rpc.call_snapshot_export(src_node, dev, dst_node.name,
-                                                   instance, cluster_name,
-                                                   idx, self.op.debug_level)
-            msg = result.fail_msg
-            if msg:
-              self.LogWarning("Could not export disk/%s from node %s to"
-                              " node %s: %s", idx, src_node, dst_node.name, msg)
-              dresults.append(False)
-            else:
-              dresults.append(True)
+          if not dev:
+            transfers.append(None)
+            continue
 
-            # Remove snapshot
-            if self._RemoveSnapshot(feedback_fn, snap_disks, idx):
-              removed_snaps[idx] = True
-          else:
-            dresults.append(False)
+          path = utils.PathJoin(constants.EXPORT_DIR, "%s.new" % instance.name,
+                                dev.physical_id[1])
+
+          finished_fn = compat.partial(_TransferFinished, idx)
+
+          # FIXME: pass debug option from opcode to backend
+          dt = masterd.instance.DiskTransfer("snapshot/%s" % idx,
+                                             constants.IEIO_SCRIPT, (dev, idx),
+                                             constants.IEIO_FILE, (path, ),
+                                             finished_fn)
+          transfers.append(dt)
+
+        # Actually export data
+        dresults = \
+          masterd.instance.TransferInstanceData(self, feedback_fn,
+                                                src_node, dst_node.name,
+                                                dst_node.secondary_ip,
+                                                instance, transfers)
 
         assert len(dresults) == len(instance.disks)