cluster-merge: only operate on online nodes
[ganeti-local] / tools / cluster-merge
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Tool to merge two or more clusters together.
22
23 The clusters have to run the same version of Ganeti!
24
25 """
26
27 # pylint: disable-msg=C0103
28 # C0103: Invalid name cluster-merge
29
30 import logging
31 import os
32 import optparse
33 import shutil
34 import sys
35 import tempfile
36
37 from ganeti import cli
38 from ganeti import config
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ssh
42 from ganeti import utils
43
44
45 _GROUPS_MERGE = "merge"
46 _GROUPS_RENAME = "rename"
47 _CLUSTERMERGE_ECID = "clustermerge-ecid"
48
49 PAUSE_PERIOD_OPT = cli.cli_option("-p", "--watcher-pause-period", default=1800,
50                                   action="store", type="int",
51                                   dest="pause_period",
52                                   help=("Amount of time in seconds watcher"
53                                         " should be suspended from running"))
54 GROUPS_OPT = cli.cli_option("--groups", default=None, metavar="STRATEGY",
55                             choices=(_GROUPS_MERGE, _GROUPS_RENAME),
56                             dest="groups",
57                             help=("How to handle groups that have the"
58                                   " same name (One of: %s/%s)" %
59                                   (_GROUPS_MERGE, _GROUPS_RENAME)))
60
61
62 def Flatten(unflattened_list):
63   """Flattens a list.
64
65   @param unflattened_list: A list of unflattened list objects.
66   @return: A flattened list
67
68   """
69   flattened_list = []
70
71   for item in unflattened_list:
72     if isinstance(item, list):
73       flattened_list.extend(Flatten(item))
74     else:
75       flattened_list.append(item)
76   return flattened_list
77
78
79 class MergerData(object):
80   """Container class to hold data used for merger.
81
82   """
83   def __init__(self, cluster, key_path, nodes, instances, config_path=None):
84     """Initialize the container.
85
86     @param cluster: The name of the cluster
87     @param key_path: Path to the ssh private key used for authentication
88     @param nodes: List of online nodes in the merging cluster
89     @param instances: List of instances running on merging cluster
90     @param config_path: Path to the merging cluster config
91
92     """
93     self.cluster = cluster
94     self.key_path = key_path
95     self.nodes = nodes
96     self.instances = instances
97     self.config_path = config_path
98
99
100 class Merger(object):
101   """Handling the merge.
102
103   """
104   def __init__(self, clusters, pause_period, groups):
105     """Initialize object with sane defaults and infos required.
106
107     @param clusters: The list of clusters to merge in
108     @param pause_period: The time watcher shall be disabled for
109     @param groups: How to handle group conflicts
110
111     """
112     self.merger_data = []
113     self.clusters = clusters
114     self.pause_period = pause_period
115     self.work_dir = tempfile.mkdtemp(suffix="cluster-merger")
116     (self.cluster_name, ) = cli.GetClient().QueryConfigValues(["cluster_name"])
117     self.ssh_runner = ssh.SshRunner(self.cluster_name)
118     self.groups = groups
119
120   def Setup(self):
121     """Sets up our end so we can do the merger.
122
123     This method is setting us up as a preparation for the merger.
124     It makes the initial contact and gathers information needed.
125
126     @raise errors.RemoteError: for errors in communication/grabbing
127
128     """
129     (remote_path, _, _) = ssh.GetUserFiles("root")
130
131     if self.cluster_name in self.clusters:
132       raise errors.CommandError("Cannot merge cluster %s with itself" %
133                                 self.cluster_name)
134
135     # Fetch remotes private key
136     for cluster in self.clusters:
137       result = self._RunCmd(cluster, "cat %s" % remote_path, batch=False,
138                             ask_key=False)
139       if result.failed:
140         raise errors.RemoteError("There was an error while grabbing ssh private"
141                                  " key from %s. Fail reason: %s; output: %s" %
142                                  (cluster, result.fail_reason, result.output))
143
144       key_path = utils.PathJoin(self.work_dir, cluster)
145       utils.WriteFile(key_path, mode=0600, data=result.stdout)
146
147       result = self._RunCmd(cluster, "gnt-node list -o name,offline"
148                             " --no-header --separator=,", private_key=key_path)
149       if result.failed:
150         raise errors.RemoteError("Unable to retrieve list of nodes from %s."
151                                  " Fail reason: %s; output: %s" %
152                                  (cluster, result.fail_reason, result.output))
153       nodes_statuses = [line.split(',') for line in result.stdout.splitlines()]
154       nodes = [node_status[0] for node_status in nodes_statuses
155                if node_status[1] == "N"]
156
157       result = self._RunCmd(cluster, "gnt-instance list -o name --no-header",
158                             private_key=key_path)
159       if result.failed:
160         raise errors.RemoteError("Unable to retrieve list of instances from"
161                                  " %s. Fail reason: %s; output: %s" %
162                                  (cluster, result.fail_reason, result.output))
163       instances = result.stdout.splitlines()
164
165       self.merger_data.append(MergerData(cluster, key_path, nodes, instances))
166
167   def _PrepareAuthorizedKeys(self, merge_data, node):
168     """Prepare the authorized_keys on merging nodes.
169
170     This method add our public key to remotes authorized_key for further
171     communication.
172
173     """
174     (_, pub_key_file, auth_keys) = ssh.GetUserFiles("root")
175     pub_key = utils.ReadFile(pub_key_file)
176
177     result = self._RunCmd(node, ("cat >> %s << '!EOF.'\n%s!EOF.\n" %
178                                  (auth_keys, pub_key)),
179                           private_key=merge_data.key_path)
180
181     if result.failed:
182       raise errors.RemoteError("Unable to add our public key to %s in %s."
183                                " Fail reason: %s; output: %s" %
184                                (node, data.cluster, result.fail_reason,
185                                 result.output))
186
187   def _RunCmd(self, hostname, command, user="root", use_cluster_key=False,
188               strict_host_check=False, private_key=None, batch=True,
189               ask_key=False):
190     """Wrapping SshRunner.Run with default parameters.
191
192     For explanation of parameters see L{ganeti.ssh.SshRunner.Run}.
193
194     """
195     return self.ssh_runner.Run(hostname=hostname, command=command, user=user,
196                                use_cluster_key=use_cluster_key,
197                                strict_host_check=strict_host_check,
198                                private_key=private_key, batch=batch,
199                                ask_key=ask_key)
200
201   def _StopMergingInstances(self):
202     """Stop instances on merging clusters.
203
204     """
205     for cluster in self.clusters:
206       result = self._RunCmd(cluster, "gnt-instance shutdown --all"
207                                      " --force-multiple")
208
209       if result.failed:
210         raise errors.RemoteError("Unable to stop instances on %s."
211                                  " Fail reason: %s; output: %s" %
212                                  (cluster, result.fail_reason, result.output))
213
214   def _DisableWatcher(self):
215     """Disable watch on all merging clusters, including ourself.
216
217     """
218     for cluster in ["localhost"] + self.clusters:
219       result = self._RunCmd(cluster, "gnt-cluster watcher pause %d" %
220                                      self.pause_period)
221
222       if result.failed:
223         raise errors.RemoteError("Unable to pause watcher on %s."
224                                  " Fail reason: %s; output: %s" %
225                                  (cluster, result.fail_reason, result.output))
226
227   def _StopDaemons(self):
228     """Stop all daemons on merging nodes.
229
230     """
231     cmd = "%s stop-all" % constants.DAEMON_UTIL
232     for data in self.merger_data:
233       for node in data.nodes:
234         result = self._RunCmd(node, cmd)
235
236         if result.failed:
237           raise errors.RemoteError("Unable to stop daemons on %s."
238                                    " Fail reason: %s; output: %s." %
239                                    (node, result.fail_reason, result.output))
240
241   def _FetchRemoteConfig(self):
242     """Fetches and stores remote cluster config from the master.
243
244     This step is needed before we can merge the config.
245
246     """
247     for data in self.merger_data:
248       result = self._RunCmd(data.cluster, "cat %s" %
249                                           constants.CLUSTER_CONF_FILE)
250
251       if result.failed:
252         raise errors.RemoteError("Unable to retrieve remote config on %s."
253                                  " Fail reason: %s; output %s" %
254                                  (data.cluster, result.fail_reason,
255                                   result.output))
256
257       data.config_path = utils.PathJoin(self.work_dir, "%s_config.data" %
258                                         data.cluster)
259       utils.WriteFile(data.config_path, data=result.stdout)
260
261   # R0201: Method could be a function
262   def _KillMasterDaemon(self): # pylint: disable-msg=R0201
263     """Kills the local master daemon.
264
265     @raise errors.CommandError: If unable to kill
266
267     """
268     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
269     if result.failed:
270       raise errors.CommandError("Unable to stop master daemons."
271                                 " Fail reason: %s; output: %s" %
272                                 (result.fail_reason, result.output))
273
274   def _MergeConfig(self):
275     """Merges all foreign config into our own config.
276
277     """
278     my_config = config.ConfigWriter(offline=True)
279     fake_ec_id = 0 # Needs to be uniq over the whole config merge
280
281     for data in self.merger_data:
282       other_config = config.ConfigWriter(data.config_path, accept_foreign=True)
283       self._MergeNodeGroups(my_config, other_config)
284
285       for node in other_config.GetNodeList():
286         node_info = other_config.GetNodeInfo(node)
287         if not node_info.offline:
288           self._PrepareAuthorizedKeys(data, node_info.name)
289           my_config.AddNode(node_info, _CLUSTERMERGE_ECID + str(fake_ec_id))
290         fake_ec_id += 1
291
292       for instance in other_config.GetInstanceList():
293         instance_info = other_config.GetInstanceInfo(instance)
294
295         # Update the DRBD port assignments
296         # This is a little bit hackish
297         for dsk in instance_info.disks:
298           if dsk.dev_type in constants.LDS_DRBD:
299             port = my_config.AllocatePort()
300
301             logical_id = list(dsk.logical_id)
302             logical_id[2] = port
303             dsk.logical_id = tuple(logical_id)
304
305             physical_id = list(dsk.physical_id)
306             physical_id[1] = physical_id[3] = port
307             dsk.physical_id = tuple(physical_id)
308
309         my_config.AddInstance(instance_info,
310                               _CLUSTERMERGE_ECID + str(fake_ec_id))
311         fake_ec_id += 1
312
313   # R0201: Method could be a function
314   def _MergeNodeGroups(self, my_config, other_config):
315     """Adds foreign node groups
316
317     ConfigWriter.AddNodeGroup takes care of making sure there are no conflicts.
318     """
319     # pylint: disable-msg=R0201
320     logging.info("Node group conflict strategy: %s", self.groups)
321
322     my_grps = my_config.GetAllNodeGroupsInfo().values()
323     other_grps = other_config.GetAllNodeGroupsInfo().values()
324
325     # Check for node group naming conflicts:
326     conflicts = []
327     for other_grp in other_grps:
328       for my_grp in my_grps:
329         if other_grp.name == my_grp.name:
330           conflicts.append(other_grp)
331
332     if conflicts:
333       conflict_names = utils.CommaJoin([g.name for g in conflicts])
334       logging.info("Node groups in both local and remote cluster: %s",
335                    conflict_names)
336
337       # User hasn't specified how to handle conflicts
338       if not self.groups:
339         raise errors.CommandError("The following node group(s) are in both"
340                                   " clusters, and no merge strategy has been"
341                                   " supplied (see the --groups option): %s" %
342                                   conflict_names)
343
344       # User wants to rename conflicts
345       elif self.groups == _GROUPS_RENAME:
346         for grp in conflicts:
347           new_name = "%s-%s" % (grp.name, other_config.GetClusterName())
348           logging.info("Renaming remote node group from %s to %s"
349                        " to resolve conflict", grp.name, new_name)
350           grp.name = new_name
351
352       # User wants to merge conflicting groups
353       elif self.groups == 'merge':
354         for other_grp in conflicts:
355           logging.info("Merging local and remote '%s' groups", other_grp.name)
356           for node_name in other_grp.members[:]:
357             node = other_config.GetNodeInfo(node_name)
358             # Access to a protected member of a client class
359             # pylint: disable-msg=W0212
360             other_config._UnlockedRemoveNodeFromGroup(node)
361
362             # Access to a protected member of a client class
363             # pylint: disable-msg=W0212
364             my_grp_uuid = my_config._UnlockedLookupNodeGroup(other_grp.name)
365
366             # Access to a protected member of a client class
367             # pylint: disable-msg=W0212
368             my_config._UnlockedAddNodeToGroup(node, my_grp_uuid)
369             node.group = my_grp_uuid
370           # Remove from list of groups to add
371           other_grps.remove(other_grp)
372
373     for grp in other_grps:
374       #TODO: handle node group conflicts
375       my_config.AddNodeGroup(grp, _CLUSTERMERGE_ECID)
376
377   # R0201: Method could be a function
378   def _StartMasterDaemon(self, no_vote=False): # pylint: disable-msg=R0201
379     """Starts the local master daemon.
380
381     @param no_vote: Should the masterd started without voting? default: False
382     @raise errors.CommandError: If unable to start daemon.
383
384     """
385     env = {}
386     if no_vote:
387       env["EXTRA_MASTERD_ARGS"] = "--no-voting --yes-do-it"
388
389     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
390     if result.failed:
391       raise errors.CommandError("Couldn't start ganeti master."
392                                 " Fail reason: %s; output: %s" %
393                                 (result.fail_reason, result.output))
394
395   def _ReaddMergedNodesAndRedist(self):
396     """Readds all merging nodes and make sure their config is up-to-date.
397
398     @raise errors.CommandError: If anything fails.
399
400     """
401     for data in self.merger_data:
402       for node in data.nodes:
403         result = utils.RunCmd(["gnt-node", "add", "--readd",
404                                "--no-ssh-key-check", "--force-join", node])
405         if result.failed:
406           raise errors.CommandError("Couldn't readd node %s. Fail reason: %s;"
407                                     " output: %s" % (node, result.fail_reason,
408                                                      result.output))
409
410     result = utils.RunCmd(["gnt-cluster", "redist-conf"])
411     if result.failed:
412       raise errors.CommandError("Redistribution failed. Fail reason: %s;"
413                                 " output: %s" % (result.fail_reason,
414                                                 result.output))
415
416   # R0201: Method could be a function
417   def _StartupAllInstances(self): # pylint: disable-msg=R0201
418     """Starts up all instances (locally).
419
420     @raise errors.CommandError: If unable to start clusters
421
422     """
423     result = utils.RunCmd(["gnt-instance", "startup", "--all",
424                            "--force-multiple"])
425     if result.failed:
426       raise errors.CommandError("Unable to start all instances."
427                                 " Fail reason: %s; output: %s" %
428                                 (result.fail_reason, result.output))
429
430   # R0201: Method could be a function
431   def _VerifyCluster(self): # pylint: disable-msg=R0201
432     """Runs gnt-cluster verify to verify the health.
433
434     @raise errors.ProgrammError: If cluster fails on verification
435
436     """
437     result = utils.RunCmd(["gnt-cluster", "verify"])
438     if result.failed:
439       raise errors.CommandError("Verification of cluster failed."
440                                 " Fail reason: %s; output: %s" %
441                                 (result.fail_reason, result.output))
442
443   def Merge(self):
444     """Does the actual merge.
445
446     It runs all the steps in the right order and updates the user about steps
447     taken. Also it keeps track of rollback_steps to undo everything.
448
449     """
450     rbsteps = []
451     try:
452       logging.info("Pre cluster verification")
453       self._VerifyCluster()
454
455       rbsteps.append("Start all instances again on the merging"
456                      " clusters: %(clusters)s")
457       logging.info("Stopping merging instances (takes a while)")
458       self._StopMergingInstances()
459
460       logging.info("Disable watcher")
461       self._DisableWatcher()
462       logging.info("Stop daemons on merging nodes")
463       self._StopDaemons()
464       logging.info("Merging config")
465       self._FetchRemoteConfig()
466
467       logging.info("Stopping master daemon")
468       self._KillMasterDaemon()
469
470       rbsteps.append("Restore %s from another master candidate"
471                      " and restart master daemon" %
472                      constants.CLUSTER_CONF_FILE)
473       self._MergeConfig()
474       self._StartMasterDaemon(no_vote=True)
475
476       # Point of no return, delete rbsteps
477       del rbsteps[:]
478
479       logging.warning("We are at the point of no return. Merge can not easily"
480                       " be undone after this point.")
481       logging.info("Readd nodes")
482       self._ReaddMergedNodesAndRedist()
483
484       logging.info("Merge done, restart master daemon normally")
485       self._KillMasterDaemon()
486       self._StartMasterDaemon()
487
488       logging.info("Starting instances again")
489       self._StartupAllInstances()
490       logging.info("Post cluster verification")
491       self._VerifyCluster()
492     except errors.GenericError, e:
493       logging.exception(e)
494
495       if rbsteps:
496         nodes = Flatten([data.nodes for data in self.merger_data])
497         info = {
498           "clusters": self.clusters,
499           "nodes": nodes,
500           }
501         logging.critical("In order to rollback do the following:")
502         for step in rbsteps:
503           logging.critical("  * %s", step % info)
504       else:
505         logging.critical("Nothing to rollback.")
506
507       # TODO: Keep track of steps done for a flawless resume?
508
509   def Cleanup(self):
510     """Clean up our environment.
511
512     This cleans up remote private keys and configs and after that
513     deletes the temporary directory.
514
515     """
516     shutil.rmtree(self.work_dir)
517
518
519 def SetupLogging(options):
520   """Setting up logging infrastructure.
521
522   @param options: Parsed command line options
523
524   """
525   formatter = logging.Formatter("%(asctime)s: %(levelname)s %(message)s")
526
527   stderr_handler = logging.StreamHandler()
528   stderr_handler.setFormatter(formatter)
529   if options.debug:
530     stderr_handler.setLevel(logging.NOTSET)
531   elif options.verbose:
532     stderr_handler.setLevel(logging.INFO)
533   else:
534     stderr_handler.setLevel(logging.ERROR)
535
536   root_logger = logging.getLogger("")
537   root_logger.setLevel(logging.NOTSET)
538   root_logger.addHandler(stderr_handler)
539
540
541 def main():
542   """Main routine.
543
544   """
545   program = os.path.basename(sys.argv[0])
546
547   parser = optparse.OptionParser(usage=("%%prog [--debug|--verbose]"
548                                         " [--watcher-pause-period SECONDS]"
549                                         " [--groups [%s|%s]]"
550                                         " <cluster> [<cluster...>]" %
551                                         (_GROUPS_MERGE, _GROUPS_RENAME)),
552                                         prog=program)
553   parser.add_option(cli.DEBUG_OPT)
554   parser.add_option(cli.VERBOSE_OPT)
555   parser.add_option(PAUSE_PERIOD_OPT)
556   parser.add_option(GROUPS_OPT)
557
558   (options, args) = parser.parse_args()
559
560   SetupLogging(options)
561
562   if not args:
563     parser.error("No clusters specified")
564
565   cluster_merger = Merger(utils.UniqueSequence(args), options.pause_period,
566                           options.groups)
567   try:
568     try:
569       cluster_merger.Setup()
570       cluster_merger.Merge()
571     except errors.GenericError, e:
572       logging.exception(e)
573       return constants.EXIT_FAILURE
574   finally:
575     cluster_merger.Cleanup()
576
577   return constants.EXIT_SUCCESS
578
579
580 if __name__ == "__main__":
581   sys.exit(main())