Merge branch 'stable-2.6'
[ganeti-local] / tools / move-instance
index ef3dd8d..168ad81 100755 (executable)
@@ -22,7 +22,7 @@
 
 """
 
 
 """
 
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
 # C0103: Invalid name move-instance
 
 import os
 # C0103: Invalid name move-instance
 
 import os
@@ -36,10 +36,11 @@ from ganeti import cli
 from ganeti import constants
 from ganeti import utils
 from ganeti import workerpool
 from ganeti import constants
 from ganeti import utils
 from ganeti import workerpool
+from ganeti import objects
 from ganeti import compat
 from ganeti import rapi
 
 from ganeti import compat
 from ganeti import rapi
 
-import ganeti.rapi.client # pylint: disable-msg=W0611
+import ganeti.rapi.client # pylint: disable=W0611
 import ganeti.rapi.client_utils
 
 
 import ganeti.rapi.client_utils
 
 
@@ -148,18 +149,18 @@ class RapiClientFactory:
     self.src_cluster_name = src_cluster_name
     self.dest_cluster_name = dest_cluster_name
 
     self.src_cluster_name = src_cluster_name
     self.dest_cluster_name = dest_cluster_name
 
+    # TODO: Implement timeouts for RAPI connections
     # TODO: Support for using system default paths for verifying SSL certificate
     # TODO: Support for using system default paths for verifying SSL certificate
-    # (already implemented in CertAuthorityVerify)
     logging.debug("Using '%s' as source CA", options.src_ca_file)
     logging.debug("Using '%s' as source CA", options.src_ca_file)
-    src_ssl_config = rapi.client.CertAuthorityVerify(cafile=options.src_ca_file)
+    src_curl_config = rapi.client.GenericCurlConfig(cafile=options.src_ca_file)
 
     if options.dest_ca_file:
       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
 
     if options.dest_ca_file:
       logging.debug("Using '%s' as destination CA", options.dest_ca_file)
-      dest_ssl_config = \
-        rapi.client.CertAuthorityVerify(cafile=options.dest_ca_file)
+      dest_curl_config = \
+        rapi.client.GenericCurlConfig(cafile=options.dest_ca_file)
     else:
       logging.debug("Using source CA for destination")
     else:
       logging.debug("Using source CA for destination")
-      dest_ssl_config = src_ssl_config
+      dest_curl_config = src_curl_config
 
     logging.debug("Source RAPI server is %s:%s",
                   src_cluster_name, options.src_rapi_port)
 
     logging.debug("Source RAPI server is %s:%s",
                   src_cluster_name, options.src_rapi_port)
@@ -182,7 +183,7 @@ class RapiClientFactory:
     self.GetSourceClient = lambda: \
       rapi.client.GanetiRapiClient(src_cluster_name,
                                    port=options.src_rapi_port,
     self.GetSourceClient = lambda: \
       rapi.client.GanetiRapiClient(src_cluster_name,
                                    port=options.src_rapi_port,
-                                   config_ssl_verification=src_ssl_config,
+                                   curl_config_fn=src_curl_config,
                                    username=src_username,
                                    password=src_password)
 
                                    username=src_username,
                                    password=src_password)
 
@@ -212,7 +213,7 @@ class RapiClientFactory:
     self.GetDestClient = lambda: \
       rapi.client.GanetiRapiClient(dest_cluster_name,
                                    port=dest_rapi_port,
     self.GetDestClient = lambda: \
       rapi.client.GanetiRapiClient(dest_cluster_name,
                                    port=dest_rapi_port,
-                                   config_ssl_verification=dest_ssl_config,
+                                   curl_config_fn=dest_curl_config,
                                    username=dest_username,
                                    password=dest_password)
 
                                    username=dest_username,
                                    password=dest_password)
 
@@ -250,7 +251,7 @@ class MoveJobPollReportCb(cli.JobPollReportCbBase):
       return
 
     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
       return
 
     logging.info("[%s] %s", time.ctime(utils.MergeTime(timestamp)),
-                 utils.SafeEncode(log_msg))
+                 cli.FormatLogMessage(log_type, log_msg))
 
   def ReportNotChanged(self, job_id, status):
     """Called if a job hasn't changed in a while.
 
   def ReportNotChanged(self, job_id, status):
     """Called if a job hasn't changed in a while.
