#!/usr/bin/python
#
# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Tool to merge two or more clusters together.

The clusters have to run the same version of Ganeti!

"""
# pylint: disable-msg=C0103
# C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
# Strategies for handling node groups whose name exists in both clusters.
_GROUPS_MERGE = "merge"
_GROUPS_RENAME = "rename"
# Expected-client-id prefix used when adding merged objects to our config.
_CLUSTERMERGE_ECID = "clustermerge-ecid"

# dest="pause_period" is required: main() reads options.pause_period, and
# without it optparse would derive the name "watcher_pause_period".
PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
                                  action="store", type="int",
                                  dest="pause_period",
                                  help=("Amount of time in seconds watcher"
                                        " should be suspended from running"))
GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
                            choices=(_GROUPS_MERGE, _GROUPS_RENAME),
                            dest="groups",
                            help=("How to handle groups that have the"
                                  " same name (One of: %s/%s)" %
                                  (_GROUPS_MERGE, _GROUPS_RENAME)))
def Flatten(unflattened_list):
  """Flattens a list of nested lists into a single flat list.

  Works recursively, so arbitrarily deep nesting is supported.
  Non-list items are kept in order of appearance.

  @param unflattened_list: A list of unflattened list objects.
  @return: A flattened list

  """
  flattened_list = []

  for item in unflattened_list:
    if isinstance(item, list):
      flattened_list.extend(Flatten(item))
    else:
      flattened_list.append(item)
  return flattened_list
class MergerData(object):
  """Container class to hold data used for merger.

  """
  def __init__(self, cluster, key_path, nodes, instances, config_path=None):
    """Initialize the container.

    @param cluster: The name of the cluster
    @param key_path: Path to the ssh private key used for authentication
    @param nodes: List of nodes in the merging cluster
    @param instances: List of instances running on merging cluster
    @param config_path: Path to the merging cluster config

    """
    self.cluster = cluster
    self.key_path = key_path
    # self.nodes is read by _PrepareAuthorizedKeys, _StopDaemons and
    # _ReaddMergedNodesAndRedist -- it must be stored here.
    self.nodes = nodes
    self.instances = instances
    self.config_path = config_path
class Merger(object):
  """Handling the merge.

  """
  def __init__(self, clusters, pause_period, groups):
    """Initialize object with sane defaults and infos required.

    @param clusters: The list of clusters to merge in
    @param pause_period: The time watcher shall be disabled for
    @param groups: How to handle group conflicts

    """
    self.merger_data = []
    self.clusters = clusters
    self.pause_period = pause_period
    self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
    (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
    self.ssh_runner = ssh.SshRunner(self.cluster_name)
    # self.groups is consulted by _MergeNodeGroups to pick the conflict
    # resolution strategy (None, _GROUPS_MERGE or _GROUPS_RENAME).
    self.groups = groups
120 """Sets up our end so we can do the merger.
122 This method is setting us up as a preparation for the merger.
123 It makes the initial contact and gathers information needed.
125 @raise errors.RemoteError: for errors in communication/grabbing
128 (remote_path, _, _) = ssh.GetUserFiles("root")
130 if self.cluster_name in self.clusters:
131 raise errors.CommandError("Cannot merge cluster %s with itself" %
134 # Fetch remotes private key
135 for cluster in self.clusters:
136 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
139 raise errors.RemoteError("There was an error while grabbing ssh private"
140 " key from %s. Fail reason: %s; output: %s" %
141 (cluster, result.fail_reason, result.output))
143 key_path = utils.PathJoin(self.work_dir, cluster)
144 utils.WriteFile(key_path, mode=0600, data=result.stdout)
146 result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
147 private_key=key_path)
149 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
150 " Fail reason: %s; output: %s" %
151 (cluster, result.fail_reason, result.output))
152 nodes = result.stdout.splitlines()
154 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
155 private_key=key_path)
157 raise errors.RemoteError("Unable to retrieve list of instances from"
158 " %s. Fail reason: %s; output: %s" %
159 (cluster, result.fail_reason, result.output))
160 instances = result.stdout.splitlines()
162 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
164 def _PrepareAuthorizedKeys(self):
165 """Prepare the authorized_keys on every merging node.
167 This method add our public key to remotes authorized_key for further
171 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
172 pub_key = utils.ReadFile(pub_key_file)
174 for data in self.merger_data:
175 for node in data.nodes:
176 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
177 (auth_keys, pub_key)),
178 private_key=data.key_path)
181 raise errors.RemoteError("Unable to add our public key to %s in %s."
182 " Fail reason: %s; output: %s" %
183 (node, data.cluster, result.fail_reason,
186 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
187 strict_host_check=False, private_key=None, batch=True,
189 """Wrapping SshRunner.Run with default parameters.
191 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
194 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
195 use_cluster_key=use_cluster_key,
196 strict_host_check=strict_host_check,
197 private_key=private_key, batch=batch,
200 def _StopMergingInstances(self):
201 """Stop instances on merging clusters.
204 for cluster in self.clusters:
205 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
209 raise errors.RemoteError("Unable to stop instances on %s."
210 " Fail reason: %s; output: %s" %
211 (cluster, result.fail_reason, result.output))
213 def _DisableWatcher(self):
214 """Disable watch on all merging clusters, including ourself.
217 for cluster in ["localhost"] + self.clusters:
218 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
222 raise errors.RemoteError("Unable to pause watcher on %s."
223 " Fail reason: %s; output: %s" %
224 (cluster, result.fail_reason, result.output))
226 def _StopDaemons(self):
227 """Stop all daemons on merging nodes.
230 cmd = "%s stop-all" % constants.DAEMON_UTIL
231 for data in self.merger_data:
232 for node in data.nodes:
233 result = self._RunCmd(node, cmd)
236 raise errors.RemoteError("Unable to stop daemons on %s."
237 " Fail reason: %s; output: %s." %
238 (node, result.fail_reason, result.output))
240 def _FetchRemoteConfig(self):
241 """Fetches and stores remote cluster config from the master.
243 This step is needed before we can merge the config.
246 for data in self.merger_data:
247 result = self._RunCmd(data.cluster, "cat %s" %
248 constants.CLUSTER_CONF_FILE)
251 raise errors.RemoteError("Unable to retrieve remote config on %s."
252 " Fail reason: %s; output %s" %
253 (data.cluster, result.fail_reason,
256 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
258 utils.WriteFile(data.config_path, data=result.stdout)
260 # R0201: Method could be a function
261 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
262 """Kills the local master daemon.
264 @raise errors.CommandError: If unable to kill
267 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
269 raise errors.CommandError("Unable to stop master daemons."
270 " Fail reason: %s; output: %s" %
271 (result.fail_reason, result.output))
273 def _MergeConfig(self):
274 """Merges all foreign config into our own config.
277 my_config = config.ConfigWriter(offline=True)
278 fake_ec_id = 0 # Needs to be uniq over the whole config merge
280 for data in self.merger_data:
281 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
282 self._MergeNodeGroups(my_config, other_config)
284 for node in other_config.GetNodeList():
285 node_info = other_config.GetNodeInfo(node)
286 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
289 for instance in other_config.GetInstanceList():
290 instance_info = other_config.GetInstanceInfo(instance)
292 # Update the DRBD port assignments
293 # This is a little bit hackish
294 for dsk in instance_info.disks:
295 if dsk.dev_type in constants.LDS_DRBD:
296 port = my_config.AllocatePort()
298 logical_id = list(dsk.logical_id)
300 dsk.logical_id = tuple(logical_id)
302 physical_id = list(dsk.physical_id)
303 physical_id[1] = physical_id[3] = port
304 dsk.physical_id = tuple(physical_id)
306 my_config.AddInstance(instance_info,
307 _CLUSTERMERGE_ECID + str(fake_ec_id))
310 # R0201: Method could be a function
311 def _MergeNodeGroups(self, my_config, other_config):
312 """Adds foreign node groups
314 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
316 # pylint: disable-msg=R0201
317 logging.info("Node group conflict strategy: %s" % self.groups)
319 my_grps = my_config.GetAllNodeGroupsInfo().values()
320 other_grps = other_config.GetAllNodeGroupsInfo().values()
322 # Check for node group naming conflicts:
324 for other_grp in other_grps:
325 for my_grp in my_grps:
326 if other_grp.name == my_grp.name:
327 conflicts.append(other_grp)
330 conflict_names = utils.CommaJoin([g.name for g in conflicts])
331 logging.info("Node groups in both local and remote cluster: %s" %
334 # User hasn't specified how to handle conflicts
336 raise errors.CommandError("The following node group(s) are in both"
337 " clusters, and no merge strategy has been"
338 " supplied (see the --groups option): %s" %
341 # User wants to rename conflicts
342 elif self.groups == _GROUPS_RENAME:
343 for grp in conflicts:
344 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
345 logging.info("Renaming remote node group from %s to %s"
346 " to resolve conflict" % (grp.name, new_name))
349 # User wants to merge conflicting groups
350 elif self.groups == 'merge':
351 for other_grp in conflicts:
352 logging.info("Merging local and remote '%s' groups" % other_grp.name)
353 for node_name in other_grp.members[:]:
354 node = other_config.GetNodeInfo(node_name)
355 other_config._UnlockedRemoveNodeFromGroup(node)
357 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
358 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
359 node.group = my_grp_uuid
360 # Remove from list of groups to add
361 other_grps.remove(other_grp)
363 for grp in other_grps:
364 #TODO: handle node group conflicts
365 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
367 # R0201: Method could be a function
368 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
369 """Starts the local master daemon.
371 @param no_vote: Should the masterd started without voting? default: False
372 @raise errors.CommandError: If unable to start daemon.
377 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
379 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
381 raise errors.CommandError("Couldn't start ganeti master."
382 " Fail reason: %s; output: %s" %
383 (result.fail_reason, result.output))
385 def _ReaddMergedNodesAndRedist(self):
386 """Readds all merging nodes and make sure their config is up-to-date.
388 @raise errors.CommandError: If anything fails.
391 for data in self.merger_data:
392 for node in data.nodes:
393 result = utils.RunCmd(["gnt-node", "add", "--readd",
394 "--no-ssh-key-check", "--force-join", node])
396 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
397 " output: %s" % (node, result.fail_reason,
400 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
402 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
403 " output: %s" % (result.fail_reason,
406 # R0201: Method could be a function
407 def _StartupAllInstances(self): # pylint: disable-msg=R0201
408 """Starts up all instances (locally).
410 @raise errors.CommandError: If unable to start clusters
413 result = utils.RunCmd(["gnt-instance", "startup", "--all",
416 raise errors.CommandError("Unable to start all instances."
417 " Fail reason: %s; output: %s" %
418 (result.fail_reason, result.output))
420 # R0201: Method could be a function
421 def _VerifyCluster(self): # pylint: disable-msg=R0201
422 """Runs gnt-cluster verify to verify the health.
424 @raise errors.ProgrammError: If cluster fails on verification
427 result = utils.RunCmd(["gnt-cluster", "verify"])
429 raise errors.CommandError("Verification of cluster failed."
430 " Fail reason: %s; output: %s" %
431 (result.fail_reason, result.output))
434 """Does the actual merge.
436 It runs all the steps in the right order and updates the user about steps
437 taken. Also it keeps track of rollback_steps to undo everything.
442 logging.info("Pre cluster verification")
443 self._VerifyCluster()
445 logging.info("Prepare authorized_keys")
446 rbsteps.append("Remove our key from authorized_keys on nodes:"
448 self._PrepareAuthorizedKeys()
450 rbsteps.append("Start all instances again on the merging"
451 " clusters: %(clusters)s")
452 logging.info("Stopping merging instances (takes a while)")
453 self._StopMergingInstances()
455 logging.info("Disable watcher")
456 self._DisableWatcher()
457 logging.info("Stop daemons on merging nodes")
459 logging.info("Merging config")
460 self._FetchRemoteConfig()
462 logging.info("Stopping master daemon")
463 self._KillMasterDaemon()
465 rbsteps.append("Restore %s from another master candidate"
466 " and restart master daemon" %
467 constants.CLUSTER_CONF_FILE)
469 self._StartMasterDaemon(no_vote=True)
471 # Point of no return, delete rbsteps
474 logging.warning("We are at the point of no return. Merge can not easily"
475 " be undone after this point.")
476 logging.info("Readd nodes")
477 self._ReaddMergedNodesAndRedist()
479 logging.info("Merge done, restart master daemon normally")
480 self._KillMasterDaemon()
481 self._StartMasterDaemon()
483 logging.info("Starting instances again")
484 self._StartupAllInstances()
485 logging.info("Post cluster verification")
486 self._VerifyCluster()
487 except errors.GenericError, e:
491 nodes = Flatten([data.nodes for data in self.merger_data])
493 "clusters": self.clusters,
496 logging.critical("In order to rollback do the following:")
498 logging.critical(" * %s", step % info)
500 logging.critical("Nothing to rollback.")
502 # TODO: Keep track of steps done for a flawless resume?
505 """Clean up our environment.
507 This cleans up remote private keys and configs and after that
508 deletes the temporary directory.
511 shutil.rmtree(self.work_dir)
def SetupLogging(options):
  """Setting up logging infrastructure.

  Logs everything to stderr; the verbosity is chosen from the --debug and
  --verbose command line flags (debug > verbose > errors only).

  @param options: Parsed command line options

  """
  formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")

  stderr_handler = logging.StreamHandler()
  stderr_handler.setFormatter(formatter)
  if options.debug:
    stderr_handler.setLevel(logging.NOTSET)
  elif options.verbose:
    stderr_handler.setLevel(logging.INFO)
  else:
    stderr_handler.setLevel(logging.ERROR)

  root_logger = logging.getLogger("")
  root_logger.setLevel(logging.NOTSET)
  root_logger.addHandler(stderr_handler)
540 program = os.path.basename(sys.argv[0])
542 parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
543 " [--watcher-pause-period SECONDS]"
544 " [--groups [%s|%s]]"
545 " <cluster> [<cluster...>]" %
546 (_GROUPS_MERGE, _GROUPS_RENAME)),
548 parser.add_option(cli.DEBUG_OPT)
549 parser.add_option(cli.VERBOSE_OPT)
550 parser.add_option(PAUSE_PERIOD_OPT)
551 parser.add_option(GROUPS_OPT)
553 (options, args) = parser.parse_args()
555 SetupLogging(options)
558 parser.error("No clusters specified")
560 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
564 cluster_merger.Setup()
565 cluster_merger.Merge()
566 except errors.GenericError, e:
568 return constants.EXIT_FAILURE
570 cluster_merger.Cleanup()
572 return constants.EXIT_SUCCESS
if __name__ == "__main__":
  # Propagate the tool's exit code to the shell.
  sys.exit(main())