Revert "Only merge nodes that are known to not be offline"
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48
49 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
50                                   action="store", type="int",
51                                   dest="pause_period",
52                                   help=("Amount of time in seconds watcher"
53                                         " should be suspended from running"))
54 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
55                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
56                             dest="groups",
57                             help=("How to handle groups that have the"
58                                   " same name (One of: %s/%s)" %
59                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
60
61
62 def Flatten(unflattened_list):
63   """Flattens a list.
64
65   @param unflattened_list: A list of unflattened list objects.
66   @return: A flattened list
67
68   """
69   flattened_list = []
70
71   for item in unflattened_list:
72     if isinstance(item, list):
73       flattened_list.extend(Flatten(item))
74     else:
75       flattened_list.append(item)
76   return flattened_list
77
78
79 class MergerData(object):
80   """Container class to hold data used for merger.
81
82   """
83   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
84     """Initialize the container.
85
86     @param cluster: The name of the cluster
87     @param key_path: Path to the ssh private key used for authentication
88     @param nodes: List of online nodes in the merging cluster
89     @param instances: List of instances running on merging cluster
90     @param config_path: Path to the merging cluster config
91
92     """
93     self.cluster = cluster
94     self.key_path = key_path
95     self.nodes = nodes
96     self.instances = instances
97     self.config_path = config_path
98
99
100 class Merger(object):
101   """Handling the merge.
102
103   """
104   def __init__(self, clusters, pause_period, groups):
105     """Initialize object with sane defaults and infos required.
106
107     @param clusters: The list of clusters to merge in
108     @param pause_period: The time watcher shall be disabled for
109     @param groups: How to handle group conflicts
110
111     """
112     self.merger_data = []
113     self.clusters = clusters
114     self.pause_period = pause_period
115     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
116     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
117     self.ssh_runner = ssh.SshRunner(self.cluster_name)
118     self.groups = groups
119
120   def Setup(self):
121     """Sets up our end so we can do the merger.
122
123     This method is setting us up as a preparation for the merger.
124     It makes the initial contact and gathers information needed.
125
126     @raise errors.RemoteError: for errors in communication/grabbing
127
128     """
129     (remote_path, _, _) = ssh.GetUserFiles("root")
130
131     if self.cluster_name in self.clusters:
132       raise errors.CommandError("Cannot merge cluster %s with itself" %
133                                 self.cluster_name)
134
135     # Fetch remotes private key
136     for cluster in self.clusters:
137       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
138                             ask_key=False)
139       if result.failed:
140         raise errors.RemoteError("There was an error while grabbing ssh private"
141                                  " key from %s. Fail reason: %s; output: %s" %
142                                  (cluster, result.fail_reason, result.output))
143
144       key_path = utils.PathJoin(self.work_dir, cluster)
145       utils.WriteFile(key_path, mode=0600, data=result.stdout)
146
147       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
148                             " --no-header --separator=,", private_key=key_path)
149       if result.failed:
150         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
151                                  " Fail reason: %s; output: %s" %
152                                  (cluster, result.fail_reason, result.output))
153       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
154       nodes = [node_status[0] for node_status in nodes_statuses
155                if node_status[1] == "N"]
156
157       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
158                             private_key=key_path)
159       if result.failed:
160         raise errors.RemoteError("Unable to retrieve list of instances from"
161                                  " %s. Fail reason: %s; output: %s" %
162                                  (cluster, result.fail_reason, result.output))
163       instances = result.stdout.splitlines()
164
165       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
166
167   def _PrepareAuthorizedKeys(self):
168     """Prepare the authorized_keys on every merging node.
169
170     This method add our public key to remotes authorized_key for further
171     communication.
172
173     """
174     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
175     pub_key = utils.ReadFile(pub_key_file)
176
177     for data in self.merger_data:
178       for node in data.nodes:
179         result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
180                                      (auth_keys, pub_key)),
181                               private_key=data.key_path)
182
183         if result.failed:
184           raise errors.RemoteError("Unable to add our public key to %s in %s."
185                                    " Fail reason: %s; output: %s" %
186                                    (node, data.cluster, result.fail_reason,
187                                     result.output))
188
189   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
190               strict_host_check=False, private_key=None, batch=True,
191               ask_key=False):
192     """Wrapping SshRunner.Run with default parameters.
193
194     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
195
196     """
197     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
198                                use_cluster_key=use_cluster_key,
199                                strict_host_check=strict_host_check,
200                                private_key=private_key, batch=batch,
201                                ask_key=ask_key)
202
203   def _StopMergingInstances(self):
204     """Stop instances on merging clusters.
205
206     """
207     for cluster in self.clusters:
208       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
209                                      " --force-multiple")
210
211       if result.failed:
212         raise errors.RemoteError("Unable to stop instances on %s."
213                                  " Fail reason: %s; output: %s" %
214                                  (cluster, result.fail_reason, result.output))
215
216   def _DisableWatcher(self):
217     """Disable watch on all merging clusters, including ourself.
218
219     """
220     for cluster in ["localhost"] + self.clusters:
221       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
222                                      self.pause_period)
223
224       if result.failed:
225         raise errors.RemoteError("Unable to pause watcher on %s."
226                                  " Fail reason: %s; output: %s" %
227                                  (cluster, result.fail_reason, result.output))
228
229   def _StopDaemons(self):
230     """Stop all daemons on merging nodes.
231
232     """
233     cmd = "%s stop-all" % constants.DAEMON_UTIL
234     for data in self.merger_data:
235       for node in data.nodes:
236         result = self._RunCmd(node, cmd)
237
238         if result.failed:
239           raise errors.RemoteError("Unable to stop daemons on %s."
240                                    " Fail reason: %s; output: %s." %
241                                    (node, result.fail_reason, result.output))
242
243   def _FetchRemoteConfig(self):
244     """Fetches and stores remote cluster config from the master.
245
246     This step is needed before we can merge the config.
247
248     """
249     for data in self.merger_data:
250       result = self._RunCmd(data.cluster, "cat %s" %
251                                           constants.CLUSTER_CONF_FILE)
252
253       if result.failed:
254         raise errors.RemoteError("Unable to retrieve remote config on %s."
255                                  " Fail reason: %s; output %s" %
256                                  (data.cluster, result.fail_reason,
257                                   result.output))
258
259       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
260                                         data.cluster)
261       utils.WriteFile(data.config_path, data=result.stdout)
262
263   # R0201: Method could be a function
264   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
265     """Kills the local master daemon.
266
267     @raise errors.CommandError: If unable to kill
268
269     """
270     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
271     if result.failed:
272       raise errors.CommandError("Unable to stop master daemons."
273                                 " Fail reason: %s; output: %s" %
274                                 (result.fail_reason, result.output))
275
276   def _MergeConfig(self):
277     """Merges all foreign config into our own config.
278
279     """
280     my_config = config.ConfigWriter(offline=True)
281     fake_ec_id = 0 # Needs to be uniq over the whole config merge
282
283     for data in self.merger_data:
284       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
285       self._MergeNodeGroups(my_config, other_config)
286
287       for node in other_config.GetNodeList():
288         node_info = other_config.GetNodeInfo(node)
289         my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
290         fake_ec_id += 1
291
292       for instance in other_config.GetInstanceList():
293         instance_info = other_config.GetInstanceInfo(instance)
294
295         # Update the DRBD port assignments
296         # This is a little bit hackish
297         for dsk in instance_info.disks:
298           if dsk.dev_type in constants.LDS_DRBD:
299             port = my_config.AllocatePort()
300
301             logical_id = list(dsk.logical_id)
302             logical_id[2] = port
303             dsk.logical_id = tuple(logical_id)
304
305             physical_id = list(dsk.physical_id)
306             physical_id[1] = physical_id[3] = port
307             dsk.physical_id = tuple(physical_id)
308
309         my_config.AddInstance(instance_info,
310                               _CLUSTERMERGE_ECID + str(fake_ec_id))
311         fake_ec_id += 1
312
313   # R0201: Method could be a function
314   def _MergeNodeGroups(self, my_config, other_config):
315     """Adds foreign node groups
316
317     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
318     """
319     # pylint: disable-msg=R0201
320     logging.info("Node group conflict strategy: %s", self.groups)
321
322     my_grps = my_config.GetAllNodeGroupsInfo().values()
323     other_grps = other_config.GetAllNodeGroupsInfo().values()
324
325     # Check for node group naming conflicts:
326     conflicts = []
327     for other_grp in other_grps:
328       for my_grp in my_grps:
329         if other_grp.name == my_grp.name:
330           conflicts.append(other_grp)
331
332     if conflicts:
333       conflict_names = utils.CommaJoin([g.name for g in conflicts])
334       logging.info("Node groups in both local and remote cluster: %s",
335                    conflict_names)
336
337       # User hasn't specified how to handle conflicts
338       if not self.groups:
339         raise errors.CommandError("The following node group(s) are in both"
340                                   " clusters, and no merge strategy has been"
341                                   " supplied (see the --groups option): %s" %
342                                   conflict_names)
343
344       # User wants to rename conflicts
345       elif self.groups == _GROUPS_RENAME:
346         for grp in conflicts:
347           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
348           logging.info("Renaming remote node group from %s to %s"
349                        " to resolve conflict", grp.name, new_name)
350           grp.name = new_name
351
352       # User wants to merge conflicting groups
353       elif self.groups == 'merge':
354         for other_grp in conflicts:
355           logging.info("Merging local and remote '%s' groups", other_grp.name)
356           for node_name in other_grp.members[:]:
357             node = other_config.GetNodeInfo(node_name)
358             # Access to a protected member of a client class
359             # pylint: disable-msg=W0212
360             other_config._UnlockedRemoveNodeFromGroup(node)
361
362             # Access to a protected member of a client class
363             # pylint: disable-msg=W0212
364             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
365
366             # Access to a protected member of a client class
367             # pylint: disable-msg=W0212
368             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
369             node.group = my_grp_uuid
370           # Remove from list of groups to add
371           other_grps.remove(other_grp)
372
373     for grp in other_grps:
374       #TODO: handle node group conflicts
375       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
376
377   # R0201: Method could be a function
378   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
379     """Starts the local master daemon.
380
381     @param no_vote: Should the masterd started without voting? default: False
382     @raise errors.CommandError: If unable to start daemon.
383
384     """
385     env = {}
386     if no_vote:
387       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
388
389     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
390     if result.failed:
391       raise errors.CommandError("Couldn't start ganeti master."
392                                 " Fail reason: %s; output: %s" %
393                                 (result.fail_reason, result.output))
394
395   def _ReaddMergedNodesAndRedist(self):
396     """Readds all merging nodes and make sure their config is up-to-date.
397
398     @raise errors.CommandError: If anything fails.
399
400     """
401     for data in self.merger_data:
402       for node in data.nodes:
403         result = utils.RunCmd(["gnt-node", "add", "--readd",
404                                "--no-ssh-key-check", "--force-join", node])
405         if result.failed:
406           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
407                                     " output: %s" % (node, result.fail_reason,
408                                                      result.output))
409
410     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
411     if result.failed:
412       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
413                                 " output: %s" % (result.fail_reason,
414                                                 result.output))
415
416   # R0201: Method could be a function
417   def _StartupAllInstances(self): # pylint: disable-msg=R0201
418     """Starts up all instances (locally).
419
420     @raise errors.CommandError: If unable to start clusters
421
422     """
423     result = utils.RunCmd(["gnt-instance", "startup", "--all",
424                            "--force-multiple"])
425     if result.failed:
426       raise errors.CommandError("Unable to start all instances."
427                                 " Fail reason: %s; output: %s" %
428                                 (result.fail_reason, result.output))
429
430   # R0201: Method could be a function
431   def _VerifyCluster(self): # pylint: disable-msg=R0201
432     """Runs gnt-cluster verify to verify the health.
433
434     @raise errors.ProgrammError: If cluster fails on verification
435
436     """
437     result = utils.RunCmd(["gnt-cluster", "verify"])
438     if result.failed:
439       raise errors.CommandError("Verification of cluster failed."
440                                 " Fail reason: %s; output: %s" %
441                                 (result.fail_reason, result.output))
442
443   def Merge(self):
444     """Does the actual merge.
445
446     It runs all the steps in the right order and updates the user about steps
447     taken. Also it keeps track of rollback_steps to undo everything.
448
449     """
450     rbsteps = []
451     try:
452       logging.info("Pre cluster verification")
453       self._VerifyCluster()
454
455       logging.info("Prepare authorized_keys")
456       rbsteps.append("Remove our key from authorized_keys on nodes:"
457                      " %(nodes)s")
458       self._PrepareAuthorizedKeys()
459
460       rbsteps.append("Start all instances again on the merging"
461                      " clusters: %(clusters)s")
462       logging.info("Stopping merging instances (takes a while)")
463       self._StopMergingInstances()
464
465       logging.info("Disable watcher")
466       self._DisableWatcher()
467       logging.info("Stop daemons on merging nodes")
468       self._StopDaemons()
469       logging.info("Merging config")
470       self._FetchRemoteConfig()
471
472       logging.info("Stopping master daemon")
473       self._KillMasterDaemon()
474
475       rbsteps.append("Restore %s from another master candidate"
476                      " and restart master daemon" %
477                      constants.CLUSTER_CONF_FILE)
478       self._MergeConfig()
479       self._StartMasterDaemon(no_vote=True)
480
481       # Point of no return, delete rbsteps
482       del rbsteps[:]
483
484       logging.warning("We are at the point of no return. Merge can not easily"
485                       " be undone after this point.")
486       logging.info("Readd nodes")
487       self._ReaddMergedNodesAndRedist()
488
489       logging.info("Merge done, restart master daemon normally")
490       self._KillMasterDaemon()
491       self._StartMasterDaemon()
492
493       logging.info("Starting instances again")
494       self._StartupAllInstances()
495       logging.info("Post cluster verification")
496       self._VerifyCluster()
497     except errors.GenericError, e:
498       logging.exception(e)
499
500       if rbsteps:
501         nodes = Flatten([data.nodes for data in self.merger_data])
502         info = {
503           "clusters": self.clusters,
504           "nodes": nodes,
505           }
506         logging.critical("In order to rollback do the following:")
507         for step in rbsteps:
508           logging.critical("  * %s", step % info)
509       else:
510         logging.critical("Nothing to rollback.")
511
512       # TODO: Keep track of steps done for a flawless resume?
513
514   def Cleanup(self):
515     """Clean up our environment.
516
517     This cleans up remote private keys and configs and after that
518     deletes the temporary directory.
519
520     """
521     shutil.rmtree(self.work_dir)
522
523
524 def SetupLogging(options):
525   """Setting up logging infrastructure.
526
527   @param options: Parsed command line options
528
529   """
530   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
531
532   stderr_handler = logging.StreamHandler()
533   stderr_handler.setFormatter(formatter)
534   if options.debug:
535     stderr_handler.setLevel(logging.NOTSET)
536   elif options.verbose:
537     stderr_handler.setLevel(logging.INFO)
538   else:
539     stderr_handler.setLevel(logging.ERROR)
540
541   root_logger = logging.getLogger("")
542   root_logger.setLevel(logging.NOTSET)
543   root_logger.addHandler(stderr_handler)
544
545
546 def main():
547   """Main routine.
548
549   """
550   program = os.path.basename(sys.argv[0])
551
552   parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
553                                         " [--watcher-pause-period SECONDS]"
554                                         " [--groups [%s|%s]]"
555                                         " <cluster> [<cluster...>]" %
556                                         (_GROUPS_MERGE, _GROUPS_RENAME)),
557                                         prog=program)
558   parser.add_option(cli.DEBUG_OPT)
559   parser.add_option(cli.VERBOSE_OPT)
560   parser.add_option(PAUSE_PERIOD_OPT)
561   parser.add_option(GROUPS_OPT)
562
563   (options, args) = parser.parse_args()
564
565   SetupLogging(options)
566
567   if not args:
568     parser.error("No clusters specified")
569
570   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
571                           options.groups)
572   try:
573     try:
574       cluster_merger.Setup()
575       cluster_merger.Merge()
576     except errors.GenericError, e:
577       logging.exception(e)
578       return constants.EXIT_FAILURE
579   finally:
580     cluster_merger.Cleanup()
581
582   return constants.EXIT_SUCCESS
583
584
585 if __name__ == "__main__":
586   sys.exit(main())