@@ -269,7 +270,8 @@ class InstanceMove(object):
 
   """
   def __init__(self, src_instance_name, dest_instance_name,
 
   """
   def __init__(self, src_instance_name, dest_instance_name,
-               dest_pnode, dest_snode, dest_iallocator):
+               dest_pnode, dest_snode, dest_iallocator,
+               hvparams, beparams, osparams, nics):
     """Initializes this class.
 
     @type src_instance_name: string
     """Initializes this class.
 
     @type src_instance_name: string
@@ -282,6 +284,14 @@ class InstanceMove(object):
     @param dest_snode: Name of secondary node on destination cluster
     @type dest_iallocator: string or None
     @param dest_iallocator: Name of iallocator to use
     @param dest_snode: Name of secondary node on destination cluster
     @type dest_iallocator: string or None
     @param dest_iallocator: Name of iallocator to use
+    @type hvparams: dict or None
+    @param hvparams: Hypervisor parameters to override
+    @type beparams: dict or None
+    @param beparams: Backend parameters to override
+    @type osparams: dict or None
+    @param osparams: OS parameters to override
+    @type nics: dict or None
+    @param nics: NICs to override
 
     """
     self.src_instance_name = src_instance_name
 
     """
     self.src_instance_name = src_instance_name
@@ -289,8 +299,11 @@ class InstanceMove(object):
     self.dest_pnode = dest_pnode
     self.dest_snode = dest_snode
     self.dest_iallocator = dest_iallocator
     self.dest_pnode = dest_pnode
     self.dest_snode = dest_snode
     self.dest_iallocator = dest_iallocator
+    self.hvparams = hvparams
+    self.beparams = beparams
+    self.osparams = osparams
+    self.nics = nics
 
 
-    self.success = None
     self.error_message = None
 
 
     self.error_message = None
 
 
@@ -311,17 +324,12 @@ class MoveRuntime(object):
     self.source_to_dest = threading.Condition(self.lock)
     self.dest_to_source = threading.Condition(self.lock)
 
     self.source_to_dest = threading.Condition(self.lock)
     self.dest_to_source = threading.Condition(self.lock)
 
-    # Set when threads should abort
-    self.abort = None
-
     # Source information
     # Source information
-    self.src_success = None
     self.src_error_message = None
     self.src_expinfo = None
     self.src_instinfo = None
 
     # Destination information
     self.src_error_message = None
     self.src_expinfo = None
     self.src_instinfo = None
 
     # Destination information
-    self.dest_success = None
     self.dest_error_message = None
     self.dest_impinfo = None
 
     self.dest_error_message = None
     self.dest_impinfo = None
 
@@ -340,35 +348,30 @@ class MoveRuntime(object):
       # Call inner function
       fn(*args)
 
       # Call inner function
       fn(*args)
 
-      success = True
       errmsg = None
     except Abort:
       errmsg = None
     except Abort:
-      success = False
       errmsg = "Aborted"
     except Exception, err:
       logging.exception("Caught unhandled exception")
       errmsg = "Aborted"
     except Exception, err:
       logging.exception("Caught unhandled exception")
-      success = False
       errmsg = str(err)
 
       errmsg = str(err)
 
+    setattr(self, "%s_error_message" % prefix, errmsg)
+
     self.lock.acquire()
     try:
     self.lock.acquire()
     try:
-      # Tell all threads to abort
-      self.abort = True
       self.source_to_dest.notifyAll()
       self.dest_to_source.notifyAll()
     finally:
       self.lock.release()
 
       self.source_to_dest.notifyAll()
       self.dest_to_source.notifyAll()
     finally:
       self.lock.release()
 
-    setattr(self, "%s_success" % prefix, success)
-    setattr(self, "%s_error_message" % prefix, errmsg)
-
   def CheckAbort(self):
     """Check whether thread should be aborted.
 
     @raise Abort: When thread should be aborted
 
     """
   def CheckAbort(self):
     """Check whether thread should be aborted.
 
     @raise Abort: When thread should be aborted
 
     """
