htools: install hail in the iallocator dir
[ganeti-local] / tools / cluster-merge
old mode 100644 (file)
new mode 100755 (executable)
index b31638a..21750f9
@@ -42,28 +42,38 @@ from ganeti import ssh
 from ganeti import utils
 
 
+_GROUPS_MERGE = "merge"
+_GROUPS_RENAME = "rename"
+_CLUSTERMERGE_ECID = "clustermerge-ecid"
+
 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                   action="store", type="int",
                                   dest="pause_period",
                                   help=("Amount of time in seconds watcher"
                                         " should be suspended from running"))
+GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
+                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
+                            dest="groups",
+                            help=("How to handle groups that have the"
+                                  " same name (One of: %s/%s)" %
+                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
 
 
-def Flatten(unflatten_list):
+def Flatten(unflattened_list):
   """Flattens a list.
 
-  @param unflatten_list: A list of unflatten list objects.
-  @return: A flatten list
+  @param unflattened_list: A list whose items may themselves be (nested) lists
+  @return: A new flat list of all non-list items, in their original order
 
   """
-  flatten_list = []
+  flattened_list = []
 
-  for item in unflatten_list:
+  for item in unflattened_list:
     if isinstance(item, list):
-      flatten_list.extend(Flatten(item))
+      flattened_list.extend(Flatten(item))
     else:
-      flatten_list.append(item)
-  return flatten_list
+      flattened_list.append(item)
+  return flattened_list
 
 
 class MergerData(object):
@@ -75,35 +85,37 @@ class MergerData(object):
 
     @param cluster: The name of the cluster
     @param key_path: Path to the ssh private key used for authentication
-    @param config_path: Path to the merging cluster config
     @param nodes: List of nodes in the merging cluster
     @param instances: List of instances running on merging cluster
+    @param config_path: Path to the merging cluster config
 
     """
     self.cluster = cluster
     self.key_path = key_path
-    self.config_path = config_path
-    self.instances = instances
     self.nodes = nodes
+    self.instances = instances
+    self.config_path = config_path
 
 
 class Merger(object):
   """Handling the merge.
 
   """
-  def __init__(self, clusters, pause_period):
+  def __init__(self, clusters, pause_period, groups):
     """Initialize object with sane defaults and infos required.
 
     @param clusters: The list of clusters to merge in
     @param pause_period: The time watcher shall be disabled for
+    @param groups: Node group name conflict strategy (None, merge or rename)
 
     """
     self.merger_data = []
     self.clusters = clusters
     self.pause_period = pause_period
     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
-    self.cluster_name = cli.GetClient().QueryConfigValues(["cluster_name"])
+    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
     self.ssh_runner = ssh.SshRunner(self.cluster_name)
+    self.groups = groups
 
   def Setup(self):
     """Sets up our end so we can do the merger.
@@ -116,6 +128,10 @@ class Merger(object):
     """
     (remote_path, _, _) = ssh.GetUserFiles("root")
 
+    if self.cluster_name in self.clusters:
+      raise errors.CommandError("Cannot merge cluster %s with itself" %
+                                self.cluster_name)
+
     # Fetch remotes private key
     for cluster in self.clusters:
       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
@@ -125,7 +141,7 @@ class Merger(object):
                                  " key from %s. Fail reason: %s; output: %s" %
                                  (cluster, result.fail_reason, result.output))
 
-      key_path = os.path.join(self.work_dir, cluster)
+      key_path = utils.PathJoin(self.work_dir, cluster)
       utils.WriteFile(key_path, mode=0600, data=result.stdout)
 
       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
@@ -173,7 +189,7 @@ class Merger(object):
               ask_key=False):
     """Wrapping SshRunner.Run with default parameters.
 
-    For explanation of parameters see L{ssh.SshRunner.Run}.
+    For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
 
     """
     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
@@ -208,30 +224,14 @@ class Merger(object):
                                  " Fail reason: %s; output: %s" %
                                  (cluster, result.fail_reason, result.output))
 
-
-  # R0201: Method could be a function
-  def _EnableWatcher(self): # pylint: disable-msg=R0201
-    """Reenable watcher (locally).
-
-    """
-    result = utils.RunCmd(["gnt-cluster", "watcher", "continue"])
-
-    if result.failed:
-      logging.warning("Unable to continue watcher. Fail reason: %s;"
-                      " output: %s", result.fail_reason, result.output)
-
   def _StopDaemons(self):
     """Stop all daemons on merging nodes.
 
     """
-    # FIXME: Worth to put this into constants?
-    cmds = []
-    for daemon in (constants.RAPI, constants.MASTERD,
-                   constants.NODED, constants.CONFD):
-      cmds.append("%s stop %s" % (constants.DAEMON_UTIL, daemon))
+    cmd = "%s stop-all" % constants.DAEMON_UTIL
     for data in self.merger_data:
       for node in data.nodes:
-        result = self._RunCmd(node, " && ".join(cmds))
+        result = self._RunCmd(node, cmd)
 
         if result.failed:
           raise errors.RemoteError("Unable to stop daemons on %s."
@@ -254,8 +254,8 @@ class Merger(object):
                                  (data.cluster, result.fail_reason,
                                   result.output))
 
-      data.config_path = os.path.join(self.work_dir, "%s_config.data" %
-                                                     data.cluster)
+      data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
+                                        data.cluster)
       utils.WriteFile(data.config_path, data=result.stdout)
 
   # R0201: Method could be a function
@@ -279,12 +279,12 @@ class Merger(object):
     fake_ec_id = 0 # Needs to be uniq over the whole config merge
 
     for data in self.merger_data:
-      other_config = config.ConfigWriter(data.config_path)
+      other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
+      self._MergeNodeGroups(my_config, other_config)
 
       for node in other_config.GetNodeList():
         node_info = other_config.GetNodeInfo(node)
-        node_info.master_candidate = False
-        my_config.AddNode(node_info, str(fake_ec_id))
+        my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
         fake_ec_id += 1
 
       for instance in other_config.GetInstanceList():
@@ -304,10 +304,75 @@ class Merger(object):
             physical_id[1] = physical_id[3] = port
             dsk.physical_id = tuple(physical_id)
 
-        my_config.AddInstance(instance_info, str(fake_ec_id))
+        my_config.AddInstance(instance_info,
+                              _CLUSTERMERGE_ECID + str(fake_ec_id))
         fake_ec_id += 1
 
+  def _MergeNodeGroups(self, my_config, other_config):
+    """Adds the node groups of a merging cluster to the local config.
+
+    Groups whose name exists in both clusters are renamed or merged
+    according to the --groups strategy (self.groups).
+
+    @param my_config: Local cluster configuration
+    @param other_config: Configuration of the cluster being merged in
+    @raise errors.CommandError: If group names conflict and no strategy
+        was given
+
+    """
+    logging.info("Node group conflict strategy: %s", self.groups)
+
+    my_grps = my_config.GetAllNodeGroupsInfo().values()
+    # A real list, since merged groups are removed from it below
+    other_grps = list(other_config.GetAllNodeGroupsInfo().values())
+
+    # Check for node group naming conflicts
+    my_grp_names = frozenset(grp.name for grp in my_grps)
+    conflicts = [grp for grp in other_grps if grp.name in my_grp_names]
+
+    if conflicts:
+      conflict_names = utils.CommaJoin([g.name for g in conflicts])
+      logging.info("Node groups in both local and remote cluster: %s",
+                   conflict_names)
+
+      # User hasn't specified how to handle conflicts
+      if not self.groups:
+        raise errors.CommandError("The following node group(s) are in both"
+                                  " clusters, and no merge strategy has been"
+                                  " supplied (see the --groups option): %s" %
+                                  conflict_names)
+
+      # User wants to rename conflicts
+      elif self.groups == _GROUPS_RENAME:
+        for grp in conflicts:
+          new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
+          logging.info("Renaming remote node group from %s to %s"
+                       " to resolve conflict", grp.name, new_name)
+          grp.name = new_name
+
+      # User wants to merge conflicting groups
+      elif self.groups == _GROUPS_MERGE:
+        # Access to a protected member of a client class
+        # pylint: disable-msg=W0212
+        for other_grp in conflicts:
+          logging.info("Merging local and remote '%s' groups", other_grp.name)
+          # All members move into the same local group; look it up once
+          my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
+          # Iterate over a copy; the removal below may edit the member list
+          for node_name in other_grp.members[:]:
+            node = other_config.GetNodeInfo(node_name)
+            other_config._UnlockedRemoveNodeFromGroup(node)
+            my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
+            node.group = my_grp_uuid
+          # The group already exists locally, so it must not be added again
+          other_grps.remove(other_grp)
+
+    # Add the remaining (possibly renamed) groups; AddNodeGroup is
+    # expected to reject any name clash that is still left
+    for grp in other_grps:
+      my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
+
   # R0201: Method could be a function
   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
     """Starts the local master daemon.
 
@@ -334,7 +399,7 @@ class Merger(object):
     for data in self.merger_data:
       for node in data.nodes:
         result = utils.RunCmd(["gnt-node", "add", "--readd",
-                               "--no-ssh-key-check", node])
+                               "--no-ssh-key-check", "--force-join", node])
         if result.failed:
           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
                                     " output: %s" % (node, result.fail_reason,
@@ -401,9 +466,12 @@ class Merger(object):
       self._StopDaemons()
       logging.info("Merging config")
       self._FetchRemoteConfig()
+
+      logging.info("Stopping master daemon")
       self._KillMasterDaemon()
 
-      rbsteps.append("Restore %s from another master candidate" %
+      rbsteps.append("Restore %s from another master candidate"
+                     " and restart master daemon" %
                      constants.CLUSTER_CONF_FILE)
       self._MergeConfig()
       self._StartMasterDaemon(no_vote=True)
@@ -413,10 +481,13 @@ class Merger(object):
 
       logging.warning("We are at the point of no return. Merge can not easily"
                       " be undone after this point.")
-      logging.info("Readd nodes and redistribute config")
+      logging.info("Readd nodes")
       self._ReaddMergedNodesAndRedist()
+
+      logging.info("Merge done, restart master daemon normally")
       self._KillMasterDaemon()
       self._StartMasterDaemon()
+
       logging.info("Starting instances again")
       self._StartupAllInstances()
       logging.info("Post cluster verification")
@@ -476,13 +547,16 @@ def main():
   """
   program = os.path.basename(sys.argv[0])
 
-  parser = optparse.OptionParser(usage=("%prog [--debug|--verbose]"
+  parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
                                         " [--watcher-pause-period SECONDS]"
-                                        " <cluster> <cluster...>"),
+                                        " [--groups {%s|%s}]"
+                                        " <cluster> [<cluster...>]" %
+                                        (_GROUPS_MERGE, _GROUPS_RENAME)),
                                         prog=program)
   parser.add_option(cli.DEBUG_OPT)
   parser.add_option(cli.VERBOSE_OPT)
   parser.add_option(PAUSE_PERIOD_OPT)
+  parser.add_option(GROUPS_OPT)
 
   (options, args) = parser.parse_args()
 
@@ -491,7 +565,8 @@ def main():
   if not args:
     parser.error("No clusters specified")
 
-  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period)
+  cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
+                          options.groups)
   try:
     try:
       cluster_merger.Setup()