Fix pylint warnings
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48
49 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
50                                   action="store", type="int",
51                                   dest="pause_period",
52                                   help=("Amount of time in seconds watcher"
53                                         " should be suspended from running"))
54 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
55                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
56                             dest="groups",
57                             help=("How to handle groups that have the"
58                                   " same name (One of: %s/%s)" %
59                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
60
61
62 def Flatten(unflattened_list):
63   """Flattens a list.
64
65   @param unflattened_list: A list of unflattened list objects.
66   @return: A flattened list
67
68   """
69   flattened_list = []
70
71   for item in unflattened_list:
72     if isinstance(item, list):
73       flattened_list.extend(Flatten(item))
74     else:
75       flattened_list.append(item)
76   return flattened_list
77
78
79 class MergerData(object):
80   """Container class to hold data used for merger.
81
82   """
83   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
84     """Initialize the container.
85
86     @param cluster: The name of the cluster
87     @param key_path: Path to the ssh private key used for authentication
88     @param nodes: List of nodes in the merging cluster
89     @param instances: List of instances running on merging cluster
90     @param config_path: Path to the merging cluster config
91
92     """
93     self.cluster = cluster
94     self.key_path = key_path
95     self.nodes = nodes
96     self.instances = instances
97     self.config_path = config_path
98
99
100 class Merger(object):
101   """Handling the merge.
102
103   """
104   def __init__(self, clusters, pause_period, groups):
105     """Initialize object with sane defaults and infos required.
106
107     @param clusters: The list of clusters to merge in
108     @param pause_period: The time watcher shall be disabled for
109     @param groups: How to handle group conflicts
110
111     """
112     self.merger_data = []
113     self.clusters = clusters
114     self.pause_period = pause_period
115     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
116     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
117     self.ssh_runner = ssh.SshRunner(self.cluster_name)
118     self.groups = groups
119
120   def Setup(self):
121     """Sets up our end so we can do the merger.
122
123     This method is setting us up as a preparation for the merger.
124     It makes the initial contact and gathers information needed.
125
126     @raise errors.RemoteError: for errors in communication/grabbing
127
128     """
129     (remote_path, _, _) = ssh.GetUserFiles("root")
130
131     if self.cluster_name in self.clusters:
132       raise errors.CommandError("Cannot merge cluster %s with itself" %
133                                 self.cluster_name)
134
135     # Fetch remotes private key
136     for cluster in self.clusters:
137       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
138                             ask_key=False)
139       if result.failed:
140         raise errors.RemoteError("There was an error while grabbing ssh private"
141                                  " key from %s. Fail reason: %s; output: %s" %
142                                  (cluster, result.fail_reason, result.output))
143
144       key_path = utils.PathJoin(self.work_dir, cluster)
145       utils.WriteFile(key_path, mode=0600, data=result.stdout)
146
147       result = self._RunCmd(cluster, "gnt-node list -o name --no-header",
148                             private_key=key_path)
149       if result.failed:
150         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
151                                  " Fail reason: %s; output: %s" %
152                                  (cluster, result.fail_reason, result.output))
153       nodes = result.stdout.splitlines()
154
155       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
156                             private_key=key_path)
157       if result.failed:
158         raise errors.RemoteError("Unable to retrieve list of instances from"
159                                  " %s. Fail reason: %s; output: %s" %
160                                  (cluster, result.fail_reason, result.output))
161       instances = result.stdout.splitlines()
162
163       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
164
165   def _PrepareAuthorizedKeys(self):
166     """Prepare the authorized_keys on every merging node.
167
168     This method add our public key to remotes authorized_key for further
169     communication.
170
171     """
172     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
173     pub_key = utils.ReadFile(pub_key_file)
174
175     for data in self.merger_data:
176       for node in data.nodes:
177         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
178                                      (auth_keys, pub_key)),
179                               private_key=data.key_path)
180
181         if result.failed:
182           raise errors.RemoteError("Unable to add our public key to %s in %s."
183                                    " Fail reason: %s; output: %s" %
184                                    (node, data.cluster, result.fail_reason,
185                                     result.output))
186
187   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
188               strict_host_check=False, private_key=None, batch=True,
189               ask_key=False):
190     """Wrapping SshRunner.Run with default parameters.
191
192     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
193
194     """
195     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
196                                use_cluster_key=use_cluster_key,
197                                strict_host_check=strict_host_check,
198                                private_key=private_key, batch=batch,
199                                ask_key=ask_key)
200
201   def _StopMergingInstances(self):
202     """Stop instances on merging clusters.
203
204     """
205     for cluster in self.clusters:
206       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
207                                      " --force-multiple")
208
209       if result.failed:
210         raise errors.RemoteError("Unable to stop instances on %s."
211                                  " Fail reason: %s; output: %s" %
212                                  (cluster, result.fail_reason, result.output))
213
214   def _DisableWatcher(self):
215     """Disable watch on all merging clusters, including ourself.
216
217     """
218     for cluster in ["localhost"] + self.clusters:
219       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
220                                      self.pause_period)
221
222       if result.failed:
223         raise errors.RemoteError("Unable to pause watcher on %s."
224                                  " Fail reason: %s; output: %s" %
225                                  (cluster, result.fail_reason, result.output))
226
227   def _StopDaemons(self):
228     """Stop all daemons on merging nodes.
229
230     """
231     cmd = "%s stop-all" % constants.DAEMON_UTIL
232     for data in self.merger_data:
233       for node in data.nodes:
234         result = self._RunCmd(node, cmd)
235
236         if result.failed:
237           raise errors.RemoteError("Unable to stop daemons on %s."
238                                    " Fail reason: %s; output: %s." %
239                                    (node, result.fail_reason, result.output))
240
241   def _FetchRemoteConfig(self):
242     """Fetches and stores remote cluster config from the master.
243
244     This step is needed before we can merge the config.
245
246     """
247     for data in self.merger_data:
248       result = self._RunCmd(data.cluster, "cat %s" %
249                                           constants.CLUSTER_CONF_FILE)
250
251       if result.failed:
252         raise errors.RemoteError("Unable to retrieve remote config on %s."
253                                  " Fail reason: %s; output %s" %
254                                  (data.cluster, result.fail_reason,
255                                   result.output))
256
257       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
258                                         data.cluster)
259       utils.WriteFile(data.config_path, data=result.stdout)
260
261   # R0201: Method could be a function
262   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
263     """Kills the local master daemon.
264
265     @raise errors.CommandError: If unable to kill
266
267     """
268     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
269     if result.failed:
270       raise errors.CommandError("Unable to stop master daemons."
271                                 " Fail reason: %s; output: %s" %
272                                 (result.fail_reason, result.output))
273
274   def _MergeConfig(self):
275     """Merges all foreign config into our own config.
276
277     """
278     my_config = config.ConfigWriter(offline=True)
279     fake_ec_id = 0 # Needs to be uniq over the whole config merge
280
281     for data in self.merger_data:
282       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
283       self._MergeNodeGroups(my_config, other_config)
284
285       for node in other_config.GetNodeList():
286         node_info = other_config.GetNodeInfo(node)
287         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
288         fake_ec_id += 1
289
290       for instance in other_config.GetInstanceList():
291         instance_info = other_config.GetInstanceInfo(instance)
292
293         # Update the DRBD port assignments
294         # This is a little bit hackish
295         for dsk in instance_info.disks:
296           if dsk.dev_type in constants.LDS_DRBD:
297             port = my_config.AllocatePort()
298
299             logical_id = list(dsk.logical_id)
300             logical_id[2] = port
301             dsk.logical_id = tuple(logical_id)
302
303             physical_id = list(dsk.physical_id)
304             physical_id[1] = physical_id[3] = port
305             dsk.physical_id = tuple(physical_id)
306
307         my_config.AddInstance(instance_info,
308                               _CLUSTERMERGE_ECID + str(fake_ec_id))
309         fake_ec_id += 1
310
311   # R0201: Method could be a function
312   def _MergeNodeGroups(self, my_config, other_config):
313     """Adds foreign node groups
314
315     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
316     """
317     # pylint: disable-msg=R0201
318     logging.info("Node group conflict strategy: %s", self.groups)
319
320     my_grps = my_config.GetAllNodeGroupsInfo().values()
321     other_grps = other_config.GetAllNodeGroupsInfo().values()
322
323     # Check for node group naming conflicts:
324     conflicts = []
325     for other_grp in other_grps:
326       for my_grp in my_grps:
327         if other_grp.name == my_grp.name:
328           conflicts.append(other_grp)
329
330     if conflicts:
331       conflict_names = utils.CommaJoin([g.name for g in conflicts])
332       logging.info("Node groups in both local and remote cluster: %s",
333                    conflict_names)
334
335       # User hasn't specified how to handle conflicts
336       if not self.groups:
337         raise errors.CommandError("The following node group(s) are in both"
338                                   " clusters, and no merge strategy has been"
339                                   " supplied (see the --groups option): %s" %
340                                   conflict_names)
341
342       # User wants to rename conflicts
343       elif self.groups == _GROUPS_RENAME:
344         for grp in conflicts:
345           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
346           logging.info("Renaming remote node group from %s to %s"
347                        " to resolve conflict", grp.name, new_name)
348           grp.name = new_name
349
350       # User wants to merge conflicting groups
351       elif self.groups == 'merge':
352         for other_grp in conflicts:
353           logging.info("Merging local and remote '%s' groups", other_grp.name)
354           for node_name in other_grp.members[:]:
355             node = other_config.GetNodeInfo(node_name)
356             # Access to a protected member of a client class
357             # pylint: disable-msg=W0212
358             other_config._UnlockedRemoveNodeFromGroup(node)
359
360             # Access to a protected member of a client class
361             # pylint: disable-msg=W0212
362             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
363
364             # Access to a protected member of a client class
365             # pylint: disable-msg=W0212
366             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
367             node.group = my_grp_uuid
368           # Remove from list of groups to add
369           other_grps.remove(other_grp)
370
371     for grp in other_grps:
372       #TODO: handle node group conflicts
373       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
374
375   # R0201: Method could be a function
376   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
377     """Starts the local master daemon.
378
379     @param no_vote: Should the masterd started without voting? default: False
380     @raise errors.CommandError: If unable to start daemon.
381
382     """
383     env = {}
384     if no_vote:
385       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
386
387     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
388     if result.failed:
389       raise errors.CommandError("Couldn't start ganeti master."
390                                 " Fail reason: %s; output: %s" %
391                                 (result.fail_reason, result.output))
392
393   def _ReaddMergedNodesAndRedist(self):
394     """Readds all merging nodes and make sure their config is up-to-date.
395
396     @raise errors.CommandError: If anything fails.
397
398     """
399     for data in self.merger_data:
400       for node in data.nodes:
401         result = utils.RunCmd(["gnt-node", "add", "--readd",
402                                "--no-ssh-key-check", "--force-join", node])
403         if result.failed:
404           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
405                                     " output: %s" % (node, result.fail_reason,
406                                                      result.output))
407
408     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
409     if result.failed:
410       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
411                                 " output: %s" % (result.fail_reason,
412                                                 result.output))
413
414   # R0201: Method could be a function
415   def _StartupAllInstances(self): # pylint: disable-msg=R0201
416     """Starts up all instances (locally).
417
418     @raise errors.CommandError: If unable to start clusters
419
420     """
421     result = utils.RunCmd(["gnt-instance", "startup", "--all",
422                            "--force-multiple"])
423     if result.failed:
424       raise errors.CommandError("Unable to start all instances."
425                                 " Fail reason: %s; output: %s" %
426                                 (result.fail_reason, result.output))
427
428   # R0201: Method could be a function
429   def _VerifyCluster(self): # pylint: disable-msg=R0201
430     """Runs gnt-cluster verify to verify the health.
431
432     @raise errors.ProgrammError: If cluster fails on verification
433
434     """
435     result = utils.RunCmd(["gnt-cluster", "verify"])
436     if result.failed:
437       raise errors.CommandError("Verification of cluster failed."
438                                 " Fail reason: %s; output: %s" %
439                                 (result.fail_reason, result.output))
440
441   def Merge(self):
442     """Does the actual merge.
443
444     It runs all the steps in the right order and updates the user about steps
445     taken. Also it keeps track of rollback_steps to undo everything.
446
447     """
448     rbsteps = []
449     try:
450       logging.info("Pre cluster verification")
451       self._VerifyCluster()
452
453       logging.info("Prepare authorized_keys")
454       rbsteps.append("Remove our key from authorized_keys on nodes:"
455                      " %(nodes)s")
456       self._PrepareAuthorizedKeys()
457
458       rbsteps.append("Start all instances again on the merging"
459                      " clusters: %(clusters)s")
460       logging.info("Stopping merging instances (takes a while)")
461       self._StopMergingInstances()
462
463       logging.info("Disable watcher")
464       self._DisableWatcher()
465       logging.info("Stop daemons on merging nodes")
466       self._StopDaemons()
467       logging.info("Merging config")
468       self._FetchRemoteConfig()
469
470       logging.info("Stopping master daemon")
471       self._KillMasterDaemon()
472
473       rbsteps.append("Restore %s from another master candidate"
474                      " and restart master daemon" %
475                      constants.CLUSTER_CONF_FILE)
476       self._MergeConfig()
477       self._StartMasterDaemon(no_vote=True)
478
479       # Point of no return, delete rbsteps
480       del rbsteps[:]
481
482       logging.warning("We are at the point of no return. Merge can not easily"
483                       " be undone after this point.")
484       logging.info("Readd nodes")
485       self._ReaddMergedNodesAndRedist()
486
487       logging.info("Merge done, restart master daemon normally")
488       self._KillMasterDaemon()
489       self._StartMasterDaemon()
490
491       logging.info("Starting instances again")
492       self._StartupAllInstances()
493       logging.info("Post cluster verification")
494       self._VerifyCluster()
495     except errors.GenericError, e:
496       logging.exception(e)
497
498       if rbsteps:
499         nodes = Flatten([data.nodes for data in self.merger_data])
500         info = {
501           "clusters": self.clusters,
502           "nodes": nodes,
503           }
504         logging.critical("In order to rollback do the following:")
505         for step in rbsteps:
506           logging.critical("  * %s", step % info)
507       else:
508         logging.critical("Nothing to rollback.")
509
510       # TODO: Keep track of steps done for a flawless resume?
511
512   def Cleanup(self):
513     """Clean up our environment.
514
515     This cleans up remote private keys and configs and after that
516     deletes the temporary directory.
517
518     """
519     shutil.rmtree(self.work_dir)
520
521
522 def SetupLogging(options):
523   """Setting up logging infrastructure.
524
525   @param options: Parsed command line options
526
527   """
528   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
529
530   stderr_handler = logging.StreamHandler()
531   stderr_handler.setFormatter(formatter)
532   if options.debug:
533     stderr_handler.setLevel(logging.NOTSET)
534   elif options.verbose:
535     stderr_handler.setLevel(logging.INFO)
536   else:
537     stderr_handler.setLevel(logging.ERROR)
538
539   root_logger = logging.getLogger("")
540   root_logger.setLevel(logging.NOTSET)
541   root_logger.addHandler(stderr_handler)
542
543
544 def main():
545   """Main routine.
546
547   """
548   program = os.path.basename(sys.argv[0])
549
550   parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
551                                         " [--watcher-pause-period SECONDS]"
552                                         " [--groups [%s|%s]]"
553                                         " <cluster> [<cluster...>]" %
554                                         (_GROUPS_MERGE, _GROUPS_RENAME)),
555                                         prog=program)
556   parser.add_option(cli.DEBUG_OPT)
557   parser.add_option(cli.VERBOSE_OPT)
558   parser.add_option(PAUSE_PERIOD_OPT)
559   parser.add_option(GROUPS_OPT)
560
561   (options, args) = parser.parse_args()
562
563   SetupLogging(options)
564
565   if not args:
566     parser.error("No clusters specified")
567
568   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
569                           options.groups)
570   try:
571     try:
572       cluster_merger.Setup()
573       cluster_merger.Merge()
574     except errors.GenericError, e:
575       logging.exception(e)
576       return constants.EXIT_FAILURE
577   finally:
578     cluster_merger.Cleanup()
579
580   return constants.EXIT_SUCCESS
581
582
583 if __name__ == "__main__":
584   sys.exit(main())