-    if self.abort:
+    if not (self.src_error_message is None and
+            self.dest_error_message is None):
       logging.info("Aborting")
       raise Abort()
 
       logging.info("Aborting")
       raise Abort()
 
@@ -425,7 +428,9 @@ class MoveDestExecutor(object):
     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                   mrt.move.dest_pnode, mrt.move.dest_snode,
                                   mrt.move.dest_iallocator,
     job_id = self._CreateInstance(dest_client, mrt.move.dest_instance_name,
                                   mrt.move.dest_pnode, mrt.move.dest_snode,
                                   mrt.move.dest_iallocator,
-                                  mrt.src_instinfo, mrt.src_expinfo)
+                                  mrt.src_instinfo, mrt.src_expinfo,
+                                  mrt.move.hvparams, mrt.move.beparams,
+                                  mrt.move.beparams, mrt.move.nics)
     mrt.PollJob(dest_client, job_id,
                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
 
     mrt.PollJob(dest_client, job_id,
                 remote_import_fn=compat.partial(self._SetImportInfo, mrt))
 
@@ -448,7 +453,9 @@ class MoveDestExecutor(object):
       mrt.dest_to_source.release()
 
   @staticmethod
       mrt.dest_to_source.release()
 
   @staticmethod
-  def _CreateInstance(cl, name, snode, pnode, iallocator, instance, expinfo):
+  def _CreateInstance(cl, name, pnode, snode, iallocator, instance, expinfo,
+                      override_hvparams, override_beparams, override_osparams,
+                      override_nics):
     """Starts the instance creation in remote import mode.
 
     @type cl: L{rapi.client.GanetiRapiClient}
     """Starts the instance creation in remote import mode.
 
     @type cl: L{rapi.client.GanetiRapiClient}
@@ -465,29 +472,57 @@ class MoveDestExecutor(object):
     @param instance: Instance details from source cluster
     @type expinfo: dict
     @param expinfo: Prepared export information from source cluster
     @param instance: Instance details from source cluster
     @type expinfo: dict
     @param expinfo: Prepared export information from source cluster
+    @type override_hvparams: dict or None
+    @param override_hvparams: Hypervisor parameters to override
+    @type override_beparams: dict or None
+    @param override_beparams: Backend parameters to override
+    @type override_osparams: dict or None
+    @param override_osparams: OS parameters to override
+    @type override_nics: dict or None
+    @param override_nics: NICs to override
     @return: Job ID
 
     """
     disk_template = instance["disk_template"]
 
     disks = [{
     @return: Job ID
 
     """
     disk_template = instance["disk_template"]
 
     disks = [{
-      "size": i["size"],
-      "mode": i["mode"],
+      constants.IDISK_SIZE: i["size"],
+      constants.IDISK_MODE: i["mode"],
       } for i in instance["disks"]]
 
     nics = [{
       } for i in instance["disks"]]
 
     nics = [{
-      "ip": ip,
-      "mac": mac,
-      "mode": mode,
-      "link": link,
+      constants.INIC_IP: ip,
+      constants.INIC_MAC: mac,
+      constants.INIC_MODE: mode,
+      constants.INIC_LINK: link,
       } for ip, mac, mode, link in instance["nics"]]
 
       } for ip, mac, mode, link in instance["nics"]]
 
+    if len(override_nics) > len(nics):
+      raise Error("Can not create new NICs")
+
+    if override_nics:
+      assert len(override_nics) <= len(nics)
+      for idx, (nic, override) in enumerate(zip(nics, override_nics)):
+        nics[idx] = objects.FillDict(nic, override)
+
     # TODO: Should this be the actual up/down status? (run_state)
     start = (instance["config_state"] == "up")
 
     assert len(disks) == len(instance["disks"])
     assert len(nics) == len(instance["nics"])
 
     # TODO: Should this be the actual up/down status? (run_state)
     start = (instance["config_state"] == "up")
 
     assert len(disks) == len(instance["disks"])
     assert len(nics) == len(instance["nics"])
 
