Add support for merging node groups
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48
49 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
50                                   action="store", type="int",
51                                   dest="pause_period",
52                                   help=("Amount of time in seconds watcher"
53                                         " should be suspended from running"))
54 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
55                             choices=(_GROUPS_MERGE, _GROUPS_RENAME), dest="groups",
56                             help=("How to handle groups that have the"
57                                   " same name (One of: %s/%s)" %
58                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
59
60
61 def Flatten(unflattened_list):
62   """Flattens a list.
63
64   @param unflattened_list: A list of unflattened list objects.
65   @return: A flattened list
66
67   """
68   flattened_list = []
69
70   for item in unflattened_list:
71     if isinstance(item, list):
72       flattened_list.extend(Flatten(item))
73     else:
74       flattened_list.append(item)
75   return flattened_list
76
77
78 class MergerData(object):
79   """Container class to hold data used for merger.
80
81   """
82   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
83     """Initialize the container.
84
85     @param cluster: The name of the cluster
86     @param key_path: Path to the ssh private key used for authentication
87     @param nodes: List of nodes in the merging cluster
88     @param instances: List of instances running on merging cluster
89     @param config_path: Path to the merging cluster config
90
91     """
92     self.cluster = cluster
93     self.key_path = key_path
94     self.nodes = nodes
95     self.instances = instances
96     self.config_path = config_path
97
98
99 class Merger(object):
100   """Handling the merge.
101
102   """
103   def __init__(self, clusters, pause_period, groups):
104     """Initialize object with sane defaults and infos required.
105
106     @param clusters: The list of clusters to merge in
107     @param pause_period: The time watcher shall be disabled for
108     @param groups: How to handle group conflicts
109
110     """
111     self.merger_data = []
112     self.clusters = clusters
113     self.pause_period = pause_period
114     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
115     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
116     self.ssh_runner = ssh.SshRunner(self.cluster_name)
117     self.groups = groups
118
119   def Setup(self):
120     """Sets up our end so we can do the merger.
121
122     This method is setting us up as a preparation for the merger.
123     It makes the initial contact and gathers information needed.
124
125     @raise errors.RemoteError: for errors in communication/grabbing
126
127     """
128     (remote_path, _, _) = ssh.GetUserFiles("root")
129
130     if self.cluster_name in self.clusters:
131       raise errors.CommandError("Cannot merge cluster %s with itself" %
132                                 self.cluster_name)
133
134     # Fetch remotes private key
135     for cluster in self.clusters:
136       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
137                             ask_key=False)
138       if result.failed:
139         raise errors.RemoteError("There was an error while grabbing ssh private"
140                                  " key from %s. Fail reason: %s; output: %s" %
141                                  (cluster, result.fail_reason, result.output))
142
143       key_path = utils.PathJoin(self.work_dir, cluster)
144       utils.WriteFile(key_path, mode=0600, data=result.stdout)
145
146       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
147                             private_key=key_path)
148       if result.failed:
149         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
150                                  " Fail reason: %s; output: %s" %
151                                  (cluster, result.fail_reason, result.output))
152       nodes = result.stdout.splitlines()
153
154       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
155                             private_key=key_path)
156       if result.failed:
157         raise errors.RemoteError("Unable to retrieve list of instances from"
158                                  " %s. Fail reason: %s; output: %s" %
159                                  (cluster, result.fail_reason, result.output))
160       instances = result.stdout.splitlines()
161
162       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
163
164   def _PrepareAuthorizedKeys(self):
165     """Prepare the authorized_keys on every merging node.
166
167     This method add our public key to remotes authorized_key for further
168     communication.
169
170     """
171     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
172     pub_key = utils.ReadFile(pub_key_file)
173
174     for data in self.merger_data:
175       for node in data.nodes:
176         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
177                                      (auth_keys, pub_key)),
178                               private_key=data.key_path)
179
180         if result.failed:
181           raise errors.RemoteError("Unable to add our public key to %s in %s."
182                                    " Fail reason: %s; output: %s" %
183                                    (node, data.cluster, result.fail_reason,
184                                     result.output))
185
186   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
187               strict_host_check=False, private_key=None, batch=True,
188               ask_key=False):
189     """Wrapping SshRunner.Run with default parameters.
190
191     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
192
193     """
194     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
195                                use_cluster_key=use_cluster_key,
196                                strict_host_check=strict_host_check,
197                                private_key=private_key, batch=batch,
198                                ask_key=ask_key)
199
200   def _StopMergingInstances(self):
201     """Stop instances on merging clusters.
202
203     """
204     for cluster in self.clusters:
205       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
206                                      " --force-multiple")
207
208       if result.failed:
209         raise errors.RemoteError("Unable to stop instances on %s."
210                                  " Fail reason: %s; output: %s" %
211                                  (cluster, result.fail_reason, result.output))
212
213   def _DisableWatcher(self):
214     """Disable watch on all merging clusters, including ourself.
215
216     """
217     for cluster in ["localhost"] + self.clusters:
218       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
219                                      self.pause_period)
220
221       if result.failed:
222         raise errors.RemoteError("Unable to pause watcher on %s."
223                                  " Fail reason: %s; output: %s" %
224                                  (cluster, result.fail_reason, result.output))
225
226   def _StopDaemons(self):
227     """Stop all daemons on merging nodes.
228
229     """
230     cmd = "%s stop-all" % constants.DAEMON_UTIL
231     for data in self.merger_data:
232       for node in data.nodes:
233         result = self._RunCmd(node, cmd)
234
235         if result.failed:
236           raise errors.RemoteError("Unable to stop daemons on %s."
237                                    " Fail reason: %s; output: %s." %
238                                    (node, result.fail_reason, result.output))
239
240   def _FetchRemoteConfig(self):
241     """Fetches and stores remote cluster config from the master.
242
243     This step is needed before we can merge the config.
244
245     """
246     for data in self.merger_data:
247       result = self._RunCmd(data.cluster, "cat %s" %
248                                           constants.CLUSTER_CONF_FILE)
249
250       if result.failed:
251         raise errors.RemoteError("Unable to retrieve remote config on %s."
252                                  " Fail reason: %s; output %s" %
253                                  (data.cluster, result.fail_reason,
254                                   result.output))
255
256       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
257                                         data.cluster)
258       utils.WriteFile(data.config_path, data=result.stdout)
259
260   # R0201: Method could be a function
261   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
262     """Kills the local master daemon.
263
264     @raise errors.CommandError: If unable to kill
265
266     """
267     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
268     if result.failed:
269       raise errors.CommandError("Unable to stop master daemons."
270                                 " Fail reason: %s; output: %s" %
271                                 (result.fail_reason, result.output))
272
273   def _MergeConfig(self):
274     """Merges all foreign config into our own config.
275
276     """
277     my_config = config.ConfigWriter(offline=True)
278     fake_ec_id = 0 # Needs to be uniq over the whole config merge
279
280     for data in self.merger_data:
281       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
282       self._MergeNodeGroups(my_config, other_config)
283
284       for node in other_config.GetNodeList():
285         node_info = other_config.GetNodeInfo(node)
286         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
287         fake_ec_id += 1
288
289       for instance in other_config.GetInstanceList():
290         instance_info = other_config.GetInstanceInfo(instance)
291
292         # Update the DRBD port assignments
293         # This is a little bit hackish
294         for dsk in instance_info.disks:
295           if dsk.dev_type in constants.LDS_DRBD:
296             port = my_config.AllocatePort()
297
298             logical_id = list(dsk.logical_id)
299             logical_id[2] = port
300             dsk.logical_id = tuple(logical_id)
301
302             physical_id = list(dsk.physical_id)
303             physical_id[1] = physical_id[3] = port
304             dsk.physical_id = tuple(physical_id)
305
306         my_config.AddInstance(instance_info,
307                               _CLUSTERMERGE_ECID + str(fake_ec_id))
308         fake_ec_id += 1
309
310   # R0201: Method could be a function
311   def _MergeNodeGroups(self, my_config, other_config):
312     """Adds foreign node groups
313
314     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
315     """
316     # pylint: disable-msg=R0201
317     logging.info("Node group conflict strategy: %s" % self.groups)
318
319     my_grps = my_config.GetAllNodeGroupsInfo().values()
320     other_grps = other_config.GetAllNodeGroupsInfo().values()
321
322     # Check for node group naming conflicts:
323     conflicts = []
324     for other_grp in other_grps:
325       for my_grp in my_grps:
326         if other_grp.name == my_grp.name:
327           conflicts.append(other_grp)
328
329     if conflicts:
330       conflict_names = utils.CommaJoin([g.name for g in conflicts])
331       logging.info("Node groups in both local and remote cluster: %s" %
332                    conflict_names)
333
334       # User hasn't specified how to handle conflicts
335       if not self.groups:
336         raise errors.CommandError("The following node group(s) are in both"
337                                   " clusters, and no merge strategy has been"
338                                   " supplied (see the --groups option): %s" %
339                                   conflict_names)
340
341       # User wants to rename conflicts
342       elif self.groups == _GROUPS_RENAME:
343         for grp in conflicts:
344           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
345           logging.info("Renaming remote node group from %s to %s"
346                        " to resolve conflict" % (grp.name, new_name))
347           grp.name = new_name
348
349       # User wants to merge conflicting groups
350       elif self.groups == 'merge':
351         for other_grp in conflicts:
352           logging.info("Merging local and remote '%s' groups" % other_grp.name)
353           for node_name in other_grp.members[:]:
354             node = other_config.GetNodeInfo(node_name)
355             other_config._UnlockedRemoveNodeFromGroup(node)
356
357             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
358             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
359             node.group = my_grp_uuid
360           # Remove from list of groups to add
361           other_grps.remove(other_grp)
362
363     for grp in other_grps:
364       #TODO: handle node group conflicts
365       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
366
367   # R0201: Method could be a function
368   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
369     """Starts the local master daemon.
370
371     @param no_vote: Should the masterd started without voting? default: False
372     @raise errors.CommandError: If unable to start daemon.
373
374     """
375     env = {}
376     if no_vote:
377       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
378
379     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
380     if result.failed:
381       raise errors.CommandError("Couldn't start ganeti master."
382                                 " Fail reason: %s; output: %s" %
383                                 (result.fail_reason, result.output))
384
385   def _ReaddMergedNodesAndRedist(self):
386     """Readds all merging nodes and make sure their config is up-to-date.
387
388     @raise errors.CommandError: If anything fails.
389
390     """
391     for data in self.merger_data:
392       for node in data.nodes:
393         result = utils.RunCmd(["gnt-node", "add", "--readd",
394                                "--no-ssh-key-check", "--force-join", node])
395         if result.failed:
396           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
397                                     " output: %s" % (node, result.fail_reason,
398                                                      result.output))
399
400     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
401     if result.failed:
402       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
403                                 " output: %s" % (result.fail_reason,
404                                                 result.output))
405
406   # R0201: Method could be a function
407   def _StartupAllInstances(self): # pylint: disable-msg=R0201
408     """Starts up all instances (locally).
409
410     @raise errors.CommandError: If unable to start clusters
411
412     """
413     result = utils.RunCmd(["gnt-instance", "startup", "--all",
414                            "--force-multiple"])
415     if result.failed:
416       raise errors.CommandError("Unable to start all instances."
417                                 " Fail reason: %s; output: %s" %
418                                 (result.fail_reason, result.output))
419
420   # R0201: Method could be a function
421   def _VerifyCluster(self): # pylint: disable-msg=R0201
422     """Runs gnt-cluster verify to verify the health.
423
424     @raise errors.ProgrammError: If cluster fails on verification
425
426     """
427     result = utils.RunCmd(["gnt-cluster", "verify"])
428     if result.failed:
429       raise errors.CommandError("Verification of cluster failed."
430                                 " Fail reason: %s; output: %s" %
431                                 (result.fail_reason, result.output))
432
433   def Merge(self):
434     """Does the actual merge.
435
436     It runs all the steps in the right order and updates the user about steps
437     taken. Also it keeps track of rollback_steps to undo everything.
438
439     """
440     rbsteps = []
441     try:
442       logging.info("Pre cluster verification")
443       self._VerifyCluster()
444
445       logging.info("Prepare authorized_keys")
446       rbsteps.append("Remove our key from authorized_keys on nodes:"
447                      " %(nodes)s")
448       self._PrepareAuthorizedKeys()
449
450       rbsteps.append("Start all instances again on the merging"
451                      " clusters: %(clusters)s")
452       logging.info("Stopping merging instances (takes a while)")
453       self._StopMergingInstances()
454
455       logging.info("Disable watcher")
456       self._DisableWatcher()
457       logging.info("Stop daemons on merging nodes")
458       self._StopDaemons()
459       logging.info("Merging config")
460       self._FetchRemoteConfig()
461
462       logging.info("Stopping master daemon")
463       self._KillMasterDaemon()
464
465       rbsteps.append("Restore %s from another master candidate"
466                      " and restart master daemon" %
467                      constants.CLUSTER_CONF_FILE)
468       self._MergeConfig()
469       self._StartMasterDaemon(no_vote=True)
470
471       # Point of no return, delete rbsteps
472       del rbsteps[:]
473
474       logging.warning("We are at the point of no return. Merge can not easily"
475                       " be undone after this point.")
476       logging.info("Readd nodes")
477       self._ReaddMergedNodesAndRedist()
478
479       logging.info("Merge done, restart master daemon normally")
480       self._KillMasterDaemon()
481       self._StartMasterDaemon()
482
483       logging.info("Starting instances again")
484       self._StartupAllInstances()
485       logging.info("Post cluster verification")
486       self._VerifyCluster()
487     except errors.GenericError, e:
488       logging.exception(e)
489
490       if rbsteps:
491         nodes = Flatten([data.nodes for data in self.merger_data])
492         info = {
493           "clusters": self.clusters,
494           "nodes": nodes,
495           }
496         logging.critical("In order to rollback do the following:")
497         for step in rbsteps:
498           logging.critical("  * %s", step % info)
499       else:
500         logging.critical("Nothing to rollback.")
501
502       # TODO: Keep track of steps done for a flawless resume?
503
504   def Cleanup(self):
505     """Clean up our environment.
506
507     This cleans up remote private keys and configs and after that
508     deletes the temporary directory.
509
510     """
511     shutil.rmtree(self.work_dir)
512
513
514 def SetupLogging(options):
515   """Setting up logging infrastructure.
516
517   @param options: Parsed command line options
518
519   """
520   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
521
522   stderr_handler = logging.StreamHandler()
523   stderr_handler.setFormatter(formatter)
524   if options.debug:
525     stderr_handler.setLevel(logging.NOTSET)
526   elif options.verbose:
527     stderr_handler.setLevel(logging.INFO)
528   else:
529     stderr_handler.setLevel(logging.ERROR)
530
531   root_logger = logging.getLogger("")
532   root_logger.setLevel(logging.NOTSET)
533   root_logger.addHandler(stderr_handler)
534
535
536 def main():
537   """Main routine.
538
539   """
540   program = os.path.basename(sys.argv[0])
541
542   parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
543                                         " [--watcher-pause-period SECONDS]"
544                                         " [--groups [%s|%s]]"
545                                         " <cluster> [<cluster...>]" %
546                                         (_GROUPS_MERGE, _GROUPS_RENAME)),
547                                         prog=program)
548   parser.add_option(cli.DEBUG_OPT)
549   parser.add_option(cli.VERBOSE_OPT)
550   parser.add_option(PAUSE_PERIOD_OPT)
551   parser.add_option(GROUPS_OPT)
552
553   (options, args) = parser.parse_args()
554
555   SetupLogging(options)
556
557   if not args:
558     parser.error("No clusters specified")
559
560   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
561                           options.groups)
562   try:
563     try:
564       cluster_merger.Setup()
565       cluster_merger.Merge()
566     except errors.GenericError, e:
567       logging.exception(e)
568       return constants.EXIT_FAILURE
569   finally:
570     cluster_merger.Cleanup()
571
572   return constants.EXIT_SUCCESS
573
574
575 if __name__ == "__main__":
576   sys.exit(main())