4 # Copyright (C) 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Tool to merge two or more clusters together.
23 The clusters have to run the same version of Ganeti!
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
import logging
import optparse
import os
import shutil
import sys
import tempfile

from ganeti import cli
from ganeti import config
from ganeti import constants
from ganeti import errors
from ganeti import ssh
from ganeti import utils
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
49 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
50 action="store", type="int",
52 help=("Amount of time in seconds watcher"
53 " should be suspended from running"))
54 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
55 choices=(_GROUPS_MERGE, _GROUPS_RENAME),
57 help=("How to handle groups that have the"
58 " same name (One of: %s/%s)" %
59 (_GROUPS_MERGE, _GROUPS_RENAME)))
62 def Flatten(unflattened_list):
65 @param unflattened_list: A list of unflattened list objects.
66 @return: A flattened list
71 for item in unflattened_list:
72 if isinstance(item, list):
73 flattened_list.extend(Flatten(item))
75 flattened_list.append(item)
79 class MergerData(object):
80 """Container class to hold data used for merger.
83 def __init__(self, cluster, key_path, nodes, instances, config_path=None):
84 """Initialize the container.
86 @param cluster: The name of the cluster
87 @param key_path: Path to the ssh private key used for authentication
88 @param nodes: List of nodes in the merging cluster
89 @param instances: List of instances running on merging cluster
90 @param config_path: Path to the merging cluster config
93 self.cluster = cluster
94 self.key_path = key_path
96 self.instances = instances
97 self.config_path = config_path
100 class Merger(object):
101 """Handling the merge.
104 def __init__(self, clusters, pause_period, groups):
105 """Initialize object with sane defaults and infos required.
107 @param clusters: The list of clusters to merge in
108 @param pause_period: The time watcher shall be disabled for
109 @param groups: How to handle group conflicts
112 self.merger_data = []
113 self.clusters = clusters
114 self.pause_period = pause_period
115 self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
116 (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
117 self.ssh_runner = ssh.SshRunner(self.cluster_name)
121 """Sets up our end so we can do the merger.
123 This method is setting us up as a preparation for the merger.
124 It makes the initial contact and gathers information needed.
126 @raise errors.RemoteError: for errors in communication/grabbing
129 (remote_path, _, _) = ssh.GetUserFiles("root")
131 if self.cluster_name in self.clusters:
132 raise errors.CommandError("Cannot merge cluster %s with itself" %
135 # Fetch remotes private key
136 for cluster in self.clusters:
137 result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
140 raise errors.RemoteError("There was an error while grabbing ssh private"
141 " key from %s. Fail reason: %s; output: %s" %
142 (cluster, result.fail_reason, result.output))
144 key_path = utils.PathJoin(self.work_dir, cluster)
145 utils.WriteFile(key_path, mode=0600, data=result.stdout)
147 result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
148 private_key=key_path)
150 raise errors.RemoteError("Unable to retrieve list of nodes from %s."
151 " Fail reason: %s; output: %s" %
152 (cluster, result.fail_reason, result.output))
153 nodes = result.stdout.splitlines()
155 result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
156 private_key=key_path)
158 raise errors.RemoteError("Unable to retrieve list of instances from"
159 " %s. Fail reason: %s; output: %s" %
160 (cluster, result.fail_reason, result.output))
161 instances = result.stdout.splitlines()
163 self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
165 def _PrepareAuthorizedKeys(self):
166 """Prepare the authorized_keys on every merging node.
168 This method add our public key to remotes authorized_key for further
172 (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
173 pub_key = utils.ReadFile(pub_key_file)
175 for data in self.merger_data:
176 for node in data.nodes:
177 result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
178 (auth_keys, pub_key)),
179 private_key=data.key_path)
182 raise errors.RemoteError("Unable to add our public key to %s in %s."
183 " Fail reason: %s; output: %s" %
184 (node, data.cluster, result.fail_reason,
187 def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
188 strict_host_check=False, private_key=None, batch=True,
190 """Wrapping SshRunner.Run with default parameters.
192 For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
195 return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
196 use_cluster_key=use_cluster_key,
197 strict_host_check=strict_host_check,
198 private_key=private_key, batch=batch,
201 def _StopMergingInstances(self):
202 """Stop instances on merging clusters.
205 for cluster in self.clusters:
206 result = self._RunCmd(cluster, "gnt-instance shutdown --all"
210 raise errors.RemoteError("Unable to stop instances on %s."
211 " Fail reason: %s; output: %s" %
212 (cluster, result.fail_reason, result.output))
214 def _DisableWatcher(self):
215 """Disable watch on all merging clusters, including ourself.
218 for cluster in ["localhost"] + self.clusters:
219 result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
223 raise errors.RemoteError("Unable to pause watcher on %s."
224 " Fail reason: %s; output: %s" %
225 (cluster, result.fail_reason, result.output))
227 def _StopDaemons(self):
228 """Stop all daemons on merging nodes.
231 cmd = "%s stop-all" % constants.DAEMON_UTIL
232 for data in self.merger_data:
233 for node in data.nodes:
234 result = self._RunCmd(node, cmd)
237 raise errors.RemoteError("Unable to stop daemons on %s."
238 " Fail reason: %s; output: %s." %
239 (node, result.fail_reason, result.output))
241 def _FetchRemoteConfig(self):
242 """Fetches and stores remote cluster config from the master.
244 This step is needed before we can merge the config.
247 for data in self.merger_data:
248 result = self._RunCmd(data.cluster, "cat %s" %
249 constants.CLUSTER_CONF_FILE)
252 raise errors.RemoteError("Unable to retrieve remote config on %s."
253 " Fail reason: %s; output %s" %
254 (data.cluster, result.fail_reason,
257 data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
259 utils.WriteFile(data.config_path, data=result.stdout)
261 # R0201: Method could be a function
262 def _KillMasterDaemon(self): # pylint: disable-msg=R0201
263 """Kills the local master daemon.
265 @raise errors.CommandError: If unable to kill
268 result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
270 raise errors.CommandError("Unable to stop master daemons."
271 " Fail reason: %s; output: %s" %
272 (result.fail_reason, result.output))
274 def _MergeConfig(self):
275 """Merges all foreign config into our own config.
278 my_config = config.ConfigWriter(offline=True)
279 fake_ec_id = 0 # Needs to be uniq over the whole config merge
281 for data in self.merger_data:
282 other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
283 self._MergeNodeGroups(my_config, other_config)
285 for node in other_config.GetNodeList():
286 node_info = other_config.GetNodeInfo(node)
287 my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
290 for instance in other_config.GetInstanceList():
291 instance_info = other_config.GetInstanceInfo(instance)
293 # Update the DRBD port assignments
294 # This is a little bit hackish
295 for dsk in instance_info.disks:
296 if dsk.dev_type in constants.LDS_DRBD:
297 port = my_config.AllocatePort()
299 logical_id = list(dsk.logical_id)
301 dsk.logical_id = tuple(logical_id)
303 physical_id = list(dsk.physical_id)
304 physical_id[1] = physical_id[3] = port
305 dsk.physical_id = tuple(physical_id)
307 my_config.AddInstance(instance_info,
308 _CLUSTERMERGE_ECID + str(fake_ec_id))
311 # R0201: Method could be a function
312 def _MergeNodeGroups(self, my_config, other_config):
313 """Adds foreign node groups
315 ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
317 # pylint: disable-msg=R0201
318 logging.info("Node group conflict strategy: %s", self.groups)
320 my_grps = my_config.GetAllNodeGroupsInfo().values()
321 other_grps = other_config.GetAllNodeGroupsInfo().values()
323 # Check for node group naming conflicts:
325 for other_grp in other_grps:
326 for my_grp in my_grps:
327 if other_grp.name == my_grp.name:
328 conflicts.append(other_grp)
331 conflict_names = utils.CommaJoin([g.name for g in conflicts])
332 logging.info("Node groups in both local and remote cluster: %s",
335 # User hasn't specified how to handle conflicts
337 raise errors.CommandError("The following node group(s) are in both"
338 " clusters, and no merge strategy has been"
339 " supplied (see the --groups option): %s" %
342 # User wants to rename conflicts
343 elif self.groups == _GROUPS_RENAME:
344 for grp in conflicts:
345 new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
346 logging.info("Renaming remote node group from %s to %s"
347 " to resolve conflict", grp.name, new_name)
350 # User wants to merge conflicting groups
351 elif self.groups == 'merge':
352 for other_grp in conflicts:
353 logging.info("Merging local and remote '%s' groups", other_grp.name)
354 for node_name in other_grp.members[:]:
355 node = other_config.GetNodeInfo(node_name)
356 # Access to a protected member of a client class
357 # pylint: disable-msg=W0212
358 other_config._UnlockedRemoveNodeFromGroup(node)
360 # Access to a protected member of a client class
361 # pylint: disable-msg=W0212
362 my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
364 # Access to a protected member of a client class
365 # pylint: disable-msg=W0212
366 my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
367 node.group = my_grp_uuid
368 # Remove from list of groups to add
369 other_grps.remove(other_grp)
371 for grp in other_grps:
372 #TODO: handle node group conflicts
373 my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
375 # R0201: Method could be a function
376 def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
377 """Starts the local master daemon.
379 @param no_vote: Should the masterd started without voting? default: False
380 @raise errors.CommandError: If unable to start daemon.
385 env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
387 result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
389 raise errors.CommandError("Couldn't start ganeti master."
390 " Fail reason: %s; output: %s" %
391 (result.fail_reason, result.output))
393 def _ReaddMergedNodesAndRedist(self):
394 """Readds all merging nodes and make sure their config is up-to-date.
396 @raise errors.CommandError: If anything fails.
399 for data in self.merger_data:
400 for node in data.nodes:
401 result = utils.RunCmd(["gnt-node", "add", "--readd",
402 "--no-ssh-key-check", "--force-join", node])
404 raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
405 " output: %s" % (node, result.fail_reason,
408 result = utils.RunCmd(["gnt-cluster", "redist-conf"])
410 raise errors.CommandError("Redistribution failed. Fail reason: %s;"
411 " output: %s" % (result.fail_reason,
414 # R0201: Method could be a function
415 def _StartupAllInstances(self): # pylint: disable-msg=R0201
416 """Starts up all instances (locally).
418 @raise errors.CommandError: If unable to start clusters
421 result = utils.RunCmd(["gnt-instance", "startup", "--all",
424 raise errors.CommandError("Unable to start all instances."
425 " Fail reason: %s; output: %s" %
426 (result.fail_reason, result.output))
428 # R0201: Method could be a function
429 def _VerifyCluster(self): # pylint: disable-msg=R0201
430 """Runs gnt-cluster verify to verify the health.
432 @raise errors.ProgrammError: If cluster fails on verification
435 result = utils.RunCmd(["gnt-cluster", "verify"])
437 raise errors.CommandError("Verification of cluster failed."
438 " Fail reason: %s; output: %s" %
439 (result.fail_reason, result.output))
442 """Does the actual merge.
444 It runs all the steps in the right order and updates the user about steps
445 taken. Also it keeps track of rollback_steps to undo everything.
450 logging.info("Pre cluster verification")
451 self._VerifyCluster()
453 logging.info("Prepare authorized_keys")
454 rbsteps.append("Remove our key from authorized_keys on nodes:"
456 self._PrepareAuthorizedKeys()
458 rbsteps.append("Start all instances again on the merging"
459 " clusters: %(clusters)s")
460 logging.info("Stopping merging instances (takes a while)")
461 self._StopMergingInstances()
463 logging.info("Disable watcher")
464 self._DisableWatcher()
465 logging.info("Stop daemons on merging nodes")
467 logging.info("Merging config")
468 self._FetchRemoteConfig()
470 logging.info("Stopping master daemon")
471 self._KillMasterDaemon()
473 rbsteps.append("Restore %s from another master candidate"
474 " and restart master daemon" %
475 constants.CLUSTER_CONF_FILE)
477 self._StartMasterDaemon(no_vote=True)
479 # Point of no return, delete rbsteps
482 logging.warning("We are at the point of no return. Merge can not easily"
483 " be undone after this point.")
484 logging.info("Readd nodes")
485 self._ReaddMergedNodesAndRedist()
487 logging.info("Merge done, restart master daemon normally")
488 self._KillMasterDaemon()
489 self._StartMasterDaemon()
491 logging.info("Starting instances again")
492 self._StartupAllInstances()
493 logging.info("Post cluster verification")
494 self._VerifyCluster()
495 except errors.GenericError, e:
499 nodes = Flatten([data.nodes for data in self.merger_data])
501 "clusters": self.clusters,
504 logging.critical("In order to rollback do the following:")
506 logging.critical(" * %s", step % info)
508 logging.critical("Nothing to rollback.")
510 # TODO: Keep track of steps done for a flawless resume?
513 """Clean up our environment.
515 This cleans up remote private keys and configs and after that
516 deletes the temporary directory.
519 shutil.rmtree(self.work_dir)
522 def SetupLogging(options):
523 """Setting up logging infrastructure.
525 @param options: Parsed command line options
528 formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
530 stderr_handler = logging.StreamHandler()
531 stderr_handler.setFormatter(formatter)
533 stderr_handler.setLevel(logging.NOTSET)
534 elif options.verbose:
535 stderr_handler.setLevel(logging.INFO)
537 stderr_handler.setLevel(logging.ERROR)
539 root_logger = logging.getLogger("")
540 root_logger.setLevel(logging.NOTSET)
541 root_logger.addHandler(stderr_handler)
548 program = os.path.basename(sys.argv[0])
550 parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
551 " [--watcher-pause-period SECONDS]"
552 " [--groups [%s|%s]]"
553 " <cluster> [<cluster...>]" %
554 (_GROUPS_MERGE, _GROUPS_RENAME)),
556 parser.add_option(cli.DEBUG_OPT)
557 parser.add_option(cli.VERBOSE_OPT)
558 parser.add_option(PAUSE_PERIOD_OPT)
559 parser.add_option(GROUPS_OPT)
561 (options, args) = parser.parse_args()
563 SetupLogging(options)
566 parser.error("No clusters specified")
568 cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
572 cluster_merger.Setup()
573 cluster_merger.Merge()
574 except errors.GenericError, e:
576 return constants.EXIT_FAILURE
578 cluster_merger.Cleanup()
580 return constants.EXIT_SUCCESS
583 if __name__ == "__main__":