+    inst_beparams = instance["be_instance"]
+    if not inst_beparams:
+      inst_beparams = {}
+
+    inst_hvparams = instance["hv_instance"]
+    if not inst_hvparams:
+      inst_hvparams = {}
+
+    inst_osparams = instance["os_instance"]
+    if not inst_osparams:
+      inst_osparams = {}
+
     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                              name, disk_template, disks, nics,
                              os=instance["os"],
     return cl.CreateInstance(constants.INSTANCE_REMOTE_IMPORT,
                              name, disk_template, disks, nics,
                              os=instance["os"],
@@ -500,8 +535,12 @@ class MoveDestExecutor(object):
                              source_handshake=expinfo["handshake"],
                              source_x509_ca=expinfo["x509_ca"],
                              source_instance_name=instance["name"],
                              source_handshake=expinfo["handshake"],
                              source_x509_ca=expinfo["x509_ca"],
                              source_instance_name=instance["name"],
-                             beparams=instance["be_instance"],
-                             hvparams=instance["hv_instance"])
+                             beparams=objects.FillDict(inst_beparams,
+                                                       override_beparams),
+                             hvparams=objects.FillDict(inst_hvparams,
+                                                       override_hvparams),
+                             osparams=objects.FillDict(inst_osparams,
+                                                       override_osparams))
 
 
 class MoveSourceExecutor(object):
 
 
 class MoveSourceExecutor(object):
@@ -623,7 +662,7 @@ class MoveSourceExecutor(object):
 
 
 class MoveSourceWorker(workerpool.BaseWorker):
 
 
 class MoveSourceWorker(workerpool.BaseWorker):
-  def RunTask(self, rapi_factory, move): # pylint: disable-msg=W0221
+  def RunTask(self, rapi_factory, move): # pylint: disable=W0221
     """Executes an instance move.
 
     @type rapi_factory: L{RapiClientFactory}
     """Executes an instance move.
 
     @type rapi_factory: L{RapiClientFactory}
@@ -652,15 +691,13 @@ class MoveSourceWorker(workerpool.BaseWorker):
       finally:
         dest_thread.join()
 
       finally:
         dest_thread.join()
 
-      move.success = (mrt.src_success and mrt.dest_success)
       if mrt.src_error_message or mrt.dest_error_message:
         move.error_message = ("Source error: %s, destination error: %s" %
                               (mrt.src_error_message, mrt.dest_error_message))
       else:
         move.error_message = None
       if mrt.src_error_message or mrt.dest_error_message:
         move.error_message = ("Source error: %s, destination error: %s" %
                               (mrt.src_error_message, mrt.dest_error_message))
       else:
         move.error_message = None
-    except Exception, err: # pylint: disable-msg=W0703
+    except Exception, err: # pylint: disable=W0703
       logging.exception("Caught unhandled exception")
       logging.exception("Caught unhandled exception")
-      move.success = False
       move.error_message = str(err)
 
 
       move.error_message = str(err)
 
 
@@ -720,6 +757,10 @@ def ParseOptions():
   parser.add_option(cli.DEBUG_OPT)
   parser.add_option(cli.VERBOSE_OPT)
   parser.add_option(cli.IALLOCATOR_OPT)
   parser.add_option(cli.DEBUG_OPT)
   parser.add_option(cli.VERBOSE_OPT)
   parser.add_option(cli.IALLOCATOR_OPT)
+  parser.add_option(cli.BACKEND_OPT)
+  parser.add_option(cli.HVOPTS_OPT)
+  parser.add_option(cli.OSPARAMS_OPT)
+  parser.add_option(cli.NET_OPT)
   parser.add_option(SRC_RAPI_PORT_OPT)
   parser.add_option(SRC_CA_FILE_OPT)
   parser.add_option(SRC_USERNAME_OPT)
   parser.add_option(SRC_RAPI_PORT_OPT)
   parser.add_option(SRC_CA_FILE_OPT)
   parser.add_option(SRC_USERNAME_OPT)
@@ -769,13 +810,24 @@ def CheckOptions(parser, options, args):
             options.dest_primary_node or
             options.dest_secondary_node):
       parser.error("An iallocator or the destination node is required")
             options.dest_primary_node or
             options.dest_secondary_node):
       parser.error("An iallocator or the destination node is required")
+
+    if options.hvparams:
+      utils.ForceDictType(options.hvparams, constants.HVS_PARAMETER_TYPES)
+
+    if options.beparams:
+      utils.ForceDictType(options.beparams, constants.BES_PARAMETER_TYPES)
+
+    if options.nics:
+      options.nics = cli.ParseNicOption(options.nics)
   else:
     # Moving more than one instance
     if (options.dest_instance_name or options.dest_primary_node or
   else:
     # Moving more than one instance
     if (options.dest_instance_name or options.dest_primary_node or
-        options.dest_secondary_node):
-      parser.error("The options --dest-instance-name, --dest-primary-node and"
-                   " --dest-secondary-node can only be used when moving exactly"
-                   " one instance")
+        options.dest_secondary_node or options.hvparams or
+        options.beparams or options.osparams or options.nics):
+      parser.error("The options --dest-instance-name, --dest-primary-node,"
+                   " --dest-secondary-node, --hypervisor-parameters,"
+                   " --backend-parameters, --os-parameters and --net can"
+                   " only be used when moving exactly one instance")
 
     if not options.iallocator:
       parser.error("An iallocator must be specified for moving more than one"
 
     if not options.iallocator:
       parser.error("An iallocator must be specified for moving more than one"
@@ -784,6 +836,7 @@ def CheckOptions(parser, options, args):
   return (src_cluster_name, dest_cluster_name, instance_names)
 
 
   return (src_cluster_name, dest_cluster_name, instance_names)
 
 
+@rapi.client.UsesRapiClient
 def main():
   """Main routine.
 
 def main():
   """Main routine.
 
@@ -808,6 +861,9 @@ def main():
   assert len(instance_names) == 1 or options.iallocator
   assert (len(instance_names) > 1 or options.iallocator or
           options.dest_primary_node or options.dest_secondary_node)
   assert len(instance_names) == 1 or options.iallocator
   assert (len(instance_names) > 1 or options.iallocator or
           options.dest_primary_node or options.dest_secondary_node)
+  assert (len(instance_names) == 1 or
+          not (options.hvparams or options.beparams or options.osparams or
+               options.nics))
 
   # Prepare list of instance moves
   moves = []
 
   # Prepare list of instance moves
   moves = []
@@ -822,7 +878,9 @@ def main():
     moves.append(InstanceMove(src_instance_name, dest_instance_name,
                               options.dest_primary_node,
                               options.dest_secondary_node,
     moves.append(InstanceMove(src_instance_name, dest_instance_name,
                               options.dest_primary_node,
                               options.dest_secondary_node,
-                              options.iallocator))
+                              options.iallocator, options.hvparams,
+                              options.beparams, options.osparams,
+                              options.nics))
 
   assert len(moves) == len(instance_names)
 
 
   assert len(moves) == len(instance_names)
 
@@ -831,7 +889,7 @@ def main():
   try:
     # Add instance moves to workerpool
     for move in moves:
   try:
     # Add instance moves to workerpool
     for move in moves:
-      wp.AddTask(rapi_factory, move)
+      wp.AddTask((rapi_factory, move))
 
     # Wait for all moves to finish
     wp.Quiesce()
 
     # Wait for all moves to finish
     wp.Quiesce()
@@ -850,17 +908,17 @@ def main():
     else:
       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
 
     else:
       name = "%s as %s" % (move.src_instance_name, move.dest_instance_name)
 
-    if move.success and not move.error_message:
-      msg = "Success"
-    else:
+    if move.error_message:
       msg = "Failed (%s)" % move.error_message
       msg = "Failed (%s)" % move.error_message
+    else:
+      msg = "Success"
 
     logging.info("%s: %s", name, msg)
 
 
     logging.info("%s: %s", name, msg)
 
-  if compat.all(move.success for move in moves):
-    sys.exit(constants.EXIT_SUCCESS)
+  if compat.any(move.error_message for move in moves):
+    sys.exit(constants.EXIT_FAILURE)
 
 
-  sys.exit(constants.EXIT_FAILURE)
+  sys.exit(constants.EXIT_SUCCESS)
 
 
 if __name__ == "__main__":
 
 
 if __name__ == "__main__":